1
#include "source/common/network/multi_connection_base_impl.h"
2

            
3
#include <vector>
4

            
5
namespace Envoy {
6
namespace Network {
7

            
8
MultiConnectionBaseImpl::MultiConnectionBaseImpl(Event::Dispatcher& dispatcher,
9
                                                 ConnectionProviderPtr connection_provider)
10
105
    : id_(ConnectionImpl::next_global_id_++), dispatcher_(dispatcher),
11
105
      connection_provider_(std::move(connection_provider)),
12
105
      next_attempt_timer_(dispatcher_.createTimer([this]() -> void { tryAnotherConnection(); })) {
13
105
  ENVOY_LOG_EVENT(debug, "multi_connection_new_cx", "[C{}] connections={}", id_,
14
105
                  connection_provider_->totalConnections());
15
105
  connections_.push_back(createNextConnection());
16
105
}
17

            
18
105
MultiConnectionBaseImpl::~MultiConnectionBaseImpl() = default;
19

            
20
85
void MultiConnectionBaseImpl::connect() {
21
85
  ENVOY_BUG(!connect_finished_, "connection already connected");
22
85
  connections_[0]->connect();
23
85
  maybeScheduleNextAttempt();
24
85
}
25

            
26
3
void MultiConnectionBaseImpl::addWriteFilter(WriteFilterSharedPtr filter) {
27
3
  if (connect_finished_) {
28
2
    connections_[0]->addWriteFilter(filter);
29
2
    return;
30
2
  }
31
  // Filters should only be notified of events on the final connection, so defer adding
32
  // filters until the final connection has been determined.
33
1
  post_connect_state_.write_filters_.push_back(filter);
34
1
}
35

            
36
3
void MultiConnectionBaseImpl::addFilter(FilterSharedPtr filter) {
37
3
  if (connect_finished_) {
38
2
    connections_[0]->addFilter(filter);
39
2
    return;
40
2
  }
41
  // Filters should only be notified of events on the final connection, so defer adding
42
  // filters until the final connection has been determined.
43
1
  post_connect_state_.filters_.push_back(filter);
44
1
}
45

            
46
16
void MultiConnectionBaseImpl::addReadFilter(ReadFilterSharedPtr filter) {
47
16
  if (connect_finished_) {
48
2
    connections_[0]->addReadFilter(filter);
49
2
    return;
50
2
  }
51
  // Filters should only be notified of events on the final connection, so defer adding
52
  // filters until the final connection has been determined.
53
14
  post_connect_state_.read_filters_.push_back(filter);
54
14
}
55

            
56
2
void MultiConnectionBaseImpl::removeReadFilter(ReadFilterSharedPtr filter) {
57
2
  if (connect_finished_) {
58
1
    connections_[0]->removeReadFilter(filter);
59
1
    return;
60
1
  }
61
  // Filters should only be notified of events on the final connection, so remove
62
  // the filters from the list of deferred filters.
63
1
  auto i = post_connect_state_.read_filters_.begin();
64
1
  while (i != post_connect_state_.read_filters_.end()) {
65
1
    if (*i == filter) {
66
1
      post_connect_state_.read_filters_.erase(i);
67
1
      return;
68
1
    }
69
1
  }
70
  IS_ENVOY_BUG("Failed to remove read filter");
71
}
72

            
73
6
bool MultiConnectionBaseImpl::initializeReadFilters() {
74
6
  if (connect_finished_) {
75
4
    return connections_[0]->initializeReadFilters();
76
4
  }
77
  // Filters should only be notified of events on the final connection, so defer
78
  // initialization of the filters until the final connection has been determined.
79
2
  if (post_connect_state_.read_filters_.empty()) {
80
1
    return false;
81
1
  }
82
1
  post_connect_state_.initialize_read_filters_ = true;
83
1
  return true;
84
2
}
85

            
86
void MultiConnectionBaseImpl::addAccessLogHandler(AccessLog::InstanceSharedPtr handler) {
87
  if (connect_finished_) {
88
    connections_[0]->addAccessLogHandler(handler);
89
    return;
90
  }
91
  // Access log handlers should only be notified of events on the final connection, so defer adding
92
  // access log handlers until the final connection has been determined.
93
  post_connect_state_.access_log_handlers_.push_back(handler);
94
}
95

            
96
3
void MultiConnectionBaseImpl::addBytesSentCallback(Connection::BytesSentCb cb) {
97
3
  if (connect_finished_) {
98
2
    connections_[0]->addBytesSentCallback(cb);
99
2
    return;
100
2
  }
101
  // Callbacks should only be notified of events on the final connection, so defer adding
102
  // callbacks until the final connection has been determined.
103
1
  post_connect_state_.bytes_sent_callbacks_.push_back(cb);
104
1
}
105

            
106
3
void MultiConnectionBaseImpl::enableHalfClose(bool enabled) {
107
3
  if (!connect_finished_) {
108
1
    per_connection_state_.enable_half_close_ = enabled;
109
1
  }
110
3
  for (auto& connection : connections_) {
111
3
    connection->enableHalfClose(enabled);
112
3
  }
113
3
}
114

            
115
3
bool MultiConnectionBaseImpl::isHalfCloseEnabled() const {
116
3
  return connections_[0]->isHalfCloseEnabled();
117
3
}
118

            
119
bool MultiConnectionBaseImpl::setSocketOption(Network::SocketOptionName name,
120
2
                                              absl::Span<uint8_t> value) {
121
2
  bool success = true;
122
2
  for (auto& connection : connections_) {
123
2
    if (!connection->setSocketOption(name, value)) {
124
1
      success = false;
125
1
    }
126
2
  }
127
2
  return success;
128
2
}
129

            
130
1
std::string MultiConnectionBaseImpl::nextProtocol() const {
131
1
  return connections_[0]->nextProtocol();
132
1
}
133

            
134
12
void MultiConnectionBaseImpl::noDelay(bool enable) {
135
12
  if (!connect_finished_) {
136
11
    per_connection_state_.no_delay_ = enable;
137
11
  }
138
12
  for (auto& connection : connections_) {
139
12
    connection->noDelay(enable);
140
12
  }
141
12
}
142

            
143
9
Connection::ReadDisableStatus MultiConnectionBaseImpl::readDisable(bool disable) {
144
9
  if (connect_finished_) {
145
1
    return connections_[0]->readDisable(disable);
146
1
  }
147

            
148
8
  if (!post_connect_state_.read_disable_count_.has_value()) {
149
2
    post_connect_state_.read_disable_count_ = 0;
150
2
  }
151

            
152
8
  auto read_disable_state = ReadDisableStatus::StillReadDisabled;
153

            
154
8
  if (disable) {
155
5
    if (post_connect_state_.read_disable_count_ == 0) {
156
2
      read_disable_state = ReadDisableStatus::TransitionedToReadDisabled;
157
2
    }
158

            
159
5
    post_connect_state_.read_disable_count_.value()++;
160
5
  } else {
161
3
    ASSERT(post_connect_state_.read_disable_count_ != 0);
162
3
    post_connect_state_.read_disable_count_.value()--;
163

            
164
3
    if (post_connect_state_.read_disable_count_ == 0) {
165
1
      read_disable_state = ReadDisableStatus::TransitionedToReadEnabled;
166
1
    }
167
3
  }
168

            
169
8
  return read_disable_state;
170
9
}
171

            
172
10
void MultiConnectionBaseImpl::detectEarlyCloseWhenReadDisabled(bool value) {
173
10
  if (!connect_finished_) {
174
9
    per_connection_state_.detect_early_close_when_read_disabled_ = value;
175
9
  }
176
10
  for (auto& connection : connections_) {
177
10
    connection->detectEarlyCloseWhenReadDisabled(value);
178
10
  }
179
10
}
180

            
181
6
bool MultiConnectionBaseImpl::readEnabled() const {
182
6
  if (!connect_finished_) {
183
5
    return !post_connect_state_.read_disable_count_.has_value() ||
184
5
           post_connect_state_.read_disable_count_ == 0;
185
5
  }
186
1
  return connections_[0]->readEnabled();
187
6
}
188

            
189
38
ConnectionInfoSetter& MultiConnectionBaseImpl::connectionInfoSetter() {
190
38
  return connections_[0]->connectionInfoSetter();
191
38
}
192

            
193
4
const ConnectionInfoProvider& MultiConnectionBaseImpl::connectionInfoProvider() const {
194
4
  return connections_[0]->connectionInfoProvider();
195
4
}
196

            
197
1
ConnectionInfoProviderSharedPtr MultiConnectionBaseImpl::connectionInfoProviderSharedPtr() const {
198
1
  return connections_[0]->connectionInfoProviderSharedPtr();
199
1
}
200

            
201
absl::optional<Connection::UnixDomainSocketPeerCredentials>
202
1
MultiConnectionBaseImpl::unixSocketPeerCredentials() const {
203
1
  return connections_[0]->unixSocketPeerCredentials();
204
1
}
205

            
206
1
Ssl::ConnectionInfoConstSharedPtr MultiConnectionBaseImpl::ssl() const {
207
1
  return connections_[0]->ssl();
208
1
}
209

            
210
2
Connection::State MultiConnectionBaseImpl::state() const {
211
2
  if (!connect_finished_) {
212
1
    ASSERT(connections_[0]->state() == Connection::State::Open);
213
1
  }
214
2
  return connections_[0]->state();
215
2
}
216

            
217
12
bool MultiConnectionBaseImpl::connecting() const {
218
12
  ASSERT(connect_finished_ || connections_[0]->connecting());
219
12
  return connections_[0]->connecting();
220
12
}
221

            
222
10
void MultiConnectionBaseImpl::write(Buffer::Instance& data, bool end_stream) {
223
10
  if (connect_finished_) {
224
4
    connections_[0]->write(data, end_stream);
225
4
    return;
226
4
  }
227

            
228
  // Data should only be written on the final connection, so defer actually writing
229
  // until the final connection has been determined.
230
6
  if (!post_connect_state_.write_buffer_.has_value()) {
231
5
    post_connect_state_.end_stream_ = false;
232
5
    post_connect_state_.write_buffer_ = dispatcher_.getWatermarkFactory().createBuffer(
233
5
        [this]() -> void { this->onWriteBufferLowWatermark(); },
234
5
        [this]() -> void { this->onWriteBufferHighWatermark(); },
235
        // ConnectionCallbacks do not have a method to receive overflow watermark
236
        // notification. So this class, like ConnectionImpl, has a no-op handler.
237
5
        []() -> void { /* TODO(adisuissa): Handle overflow watermark */ });
238
5
    if (per_connection_state_.buffer_limits_.has_value()) {
239
2
      post_connect_state_.write_buffer_.value()->setWatermarks(
240
2
          per_connection_state_.buffer_limits_.value());
241
2
    }
242
5
  }
243

            
244
6
  post_connect_state_.write_buffer_.value()->move(data);
245
6
  ASSERT(!post_connect_state_.end_stream_.value()); // Don't write after end_stream.
246
6
  post_connect_state_.end_stream_ = end_stream;
247
6
}
248

            
249
43
void MultiConnectionBaseImpl::setBufferLimits(uint32_t limit) {
250
43
  if (!connect_finished_) {
251
42
    ASSERT(!per_connection_state_.buffer_limits_.has_value());
252
42
    per_connection_state_.buffer_limits_ = limit;
253
42
    if (post_connect_state_.write_buffer_.has_value()) {
254
1
      post_connect_state_.write_buffer_.value()->setWatermarks(limit);
255
1
    }
256
42
  }
257
43
  for (auto& connection : connections_) {
258
43
    connection->setBufferLimits(limit);
259
43
  }
260
43
}
261

            
262
2
void MultiConnectionBaseImpl::setBufferHighWatermarkTimeout(std::chrono::milliseconds timeout) {
263
2
  if (!connect_finished_) {
264
2
    per_connection_state_.buffer_high_watermark_timeout_ = timeout;
265
2
  }
266
3
  for (auto& connection : connections_) {
267
3
    connection->setBufferHighWatermarkTimeout(timeout);
268
3
  }
269
2
}
270

            
271
9
uint32_t MultiConnectionBaseImpl::bufferLimit() const { return connections_[0]->bufferLimit(); }
272

            
273
7
bool MultiConnectionBaseImpl::aboveHighWatermark() const {
274
7
  if (!connect_finished_) {
275
    // Writes are deferred, so return the watermark status from the deferred write buffer.
276
3
    return post_connect_state_.write_buffer_.has_value() &&
277
3
           post_connect_state_.write_buffer_.value()->highWatermarkTriggered();
278
3
  }
279

            
280
4
  return connections_[0]->aboveHighWatermark();
281
7
}
282

            
283
1
const ConnectionSocket::OptionsSharedPtr& MultiConnectionBaseImpl::socketOptions() const {
284
  // Note, this might change before connect finishes.
285
1
  return connections_[0]->socketOptions();
286
1
}
287

            
288
1
absl::string_view MultiConnectionBaseImpl::requestedServerName() const {
289
  // Note, this might change before connect finishes.
290
1
  return connections_[0]->requestedServerName();
291
1
}
292

            
293
45
StreamInfo::StreamInfo& MultiConnectionBaseImpl::streamInfo() {
294
  // Note, this might change before connect finishes.
295
45
  return connections_[0]->streamInfo();
296
45
}
297

            
298
1
const StreamInfo::StreamInfo& MultiConnectionBaseImpl::streamInfo() const {
299
  // Note, this might change before connect finishes.
300
1
  return connections_[0]->streamInfo();
301
1
}
302

            
303
14
absl::string_view MultiConnectionBaseImpl::transportFailureReason() const {
304
  // Note, this might change before connect finishes.
305
14
  return connections_[0]->transportFailureReason();
306
14
}
307

            
308
absl::string_view MultiConnectionBaseImpl::localCloseReason() const {
309
  // Note, this might change before connect finishes.
310
  return connections_[0]->localCloseReason();
311
}
312

            
313
4
bool MultiConnectionBaseImpl::startSecureTransport() {
314
4
  if (!connect_finished_) {
315
2
    per_connection_state_.start_secure_transport_ = true;
316
2
  }
317
4
  bool ret = true;
318
5
  for (auto& connection : connections_) {
319
5
    if (!connection->startSecureTransport()) {
320
3
      ret = false;
321
3
    }
322
5
  }
323
4
  return ret;
324
4
}
325

            
326
1
absl::optional<std::chrono::milliseconds> MultiConnectionBaseImpl::lastRoundTripTime() const {
327
  // Note, this might change before connect finishes.
328
1
  return connections_[0]->lastRoundTripTime();
329
1
}
330

            
331
absl::optional<uint64_t> MultiConnectionBaseImpl::congestionWindowInBytes() const {
332
  // Note, this value changes constantly even within the same connection.
333
  return connections_[0]->congestionWindowInBytes();
334
}
335

            
336
24
void MultiConnectionBaseImpl::addConnectionCallbacks(ConnectionCallbacks& cb) {
337
24
  if (connect_finished_) {
338
1
    connections_[0]->addConnectionCallbacks(cb);
339
1
    return;
340
1
  }
341
  // Callbacks should only be notified of events on the final connection, so defer adding
342
  // callbacks until the final connection has been determined.
343
23
  post_connect_state_.connection_callbacks_.push_back(&cb);
344
23
}
345

            
346
2
void MultiConnectionBaseImpl::removeConnectionCallbacks(ConnectionCallbacks& cb) {
347
2
  if (connect_finished_) {
348
1
    connections_[0]->removeConnectionCallbacks(cb);
349
1
    return;
350
1
  }
351
  // Callbacks should only be notified of events on the final connection, so remove
352
  // the callback from the list of deferred callbacks.
353
1
  auto i = post_connect_state_.connection_callbacks_.begin();
354
1
  while (i != post_connect_state_.connection_callbacks_.end()) {
355
1
    if (*i == &cb) {
356
1
      post_connect_state_.connection_callbacks_.erase(i);
357
1
      return;
358
1
    }
359
1
  }
360
  IS_ENVOY_BUG("Failed to remove connection callbacks");
361
}
362

            
363
7
void MultiConnectionBaseImpl::close(ConnectionCloseType type, absl::string_view details) {
364
7
  if (connect_finished_) {
365
3
    connections_[0]->close(type, details);
366
3
    return;
367
3
  }
368

            
369
4
  connect_finished_ = true;
370
4
  ENVOY_LOG(trace, "Disabling next attempt timer.");
371
4
  next_attempt_timer_->disableTimer();
372
10
  for (size_t i = 0; i < connections_.size(); ++i) {
373
6
    connections_[i]->removeConnectionCallbacks(*callbacks_wrappers_[i]);
374
6
    if (i != 0) {
375
      // Wait to close the final connection until the post-connection callbacks
376
      // have been added.
377
2
      connections_[i]->close(ConnectionCloseType::NoFlush, details);
378
2
    }
379
6
  }
380
4
  connections_.resize(1);
381
4
  callbacks_wrappers_.clear();
382

            
383
4
  for (auto cb : post_connect_state_.connection_callbacks_) {
384
3
    if (cb) {
385
3
      connections_[0]->addConnectionCallbacks(*cb);
386
3
    }
387
3
  }
388
4
  connections_[0]->close(type, details);
389
4
}
390

            
391
StreamInfo::DetectedCloseType MultiConnectionBaseImpl::detectedCloseType() const {
392
  return connections_[0]->detectedCloseType();
393
};
394

            
395
15
Event::Dispatcher& MultiConnectionBaseImpl::dispatcher() const {
396
15
  ASSERT(&dispatcher_ == &connections_[0]->dispatcher());
397
15
  return connections_[0]->dispatcher();
398
15
}
399

            
400
2
uint64_t MultiConnectionBaseImpl::id() const { return id_; }
401

            
402
1
void MultiConnectionBaseImpl::hashKey(std::vector<uint8_t>& hash_key) const {
403
  // Pack the id into sizeof(id_) uint8_t entries in the hash_key vector.
404
1
  hash_key.reserve(hash_key.size() + sizeof(id_));
405
9
  for (unsigned i = 0; i < sizeof(id_); ++i) {
406
8
    hash_key.push_back(0xFF & (id_ >> (8 * i)));
407
8
  }
408
1
}
409

            
410
12
void MultiConnectionBaseImpl::setConnectionStats(const ConnectionStats& stats) {
411
12
  if (!connect_finished_) {
412
11
    per_connection_state_.connection_stats_ = std::make_unique<ConnectionStats>(stats);
413
11
  }
414
12
  for (auto& connection : connections_) {
415
12
    connection->setConnectionStats(stats);
416
12
  }
417
12
}
418

            
419
2
void MultiConnectionBaseImpl::setDelayedCloseTimeout(std::chrono::milliseconds timeout) {
420
2
  if (!connect_finished_) {
421
1
    per_connection_state_.delayed_close_timeout_ = timeout;
422
1
  }
423
2
  for (auto& connection : connections_) {
424
2
    connection->setDelayedCloseTimeout(timeout);
425
2
  }
426
2
}
427

            
428
void MultiConnectionBaseImpl::dumpState(std::ostream& os, int indent_level) const {
429
  const char* spaces = spacesForLevel(indent_level);
430
  os << spaces << "MultiConnectionBaseImpl " << this << DUMP_MEMBER(id_)
431
     << DUMP_MEMBER(connect_finished_) << "\n";
432

            
433
  for (auto& connection : connections_) {
434
    DUMP_DETAILS(connection);
435
  }
436
}
437

            
438
140
ClientConnectionPtr MultiConnectionBaseImpl::createNextConnection() {
439
140
  ASSERT(connection_provider_->hasNextConnection());
440
140
  auto connection = connection_provider_->createNextConnection(id_);
441
140
  callbacks_wrappers_.push_back(std::make_unique<ConnectionCallbacksWrapper>(*this, *connection));
442
140
  connection->addConnectionCallbacks(*callbacks_wrappers_.back());
443

            
444
140
  if (per_connection_state_.detect_early_close_when_read_disabled_.has_value()) {
445
8
    connection->detectEarlyCloseWhenReadDisabled(
446
8
        per_connection_state_.detect_early_close_when_read_disabled_.value());
447
8
  }
448
140
  if (per_connection_state_.no_delay_.has_value()) {
449
8
    connection->noDelay(per_connection_state_.no_delay_.value());
450
8
  }
451
140
  if (per_connection_state_.connection_stats_) {
452
8
    connection->setConnectionStats(*per_connection_state_.connection_stats_);
453
8
  }
454
140
  if (per_connection_state_.buffer_limits_.has_value()) {
455
8
    connection->setBufferLimits(per_connection_state_.buffer_limits_.value());
456
8
  }
457
140
  if (per_connection_state_.buffer_high_watermark_timeout_.has_value()) {
458
2
    connection->setBufferHighWatermarkTimeout(
459
2
        per_connection_state_.buffer_high_watermark_timeout_.value());
460
2
  }
461
140
  if (per_connection_state_.enable_half_close_.has_value()) {
462
1
    connection->enableHalfClose(per_connection_state_.enable_half_close_.value());
463
1
  }
464
140
  if (per_connection_state_.delayed_close_timeout_.has_value()) {
465
1
    connection->setDelayedCloseTimeout(per_connection_state_.delayed_close_timeout_.value());
466
1
  }
467
140
  if (per_connection_state_.start_secure_transport_.has_value()) {
468
1
    ASSERT(per_connection_state_.start_secure_transport_);
469
1
    connection->startSecureTransport();
470
1
  }
471

            
472
140
  return connection;
473
140
}
474

            
475
35
void MultiConnectionBaseImpl::tryAnotherConnection() {
476
35
  ENVOY_LOG(trace, "Trying another connection.");
477
35
  connections_.push_back(createNextConnection());
478
35
  connections_.back()->connect();
479
35
  maybeScheduleNextAttempt();
480
35
}
481

            
482
120
void MultiConnectionBaseImpl::maybeScheduleNextAttempt() {
483
120
  if (!connection_provider_->hasNextConnection()) {
484
11
    return;
485
11
  }
486
109
  ENVOY_LOG(trace, "Scheduling next attempt.");
487
109
  next_attempt_timer_->enableTimer(std::chrono::milliseconds(300));
488
109
}
489

            
490
73
void MultiConnectionBaseImpl::onEvent(ConnectionEvent event, ConnectionCallbacksWrapper* wrapper) {
491
73
  switch (event) {
492
57
  case ConnectionEvent::Connected: {
493
57
    ENVOY_CONN_LOG_EVENT(debug, "multi_connection_cx_ok", "connection={}", *this,
494
57
                         connection_provider_->nextConnection());
495
57
    break;
496
  }
497
  case ConnectionEvent::LocalClose:
498
16
  case ConnectionEvent::RemoteClose: {
499
16
    ENVOY_CONN_LOG_EVENT(debug, "multi_connection_cx_attempt_failed", "connection={}", *this,
500
16
                         connection_provider_->nextConnection());
501
    // This connection attempt has failed. If possible, start another connection attempt
502
    // immediately, instead of waiting for the timer.
503
16
    if (connection_provider_->hasNextConnection()) {
504
7
      ENVOY_LOG(trace, "Disabling next attempt timer.");
505
7
      next_attempt_timer_->disableTimer();
506
7
      tryAnotherConnection();
507
7
    }
508
    // If there is at least one more attempt running then the current attempt can be destroyed.
509
16
    if (connections_.size() > 1) {
510
      // Nuke this connection and associated callbacks and let a subsequent attempt proceed.
511
10
      cleanupWrapperAndConnection(wrapper);
512
10
      return;
513
10
    }
514
6
    ASSERT(connections_.size() == 1);
515
    // This connection attempt failed but there are no more attempts to be made, so pass
516
    // the failure up by setting up this connection as the final one.
517
6
    ENVOY_CONN_LOG_EVENT(debug, "multi_connection_cx_failed", "connections={}", *this,
518
6
                         connection_provider_->totalConnections());
519
6
    break;
520
16
  }
521
  case ConnectionEvent::ConnectedZeroRtt: {
522
    IS_ENVOY_BUG("Unexpected 0-RTT event received on TCP connection.");
523
    return;
524
16
  }
525
73
  }
526

            
527
  // Close all other connections and configure the final connection.
528
63
  setUpFinalConnection(event, wrapper);
529
63
}
530

            
531
void MultiConnectionBaseImpl::setUpFinalConnection(ConnectionEvent event,
532
63
                                                   ConnectionCallbacksWrapper* wrapper) {
533
63
  ASSERT(event != ConnectionEvent::ConnectedZeroRtt);
534
63
  connect_finished_ = true;
535
63
  ENVOY_LOG(trace, "Disabling next attempt timer due to final connection.");
536
63
  next_attempt_timer_->disableTimer();
537
  // Remove the proxied connection callbacks from all connections.
538
82
  for (auto& w : callbacks_wrappers_) {
539
82
    w->connection().removeConnectionCallbacks(*w);
540
82
  }
541

            
542
  // Close and delete any other connections.
543
63
  auto it = connections_.begin();
544
145
  while (it != connections_.end()) {
545
82
    if (it->get() != &(wrapper->connection())) {
546
19
      (*it)->close(ConnectionCloseType::NoFlush);
547
19
      dispatcher_.deferredDelete(std::move(*it));
548
19
      it = connections_.erase(it);
549
63
    } else {
550
63
      ++it;
551
63
    }
552
82
  }
553
63
  ASSERT(connections_.size() == 1);
554
63
  callbacks_wrappers_.clear();
555

            
556
  // Apply post-connect state to the final socket.
557
63
  for (const auto& cb : post_connect_state_.bytes_sent_callbacks_) {
558
1
    connections_[0]->addBytesSentCallback(cb);
559
1
  }
560

            
561
63
  if (event == ConnectionEvent::Connected) {
562
    // Apply post-connect state which is only connections which have succeeded.
563
57
    for (auto& filter : post_connect_state_.filters_) {
564
1
      connections_[0]->addFilter(filter);
565
1
    }
566
57
    for (auto& filter : post_connect_state_.write_filters_) {
567
1
      connections_[0]->addWriteFilter(filter);
568
1
    }
569
57
    for (auto& filter : post_connect_state_.read_filters_) {
570
6
      connections_[0]->addReadFilter(filter);
571
6
    }
572
57
    for (auto& handler : post_connect_state_.access_log_handlers_) {
573
      connections_[0]->addAccessLogHandler(handler);
574
    }
575
57
    if (post_connect_state_.initialize_read_filters_.has_value() &&
576
57
        post_connect_state_.initialize_read_filters_.value()) {
577
      // initialize_read_filters_ is set to true in initializeReadFilters() only when
578
      // there are read filters installed. The underlying connection's initializeReadFilters()
579
      // will always return true when read filters are installed so this should always
580
      // return true.
581
1
      ASSERT(!post_connect_state_.read_filters_.empty());
582
1
      bool initialized = connections_[0]->initializeReadFilters();
583
1
      ASSERT(initialized);
584
1
    }
585
57
    if (post_connect_state_.read_disable_count_.has_value()) {
586
3
      for (int i = 0; i < post_connect_state_.read_disable_count_.value(); ++i) {
587
2
        connections_[0]->readDisable(true);
588
2
      }
589
1
    }
590

            
591
57
    if (post_connect_state_.write_buffer_.has_value()) {
592
      // write_buffer_ and end_stream_ are both set together in write().
593
5
      ASSERT(post_connect_state_.end_stream_.has_value());
594
      // If a buffer limit was set, ensure that it was applied to the connection.
595
5
      if (per_connection_state_.buffer_limits_.has_value()) {
596
3
        ASSERT(connections_[0]->bufferLimit() == per_connection_state_.buffer_limits_.value());
597
3
      }
598
5
      connections_[0]->write(*post_connect_state_.write_buffer_.value(),
599
5
                             post_connect_state_.end_stream_.value());
600
5
    }
601
57
  }
602

            
603
  // Add connection callbacks after moving data from the deferred write buffer so that
604
  // any high watermark notification is swallowed and not conveyed to the callbacks, since
605
  // that was already delivered to the callbacks when the data was written to the buffer.
606
71
  for (auto cb : post_connect_state_.connection_callbacks_) {
607
19
    if (cb) {
608
19
      connections_[0]->addConnectionCallbacks(*cb);
609
19
    }
610
19
  }
611
63
}
612

            
613
10
void MultiConnectionBaseImpl::cleanupWrapperAndConnection(ConnectionCallbacksWrapper* wrapper) {
614
10
  wrapper->connection().removeConnectionCallbacks(*wrapper);
615
32
  for (auto it = connections_.begin(); it != connections_.end();) {
616
22
    if (it->get() == &(wrapper->connection())) {
617
10
      (*it)->close(ConnectionCloseType::NoFlush);
618
10
      dispatcher_.deferredDelete(std::move(*it));
619
10
      it = connections_.erase(it);
620
12
    } else {
621
12
      ++it;
622
12
    }
623
22
  }
624

            
625
32
  for (auto it = callbacks_wrappers_.begin(); it != callbacks_wrappers_.end();) {
626
22
    if (it->get() == wrapper) {
627
10
      it = callbacks_wrappers_.erase(it);
628
12
    } else {
629
12
      ++it;
630
12
    }
631
22
  }
632
10
}
633

            
634
1
void MultiConnectionBaseImpl::onWriteBufferLowWatermark() {
635
  // Only called when moving write data from the deferred write buffer to
636
  // the underlying connection. In this case, the connection callbacks must
637
  // not be notified since this should be transparent to the callbacks.
638
1
}
639

            
640
3
void MultiConnectionBaseImpl::onWriteBufferHighWatermark() {
641
3
  ASSERT(!connect_finished_);
642
3
  for (auto callback : post_connect_state_.connection_callbacks_) {
643
1
    if (callback) {
644
1
      callback->onAboveWriteBufferHighWatermark();
645
1
    }
646
1
  }
647
3
}
648

            
649
} // namespace Network
650
} // namespace Envoy