1
#include "source/common/tcp/async_tcp_client_impl.h"
2

            
3
#include <cstddef>
4
#include <memory>
5
#include <utility>
6

            
7
#include "envoy/event/dispatcher.h"
8
#include "envoy/network/connection.h"
9
#include "envoy/tcp/async_tcp_client.h"
10
#include "envoy/upstream/upstream.h"
11

            
12
#include "source/common/common/assert.h"
13
#include "source/common/stats/timespan_impl.h"
14

            
15
namespace Envoy {
16
namespace Tcp {
17

            
18
AsyncTcpClientImpl::AsyncTcpClientImpl(Event::Dispatcher& dispatcher,
19
                                       Upstream::ThreadLocalCluster& thread_local_cluster,
20
                                       Upstream::LoadBalancerContext* context,
21
                                       bool enable_half_close)
22
41
    : dispatcher_(dispatcher), thread_local_cluster_(thread_local_cluster),
23
41
      cluster_info_(thread_local_cluster_.info()), context_(context),
24
41
      connect_timer_(dispatcher.createTimer([this]() { onConnectTimeout(); })),
25
41
      enable_half_close_(enable_half_close) {}
26

            
27
41
AsyncTcpClientImpl::~AsyncTcpClientImpl() {
28
41
  if (connection_) {
29
5
    connection_->removeConnectionCallbacks(*this);
30
5
  }
31

            
32
41
  closeImpl(Network::ConnectionCloseType::NoFlush);
33
41
}
34

            
35
41
bool AsyncTcpClientImpl::connect() {
36
41
  if (connection_) {
37
3
    return false;
38
3
  }
39

            
40
38
  connection_ = std::move(thread_local_cluster_.tcpConn(context_).connection_);
41
38
  if (!connection_) {
42
1
    return false;
43
1
  }
44

            
45
37
  cluster_info_->trafficStats()->upstream_cx_total_.inc();
46
37
  cluster_info_->trafficStats()->upstream_cx_active_.inc();
47
37
  connection_->enableHalfClose(enable_half_close_);
48
37
  connection_->addConnectionCallbacks(*this);
49
37
  connection_->addReadFilter(std::make_shared<NetworkReadFilter>(*this));
50
37
  conn_connect_ms_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
51
37
      cluster_info_->trafficStats()->upstream_cx_connect_ms_, dispatcher_.timeSource());
52

            
53
37
  if (!connect_timer_) {
54
5
    connect_timer_ = dispatcher_.createTimer([this]() { onConnectTimeout(); });
55
5
  }
56

            
57
37
  connect_timer_->enableTimer(cluster_info_->connectTimeout());
58
37
  connection_->setConnectionStats({cluster_info_->trafficStats()->upstream_cx_rx_bytes_total_,
59
37
                                   cluster_info_->trafficStats()->upstream_cx_rx_bytes_buffered_,
60
37
                                   cluster_info_->trafficStats()->upstream_cx_tx_bytes_total_,
61
37
                                   cluster_info_->trafficStats()->upstream_cx_tx_bytes_buffered_,
62
37
                                   &cluster_info_->trafficStats()->bind_errors_, nullptr});
63
37
  connection_->noDelay(true);
64
37
  connection_->connect();
65
37
  return true;
66
38
}
67

            
68
1
void AsyncTcpClientImpl::onConnectTimeout() {
69
1
  if (connection_) {
70
1
    ENVOY_CONN_LOG(debug, "async tcp connection timed out", *connection_);
71
1
  } else {
72
    ENVOY_LOG(debug, "async tcp client timed out before creating a connection");
73
  }
74

            
75
1
  cluster_info_->trafficStats()->upstream_cx_connect_timeout_.inc();
76
1
  close(Network::ConnectionCloseType::NoFlush);
77
1
}
78

            
79
59
void AsyncTcpClientImpl::closeImpl(Network::ConnectionCloseType type) {
80
59
  if (connection_ && !closing_) {
81
23
    closing_ = true;
82
23
    connection_->close(type);
83
23
  }
84
59
}
85

            
86
42
void AsyncTcpClientImpl::setAsyncTcpClientCallbacks(AsyncTcpClientCallbacks& callbacks) {
87
42
  callbacks_ = &callbacks;
88
42
}
89

            
90
21
void AsyncTcpClientImpl::write(Buffer::Instance& data, bool end_stream) {
91
21
  ASSERT(connection_ != nullptr);
92
21
  connection_->write(data, end_stream);
93
21
}
94

            
95
9
void AsyncTcpClientImpl::onData(Buffer::Instance& data, bool end_stream) {
96
9
  if (callbacks_) {
97
9
    callbacks_->onData(data, end_stream);
98
9
  }
99
9
}
100

            
101
64
void AsyncTcpClientImpl::disableConnectTimeout() {
102
64
  if (connect_timer_) {
103
35
    connect_timer_->disableTimer();
104
35
    connect_timer_.reset();
105
35
  }
106
64
}
107

            
108
32
void AsyncTcpClientImpl::reportConnectionDestroy(Network::ConnectionEvent event) {
109
32
  auto& stats = cluster_info_->trafficStats();
110
32
  stats->upstream_cx_destroy_.inc();
111
32
  if (event == Network::ConnectionEvent::RemoteClose) {
112
11
    stats->upstream_cx_destroy_remote_.inc();
113
29
  } else {
114
21
    stats->upstream_cx_destroy_local_.inc();
115
21
  }
116
32
}
117

            
118
64
void AsyncTcpClientImpl::onEvent(Network::ConnectionEvent event) {
119
64
  if (event == Network::ConnectionEvent::RemoteClose ||
120
64
      event == Network::ConnectionEvent::LocalClose) {
121
32
    cluster_info_->trafficStats()->upstream_cx_active_.dec();
122
32
    if (!connected_) {
123
3
      cluster_info_->trafficStats()->upstream_cx_connect_fail_.inc();
124
3
    }
125

            
126
32
    if (connected_ && conn_length_ms_ != nullptr) {
127
29
      conn_length_ms_->complete();
128
29
      conn_length_ms_.reset();
129
29
    }
130

            
131
32
    disableConnectTimeout();
132
32
    reportConnectionDestroy(event);
133

            
134
32
    connected_ = false;
135
32
    if (connection_) {
136
32
      detected_close_ = connection_->detectedCloseType();
137
32
    }
138

            
139
32
    closing_ = false;
140
32
    dispatcher_.deferredDelete(std::move(connection_));
141
32
    if (callbacks_) {
142
32
      callbacks_->onEvent(event);
143
32
    }
144
32
  } else {
145
32
    connected_ = true;
146
32
    conn_connect_ms_->complete();
147
32
    conn_connect_ms_.reset();
148
32
    disableConnectTimeout();
149

            
150
32
    if (!conn_length_ms_) {
151
32
      conn_length_ms_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
152
32
          cluster_info_->trafficStats()->upstream_cx_length_ms_, dispatcher_.timeSource());
153
32
    }
154
32
    if (callbacks_) {
155
32
      callbacks_->onEvent(event);
156
32
    }
157
32
  }
158
64
}
159

            
160
} // namespace Tcp
161
} // namespace Envoy