1
#pragma once
2

            
3
#include <chrono>
4
#include <cstddef>
5
#include <string>
6

            
7
#include "envoy/common/optref.h"
8
#include "envoy/event/dispatcher.h"
9
#include "envoy/network/connection.h"
10
#include "envoy/network/filter.h"
11
#include "envoy/stats/timespan.h"
12
#include "envoy/stream_info/stream_info.h"
13
#include "envoy/tcp/async_tcp_client.h"
14
#include "envoy/upstream/thread_local_cluster.h"
15

            
16
#include "source/common/network/filter_impl.h"
17

            
18
#include "absl/strings/string_view.h"
19
#include "absl/types/optional.h"
20

            
21
namespace Envoy {
22
namespace Tcp {
23

            
24
class AsyncTcpClientImpl : public AsyncTcpClient,
25
                           public Network::ConnectionCallbacks,
26
                           public Logger::Loggable<Logger::Id::client> {
27
public:
28
  AsyncTcpClientImpl(Event::Dispatcher& dispatcher,
29
                     Upstream::ThreadLocalCluster& thread_local_cluster,
30
                     Upstream::LoadBalancerContext* context, bool enable_half_close);
31
  ~AsyncTcpClientImpl() override;
32

            
33
18
  void close(Network::ConnectionCloseType type) override { closeImpl(type); }
34

            
35
5
  StreamInfo::DetectedCloseType detectedCloseType() const override { return detected_close_; }
36

            
37
  /**
38
   * @return true means a host is successfully picked from a Cluster.
39
   * This doesn't mean the connection is established.
40
   */
41
  bool connect() override;
42

            
43
  void onConnectTimeout();
44

            
45
  void setAsyncTcpClientCallbacks(AsyncTcpClientCallbacks& callbacks) override;
46

            
47
  void write(Buffer::Instance& data, bool end_stream) override;
48

            
49
2
  void readDisable(bool disable) override {
50
2
    if (connection_) {
51
2
      connection_->readDisable(disable);
52
2
    }
53
2
  };
54

            
55
  /**
56
   * @return if the client connects to a peer host.
57
   */
58
41
  bool connected() override { return connected_; }
59

            
60
1
  Event::Dispatcher& dispatcher() override { return dispatcher_; }
61

            
62
4
  OptRef<StreamInfo::StreamInfo> getStreamInfo() override {
63
4
    if (connection_) {
64
3
      return connection_->streamInfo();
65
3
    } else {
66
1
      return absl::nullopt;
67
1
    }
68
4
  }
69

            
70
private:
71
  // This implements the AsyncTcpClient::close but exists as non-virtual to avoid calling it in the
72
  // destructor.
73
  void closeImpl(Network::ConnectionCloseType type);
74

            
75
  struct NetworkReadFilter : public Network::ReadFilterBaseImpl {
76
37
    NetworkReadFilter(AsyncTcpClientImpl& parent) : parent_(parent) {}
77

            
78
    // Network::ReadFilter
79
9
    Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override {
80
9
      parent_.onData(data, end_stream);
81
9
      return Network::FilterStatus::StopIteration;
82
9
    }
83

            
84
    AsyncTcpClientImpl& parent_;
85
  };
86

            
87
  void onData(Buffer::Instance& data, bool end_stream);
88

            
89
  void onEvent(Network::ConnectionEvent event) override;
90
1
  void onAboveWriteBufferHighWatermark() override {
91
1
    if (callbacks_) {
92
1
      callbacks_->onAboveWriteBufferHighWatermark();
93
1
    }
94
1
  }
95
1
  void onBelowWriteBufferLowWatermark() override {
96
1
    if (callbacks_) {
97
1
      callbacks_->onBelowWriteBufferLowWatermark();
98
1
    }
99
1
  }
100

            
101
  void disableConnectTimeout();
102
  void reportConnectionDestroy(Network::ConnectionEvent event);
103

            
104
  Event::Dispatcher& dispatcher_;
105
  Network::ClientConnectionPtr connection_;
106
  Upstream::ThreadLocalCluster& thread_local_cluster_;
107
  Upstream::ClusterInfoConstSharedPtr cluster_info_;
108
  Upstream::LoadBalancerContext* context_;
109
  Stats::TimespanPtr conn_connect_ms_;
110
  Stats::TimespanPtr conn_length_ms_;
111
  Event::TimerPtr connect_timer_;
112
  AsyncTcpClientCallbacks* callbacks_{};
113
  StreamInfo::DetectedCloseType detected_close_{StreamInfo::DetectedCloseType::Normal};
114
  bool closing_{false};
115
  bool connected_{false};
116
  bool enable_half_close_{false};
117
};
118

            
119
} // namespace Tcp
120
} // namespace Envoy