Line data Source code
1 : #include "source/common/network/multi_connection_base_impl.h"
2 :
3 : #include <vector>
4 :
5 : namespace Envoy {
6 : namespace Network {
7 :
8 : MultiConnectionBaseImpl::MultiConnectionBaseImpl(Event::Dispatcher& dispatcher,
9 : ConnectionProviderPtr connection_provider)
10 : : id_(ConnectionImpl::next_global_id_++), dispatcher_(dispatcher),
11 : connection_provider_(std::move(connection_provider)),
12 0 : next_attempt_timer_(dispatcher_.createTimer([this]() -> void { tryAnotherConnection(); })) {
13 0 : ENVOY_LOG_EVENT(debug, "multi_connection_new_cx", "[C{}] connections={}", id_,
14 0 : connection_provider_->totalConnections());
15 0 : connections_.push_back(createNextConnection());
16 0 : }
17 :
18 0 : MultiConnectionBaseImpl::~MultiConnectionBaseImpl() = default;
19 :
20 0 : void MultiConnectionBaseImpl::connect() {
21 0 : ENVOY_BUG(!connect_finished_, "connection already connected");
22 0 : connections_[0]->connect();
23 0 : maybeScheduleNextAttempt();
24 0 : }
25 :
26 0 : void MultiConnectionBaseImpl::addWriteFilter(WriteFilterSharedPtr filter) {
27 0 : if (connect_finished_) {
28 0 : connections_[0]->addWriteFilter(filter);
29 0 : return;
30 0 : }
31 : // Filters should only be notified of events on the final connection, so defer adding
32 : // filters until the final connection has been determined.
33 0 : post_connect_state_.write_filters_.push_back(filter);
34 0 : }
35 :
36 0 : void MultiConnectionBaseImpl::addFilter(FilterSharedPtr filter) {
37 0 : if (connect_finished_) {
38 0 : connections_[0]->addFilter(filter);
39 0 : return;
40 0 : }
41 : // Filters should only be notified of events on the final connection, so defer adding
42 : // filters until the final connection has been determined.
43 0 : post_connect_state_.filters_.push_back(filter);
44 0 : }
45 :
46 0 : void MultiConnectionBaseImpl::addReadFilter(ReadFilterSharedPtr filter) {
47 0 : if (connect_finished_) {
48 0 : connections_[0]->addReadFilter(filter);
49 0 : return;
50 0 : }
51 : // Filters should only be notified of events on the final connection, so defer adding
52 : // filters until the final connection has been determined.
53 0 : post_connect_state_.read_filters_.push_back(filter);
54 0 : }
55 :
56 0 : void MultiConnectionBaseImpl::removeReadFilter(ReadFilterSharedPtr filter) {
57 0 : if (connect_finished_) {
58 0 : connections_[0]->removeReadFilter(filter);
59 0 : return;
60 0 : }
61 : // Filters should only be notified of events on the final connection, so remove
62 : // the filters from the list of deferred filters.
63 0 : auto i = post_connect_state_.read_filters_.begin();
64 0 : while (i != post_connect_state_.read_filters_.end()) {
65 0 : if (*i == filter) {
66 0 : post_connect_state_.read_filters_.erase(i);
67 0 : return;
68 0 : }
69 0 : }
70 0 : IS_ENVOY_BUG("Failed to remove read filter");
71 0 : }
72 :
73 0 : bool MultiConnectionBaseImpl::initializeReadFilters() {
74 0 : if (connect_finished_) {
75 0 : return connections_[0]->initializeReadFilters();
76 0 : }
77 : // Filters should only be notified of events on the final connection, so defer
78 : // initialization of the filters until the final connection has been determined.
79 0 : if (post_connect_state_.read_filters_.empty()) {
80 0 : return false;
81 0 : }
82 0 : post_connect_state_.initialize_read_filters_ = true;
83 0 : return true;
84 0 : }
85 :
86 0 : void MultiConnectionBaseImpl::addBytesSentCallback(Connection::BytesSentCb cb) {
87 0 : if (connect_finished_) {
88 0 : connections_[0]->addBytesSentCallback(cb);
89 0 : return;
90 0 : }
91 : // Callbacks should only be notified of events on the final connection, so defer adding
92 : // callbacks until the final connection has been determined.
93 0 : post_connect_state_.bytes_sent_callbacks_.push_back(cb);
94 0 : }
95 :
96 0 : void MultiConnectionBaseImpl::enableHalfClose(bool enabled) {
97 0 : if (!connect_finished_) {
98 0 : per_connection_state_.enable_half_close_ = enabled;
99 0 : }
100 0 : for (auto& connection : connections_) {
101 0 : connection->enableHalfClose(enabled);
102 0 : }
103 0 : }
104 :
105 0 : bool MultiConnectionBaseImpl::isHalfCloseEnabled() const {
106 0 : return connections_[0]->isHalfCloseEnabled();
107 0 : }
108 :
109 0 : std::string MultiConnectionBaseImpl::nextProtocol() const {
110 0 : return connections_[0]->nextProtocol();
111 0 : }
112 :
113 0 : void MultiConnectionBaseImpl::noDelay(bool enable) {
114 0 : if (!connect_finished_) {
115 0 : per_connection_state_.no_delay_ = enable;
116 0 : }
117 0 : for (auto& connection : connections_) {
118 0 : connection->noDelay(enable);
119 0 : }
120 0 : }
121 :
122 0 : Connection::ReadDisableStatus MultiConnectionBaseImpl::readDisable(bool disable) {
123 0 : if (connect_finished_) {
124 0 : return connections_[0]->readDisable(disable);
125 0 : }
126 :
127 0 : if (!post_connect_state_.read_disable_count_.has_value()) {
128 0 : post_connect_state_.read_disable_count_ = 0;
129 0 : }
130 :
131 0 : auto read_disable_state = ReadDisableStatus::StillReadDisabled;
132 :
133 0 : if (disable) {
134 0 : if (post_connect_state_.read_disable_count_ == 0) {
135 0 : read_disable_state = ReadDisableStatus::TransitionedToReadDisabled;
136 0 : }
137 :
138 0 : post_connect_state_.read_disable_count_.value()++;
139 0 : } else {
140 0 : ASSERT(post_connect_state_.read_disable_count_ != 0);
141 0 : post_connect_state_.read_disable_count_.value()--;
142 :
143 0 : if (post_connect_state_.read_disable_count_ == 0) {
144 0 : read_disable_state = ReadDisableStatus::TransitionedToReadEnabled;
145 0 : }
146 0 : }
147 :
148 0 : return read_disable_state;
149 0 : }
150 :
151 0 : void MultiConnectionBaseImpl::detectEarlyCloseWhenReadDisabled(bool value) {
152 0 : if (!connect_finished_) {
153 0 : per_connection_state_.detect_early_close_when_read_disabled_ = value;
154 0 : }
155 0 : for (auto& connection : connections_) {
156 0 : connection->detectEarlyCloseWhenReadDisabled(value);
157 0 : }
158 0 : }
159 :
160 0 : bool MultiConnectionBaseImpl::readEnabled() const {
161 0 : if (!connect_finished_) {
162 0 : return !post_connect_state_.read_disable_count_.has_value() ||
163 0 : post_connect_state_.read_disable_count_ == 0;
164 0 : }
165 0 : return connections_[0]->readEnabled();
166 0 : }
167 :
168 0 : ConnectionInfoSetter& MultiConnectionBaseImpl::connectionInfoSetter() {
169 0 : return connections_[0]->connectionInfoSetter();
170 0 : }
171 :
172 0 : const ConnectionInfoProvider& MultiConnectionBaseImpl::connectionInfoProvider() const {
173 0 : return connections_[0]->connectionInfoProvider();
174 0 : }
175 :
176 0 : ConnectionInfoProviderSharedPtr MultiConnectionBaseImpl::connectionInfoProviderSharedPtr() const {
177 0 : return connections_[0]->connectionInfoProviderSharedPtr();
178 0 : }
179 :
180 : absl::optional<Connection::UnixDomainSocketPeerCredentials>
181 0 : MultiConnectionBaseImpl::unixSocketPeerCredentials() const {
182 0 : return connections_[0]->unixSocketPeerCredentials();
183 0 : }
184 :
185 0 : Ssl::ConnectionInfoConstSharedPtr MultiConnectionBaseImpl::ssl() const {
186 0 : return connections_[0]->ssl();
187 0 : }
188 :
189 0 : Connection::State MultiConnectionBaseImpl::state() const {
190 0 : if (!connect_finished_) {
191 0 : ASSERT(connections_[0]->state() == Connection::State::Open);
192 0 : }
193 0 : return connections_[0]->state();
194 0 : }
195 :
196 0 : bool MultiConnectionBaseImpl::connecting() const {
197 0 : ASSERT(connect_finished_ || connections_[0]->connecting());
198 0 : return connections_[0]->connecting();
199 0 : }
200 :
201 0 : void MultiConnectionBaseImpl::write(Buffer::Instance& data, bool end_stream) {
202 0 : if (connect_finished_) {
203 0 : connections_[0]->write(data, end_stream);
204 0 : return;
205 0 : }
206 :
207 : // Data should only be written on the final connection, so defer actually writing
208 : // until the final connection has been determined.
209 0 : if (!post_connect_state_.write_buffer_.has_value()) {
210 0 : post_connect_state_.end_stream_ = false;
211 0 : post_connect_state_.write_buffer_ = dispatcher_.getWatermarkFactory().createBuffer(
212 0 : [this]() -> void { this->onWriteBufferLowWatermark(); },
213 0 : [this]() -> void { this->onWriteBufferHighWatermark(); },
214 : // ConnectionCallbacks do not have a method to receive overflow watermark
215 : // notification. So this class, like ConnectionImpl, has a no-op handler.
216 0 : []() -> void { /* TODO(adisuissa): Handle overflow watermark */ });
217 0 : if (per_connection_state_.buffer_limits_.has_value()) {
218 0 : post_connect_state_.write_buffer_.value()->setWatermarks(
219 0 : per_connection_state_.buffer_limits_.value());
220 0 : }
221 0 : }
222 :
223 0 : post_connect_state_.write_buffer_.value()->move(data);
224 0 : ASSERT(!post_connect_state_.end_stream_.value()); // Don't write after end_stream.
225 0 : post_connect_state_.end_stream_ = end_stream;
226 0 : }
227 :
228 0 : void MultiConnectionBaseImpl::setBufferLimits(uint32_t limit) {
229 0 : if (!connect_finished_) {
230 0 : ASSERT(!per_connection_state_.buffer_limits_.has_value());
231 0 : per_connection_state_.buffer_limits_ = limit;
232 0 : if (post_connect_state_.write_buffer_.has_value()) {
233 0 : post_connect_state_.write_buffer_.value()->setWatermarks(limit);
234 0 : }
235 0 : }
236 0 : for (auto& connection : connections_) {
237 0 : connection->setBufferLimits(limit);
238 0 : }
239 0 : }
240 :
241 0 : uint32_t MultiConnectionBaseImpl::bufferLimit() const { return connections_[0]->bufferLimit(); }
242 :
243 0 : bool MultiConnectionBaseImpl::aboveHighWatermark() const {
244 0 : if (!connect_finished_) {
245 : // Writes are deferred, so return the watermark status from the deferred write buffer.
246 0 : return post_connect_state_.write_buffer_.has_value() &&
247 0 : post_connect_state_.write_buffer_.value()->highWatermarkTriggered();
248 0 : }
249 :
250 0 : return connections_[0]->aboveHighWatermark();
251 0 : }
252 :
253 0 : const ConnectionSocket::OptionsSharedPtr& MultiConnectionBaseImpl::socketOptions() const {
254 : // Note, this might change before connect finishes.
255 0 : return connections_[0]->socketOptions();
256 0 : }
257 :
258 0 : absl::string_view MultiConnectionBaseImpl::requestedServerName() const {
259 : // Note, this might change before connect finishes.
260 0 : return connections_[0]->requestedServerName();
261 0 : }
262 :
263 0 : StreamInfo::StreamInfo& MultiConnectionBaseImpl::streamInfo() {
264 : // Note, this might change before connect finishes.
265 0 : return connections_[0]->streamInfo();
266 0 : }
267 :
268 0 : const StreamInfo::StreamInfo& MultiConnectionBaseImpl::streamInfo() const {
269 : // Note, this might change before connect finishes.
270 0 : return connections_[0]->streamInfo();
271 0 : }
272 :
273 0 : absl::string_view MultiConnectionBaseImpl::transportFailureReason() const {
274 : // Note, this might change before connect finishes.
275 0 : return connections_[0]->transportFailureReason();
276 0 : }
277 :
278 0 : absl::string_view MultiConnectionBaseImpl::localCloseReason() const {
279 : // Note, this might change before connect finishes.
280 0 : return connections_[0]->localCloseReason();
281 0 : }
282 :
283 0 : bool MultiConnectionBaseImpl::startSecureTransport() {
284 0 : if (!connect_finished_) {
285 0 : per_connection_state_.start_secure_transport_ = true;
286 0 : }
287 0 : bool ret = true;
288 0 : for (auto& connection : connections_) {
289 0 : if (!connection->startSecureTransport()) {
290 0 : ret = false;
291 0 : }
292 0 : }
293 0 : return ret;
294 0 : }
295 :
296 0 : absl::optional<std::chrono::milliseconds> MultiConnectionBaseImpl::lastRoundTripTime() const {
297 : // Note, this might change before connect finishes.
298 0 : return connections_[0]->lastRoundTripTime();
299 0 : }
300 :
301 0 : absl::optional<uint64_t> MultiConnectionBaseImpl::congestionWindowInBytes() const {
302 : // Note, this value changes constantly even within the same connection.
303 0 : return connections_[0]->congestionWindowInBytes();
304 0 : }
305 :
306 0 : void MultiConnectionBaseImpl::addConnectionCallbacks(ConnectionCallbacks& cb) {
307 0 : if (connect_finished_) {
308 0 : connections_[0]->addConnectionCallbacks(cb);
309 0 : return;
310 0 : }
311 : // Callbacks should only be notified of events on the final connection, so defer adding
312 : // callbacks until the final connection has been determined.
313 0 : post_connect_state_.connection_callbacks_.push_back(&cb);
314 0 : }
315 :
316 0 : void MultiConnectionBaseImpl::removeConnectionCallbacks(ConnectionCallbacks& cb) {
317 0 : if (connect_finished_) {
318 0 : connections_[0]->removeConnectionCallbacks(cb);
319 0 : return;
320 0 : }
321 : // Callbacks should only be notified of events on the final connection, so remove
322 : // the callback from the list of deferred callbacks.
323 0 : auto i = post_connect_state_.connection_callbacks_.begin();
324 0 : while (i != post_connect_state_.connection_callbacks_.end()) {
325 0 : if (*i == &cb) {
326 0 : post_connect_state_.connection_callbacks_.erase(i);
327 0 : return;
328 0 : }
329 0 : }
330 0 : IS_ENVOY_BUG("Failed to remove connection callbacks");
331 0 : }
332 :
333 0 : void MultiConnectionBaseImpl::close(ConnectionCloseType type, absl::string_view details) {
334 0 : if (connect_finished_) {
335 0 : connections_[0]->close(type, details);
336 0 : return;
337 0 : }
338 :
339 0 : connect_finished_ = true;
340 0 : ENVOY_LOG(trace, "Disabling next attempt timer.");
341 0 : next_attempt_timer_->disableTimer();
342 0 : for (size_t i = 0; i < connections_.size(); ++i) {
343 0 : connections_[i]->removeConnectionCallbacks(*callbacks_wrappers_[i]);
344 0 : if (i != 0) {
345 : // Wait to close the final connection until the post-connection callbacks
346 : // have been added.
347 0 : connections_[i]->close(ConnectionCloseType::NoFlush, details);
348 0 : }
349 0 : }
350 0 : connections_.resize(1);
351 0 : callbacks_wrappers_.clear();
352 :
353 0 : for (auto cb : post_connect_state_.connection_callbacks_) {
354 0 : if (cb) {
355 0 : connections_[0]->addConnectionCallbacks(*cb);
356 0 : }
357 0 : }
358 0 : connections_[0]->close(type, details);
359 0 : }
360 :
361 0 : DetectedCloseType MultiConnectionBaseImpl::detectedCloseType() const {
362 0 : return connections_[0]->detectedCloseType();
363 0 : };
364 :
365 0 : Event::Dispatcher& MultiConnectionBaseImpl::dispatcher() const {
366 0 : ASSERT(&dispatcher_ == &connections_[0]->dispatcher());
367 0 : return connections_[0]->dispatcher();
368 0 : }
369 :
370 0 : uint64_t MultiConnectionBaseImpl::id() const { return id_; }
371 :
372 0 : void MultiConnectionBaseImpl::hashKey(std::vector<uint8_t>& hash_key) const {
373 : // Pack the id into sizeof(id_) uint8_t entries in the hash_key vector.
374 0 : hash_key.reserve(hash_key.size() + sizeof(id_));
375 0 : for (unsigned i = 0; i < sizeof(id_); ++i) {
376 0 : hash_key.push_back(0xFF & (id_ >> (8 * i)));
377 0 : }
378 0 : }
379 :
380 0 : void MultiConnectionBaseImpl::setConnectionStats(const ConnectionStats& stats) {
381 0 : if (!connect_finished_) {
382 0 : per_connection_state_.connection_stats_ = std::make_unique<ConnectionStats>(stats);
383 0 : }
384 0 : for (auto& connection : connections_) {
385 0 : connection->setConnectionStats(stats);
386 0 : }
387 0 : }
388 :
389 0 : void MultiConnectionBaseImpl::setDelayedCloseTimeout(std::chrono::milliseconds timeout) {
390 0 : if (!connect_finished_) {
391 0 : per_connection_state_.delayed_close_timeout_ = timeout;
392 0 : }
393 0 : for (auto& connection : connections_) {
394 0 : connection->setDelayedCloseTimeout(timeout);
395 0 : }
396 0 : }
397 :
398 0 : void MultiConnectionBaseImpl::dumpState(std::ostream& os, int indent_level) const {
399 0 : const char* spaces = spacesForLevel(indent_level);
400 0 : os << spaces << "MultiConnectionBaseImpl " << this << DUMP_MEMBER(id_)
401 0 : << DUMP_MEMBER(connect_finished_) << "\n";
402 :
403 0 : for (auto& connection : connections_) {
404 0 : DUMP_DETAILS(connection);
405 0 : }
406 0 : }
407 :
408 0 : ClientConnectionPtr MultiConnectionBaseImpl::createNextConnection() {
409 0 : ASSERT(connection_provider_->hasNextConnection());
410 0 : auto connection = connection_provider_->createNextConnection(id_);
411 0 : callbacks_wrappers_.push_back(std::make_unique<ConnectionCallbacksWrapper>(*this, *connection));
412 0 : connection->addConnectionCallbacks(*callbacks_wrappers_.back());
413 :
414 0 : if (per_connection_state_.detect_early_close_when_read_disabled_.has_value()) {
415 0 : connection->detectEarlyCloseWhenReadDisabled(
416 0 : per_connection_state_.detect_early_close_when_read_disabled_.value());
417 0 : }
418 0 : if (per_connection_state_.no_delay_.has_value()) {
419 0 : connection->noDelay(per_connection_state_.no_delay_.value());
420 0 : }
421 0 : if (per_connection_state_.connection_stats_) {
422 0 : connection->setConnectionStats(*per_connection_state_.connection_stats_);
423 0 : }
424 0 : if (per_connection_state_.buffer_limits_.has_value()) {
425 0 : connection->setBufferLimits(per_connection_state_.buffer_limits_.value());
426 0 : }
427 0 : if (per_connection_state_.enable_half_close_.has_value()) {
428 0 : connection->enableHalfClose(per_connection_state_.enable_half_close_.value());
429 0 : }
430 0 : if (per_connection_state_.delayed_close_timeout_.has_value()) {
431 0 : connection->setDelayedCloseTimeout(per_connection_state_.delayed_close_timeout_.value());
432 0 : }
433 0 : if (per_connection_state_.start_secure_transport_.has_value()) {
434 0 : ASSERT(per_connection_state_.start_secure_transport_);
435 0 : connection->startSecureTransport();
436 0 : }
437 :
438 0 : return connection;
439 0 : }
440 :
441 0 : void MultiConnectionBaseImpl::tryAnotherConnection() {
442 0 : ENVOY_LOG(trace, "Trying another connection.");
443 0 : connections_.push_back(createNextConnection());
444 0 : connections_.back()->connect();
445 0 : maybeScheduleNextAttempt();
446 0 : }
447 :
448 0 : void MultiConnectionBaseImpl::maybeScheduleNextAttempt() {
449 0 : if (!connection_provider_->hasNextConnection()) {
450 0 : return;
451 0 : }
452 0 : ENVOY_LOG(trace, "Scheduling next attempt.");
453 0 : next_attempt_timer_->enableTimer(std::chrono::milliseconds(300));
454 0 : }
455 :
456 0 : void MultiConnectionBaseImpl::onEvent(ConnectionEvent event, ConnectionCallbacksWrapper* wrapper) {
457 0 : switch (event) {
458 0 : case ConnectionEvent::Connected: {
459 0 : ENVOY_CONN_LOG_EVENT(debug, "multi_connection_cx_ok", "connection={}", *this,
460 0 : connection_provider_->nextConnection());
461 0 : break;
462 0 : }
463 0 : case ConnectionEvent::LocalClose:
464 0 : case ConnectionEvent::RemoteClose: {
465 0 : ENVOY_CONN_LOG_EVENT(debug, "multi_connection_cx_attempt_failed", "connection={}", *this,
466 0 : connection_provider_->nextConnection());
467 : // This connection attempt has failed. If possible, start another connection attempt
468 : // immediately, instead of waiting for the timer.
469 0 : if (connection_provider_->hasNextConnection()) {
470 0 : ENVOY_LOG(trace, "Disabling next attempt timer.");
471 0 : next_attempt_timer_->disableTimer();
472 0 : tryAnotherConnection();
473 0 : }
474 : // If there is at least one more attempt running then the current attempt can be destroyed.
475 0 : if (connections_.size() > 1) {
476 : // Nuke this connection and associated callbacks and let a subsequent attempt proceed.
477 0 : cleanupWrapperAndConnection(wrapper);
478 0 : return;
479 0 : }
480 0 : ASSERT(connections_.size() == 1);
481 : // This connection attempt failed but there are no more attempts to be made, so pass
482 : // the failure up by setting up this connection as the final one.
483 0 : ENVOY_CONN_LOG_EVENT(debug, "multi_connection_cx_failed", "connections={}", *this,
484 0 : connection_provider_->totalConnections());
485 0 : break;
486 0 : }
487 0 : case ConnectionEvent::ConnectedZeroRtt: {
488 0 : IS_ENVOY_BUG("Unexpected 0-RTT event received on TCP connection.");
489 0 : return;
490 0 : }
491 0 : }
492 :
493 : // Close all other connections and configure the final connection.
494 0 : setUpFinalConnection(event, wrapper);
495 0 : }
496 :
497 : void MultiConnectionBaseImpl::setUpFinalConnection(ConnectionEvent event,
498 0 : ConnectionCallbacksWrapper* wrapper) {
499 0 : ASSERT(event != ConnectionEvent::ConnectedZeroRtt);
500 0 : connect_finished_ = true;
501 0 : ENVOY_LOG(trace, "Disabling next attempt timer due to final connection.");
502 0 : next_attempt_timer_->disableTimer();
503 : // Remove the proxied connection callbacks from all connections.
504 0 : for (auto& w : callbacks_wrappers_) {
505 0 : w->connection().removeConnectionCallbacks(*w);
506 0 : }
507 :
508 : // Close and delete any other connections.
509 0 : auto it = connections_.begin();
510 0 : while (it != connections_.end()) {
511 0 : if (it->get() != &(wrapper->connection())) {
512 0 : (*it)->close(ConnectionCloseType::NoFlush);
513 0 : dispatcher_.deferredDelete(std::move(*it));
514 0 : it = connections_.erase(it);
515 0 : } else {
516 0 : ++it;
517 0 : }
518 0 : }
519 0 : ASSERT(connections_.size() == 1);
520 0 : callbacks_wrappers_.clear();
521 :
522 : // Apply post-connect state to the final socket.
523 0 : for (const auto& cb : post_connect_state_.bytes_sent_callbacks_) {
524 0 : connections_[0]->addBytesSentCallback(cb);
525 0 : }
526 :
527 0 : if (event == ConnectionEvent::Connected) {
528 : // Apply post-connect state which is only connections which have succeeded.
529 0 : for (auto& filter : post_connect_state_.filters_) {
530 0 : connections_[0]->addFilter(filter);
531 0 : }
532 0 : for (auto& filter : post_connect_state_.write_filters_) {
533 0 : connections_[0]->addWriteFilter(filter);
534 0 : }
535 0 : for (auto& filter : post_connect_state_.read_filters_) {
536 0 : connections_[0]->addReadFilter(filter);
537 0 : }
538 0 : if (post_connect_state_.initialize_read_filters_.has_value() &&
539 0 : post_connect_state_.initialize_read_filters_.value()) {
540 : // initialize_read_filters_ is set to true in initializeReadFilters() only when
541 : // there are read filters installed. The underlying connection's initializeReadFilters()
542 : // will always return true when read filters are installed so this should always
543 : // return true.
544 0 : ASSERT(!post_connect_state_.read_filters_.empty());
545 0 : bool initialized = connections_[0]->initializeReadFilters();
546 0 : ASSERT(initialized);
547 0 : }
548 0 : if (post_connect_state_.read_disable_count_.has_value()) {
549 0 : for (int i = 0; i < post_connect_state_.read_disable_count_.value(); ++i) {
550 0 : connections_[0]->readDisable(true);
551 0 : }
552 0 : }
553 :
554 0 : if (post_connect_state_.write_buffer_.has_value()) {
555 : // write_buffer_ and end_stream_ are both set together in write().
556 0 : ASSERT(post_connect_state_.end_stream_.has_value());
557 : // If a buffer limit was set, ensure that it was applied to the connection.
558 0 : if (per_connection_state_.buffer_limits_.has_value()) {
559 0 : ASSERT(connections_[0]->bufferLimit() == per_connection_state_.buffer_limits_.value());
560 0 : }
561 0 : connections_[0]->write(*post_connect_state_.write_buffer_.value(),
562 0 : post_connect_state_.end_stream_.value());
563 0 : }
564 0 : }
565 :
566 : // Add connection callbacks after moving data from the deferred write buffer so that
567 : // any high watermark notification is swallowed and not conveyed to the callbacks, since
568 : // that was already delivered to the callbacks when the data was written to the buffer.
569 0 : for (auto cb : post_connect_state_.connection_callbacks_) {
570 0 : if (cb) {
571 0 : connections_[0]->addConnectionCallbacks(*cb);
572 0 : }
573 0 : }
574 0 : }
575 :
576 0 : void MultiConnectionBaseImpl::cleanupWrapperAndConnection(ConnectionCallbacksWrapper* wrapper) {
577 0 : wrapper->connection().removeConnectionCallbacks(*wrapper);
578 0 : for (auto it = connections_.begin(); it != connections_.end();) {
579 0 : if (it->get() == &(wrapper->connection())) {
580 0 : (*it)->close(ConnectionCloseType::NoFlush);
581 0 : dispatcher_.deferredDelete(std::move(*it));
582 0 : it = connections_.erase(it);
583 0 : } else {
584 0 : ++it;
585 0 : }
586 0 : }
587 :
588 0 : for (auto it = callbacks_wrappers_.begin(); it != callbacks_wrappers_.end();) {
589 0 : if (it->get() == wrapper) {
590 0 : it = callbacks_wrappers_.erase(it);
591 0 : } else {
592 0 : ++it;
593 0 : }
594 0 : }
595 0 : }
596 :
597 0 : void MultiConnectionBaseImpl::onWriteBufferLowWatermark() {
598 : // Only called when moving write data from the deferred write buffer to
599 : // the underlying connection. In this case, the connection callbacks must
600 : // not be notified since this should be transparent to the callbacks.
601 0 : }
602 :
603 0 : void MultiConnectionBaseImpl::onWriteBufferHighWatermark() {
604 0 : ASSERT(!connect_finished_);
605 0 : for (auto callback : post_connect_state_.connection_callbacks_) {
606 0 : if (callback) {
607 0 : callback->onAboveWriteBufferHighWatermark();
608 0 : }
609 0 : }
610 0 : }
611 :
612 : } // namespace Network
613 : } // namespace Envoy
|