1
#include "source/extensions/health_checkers/thrift/client_impl.h"
2

            
3
#include "source/extensions/filters/network/thrift_proxy/thrift.h"
4

            
5
namespace Envoy {
6
namespace Extensions {
7
namespace HealthCheckers {
8
namespace ThriftHealthChecker {
9

            
10
8
bool SimpleResponseDecoder::onData(Buffer::Instance& data) {
11
8
  buffer_.move(data);
12

            
13
8
  bool underflow = false;
14
8
  decoder_->onData(buffer_, underflow);
15
8
  ASSERT(complete_ || underflow);
16

            
17
8
  return complete_;
18
8
}
19

            
20
8
bool SimpleResponseDecoder::responseSuccess() {
21
8
  ENVOY_LOG(trace, "SimpleResponseDecoder responseSuccess complete={} success={}", complete_,
22
8
            success_.has_value() && success_.value());
23
8
  return complete_ && success_.has_value() && success_.value();
24
8
}
25

            
26
8
FilterStatus SimpleResponseDecoder::messageBegin(MessageMetadataSharedPtr metadata) {
27
8
  ENVOY_LOG(trace, "SimpleResponseDecoder messageBegin message_type={} reply_type={}",
28
8
            metadata->hasMessageType() ? MessageTypeNames::get().fromType(metadata->messageType())
29
8
                                       : "-",
30
8
            metadata->hasReplyType() ? ReplyTypeNames::get().fromType(metadata->replyType()) : "-");
31

            
32
8
  if (metadata->hasReplyType()) {
33
7
    success_ = metadata->replyType() == ReplyType::Success;
34
7
  }
35

            
36
8
  if (metadata->hasMessageType() && metadata->messageType() == MessageType::Exception) {
37
1
    success_ = false;
38
1
  }
39
8
  return FilterStatus::Continue;
40
8
}
41

            
42
8
FilterStatus SimpleResponseDecoder::messageEnd() {
43
8
  ENVOY_LOG(trace, "SimpleResponseDecoder messageEnd");
44
8
  complete_ = true;
45
8
  return FilterStatus::Continue;
46
8
}
47

            
48
6
void ThriftSessionCallbacks::onEvent(Network::ConnectionEvent event) { parent_.onEvent(event); }
49

            
50
6
void ThriftSessionCallbacks::onAboveWriteBufferHighWatermark() {
51
6
  parent_.onAboveWriteBufferHighWatermark();
52
6
}
53

            
54
6
void ThriftSessionCallbacks::onBelowWriteBufferLowWatermark() {
55
6
  parent_.onBelowWriteBufferLowWatermark();
56
6
}
57

            
58
8
Network::FilterStatus ThriftSessionCallbacks::onData(Buffer::Instance& data, bool) {
59
8
  parent_.onData(data);
60
8
  return Network::FilterStatus::StopIteration;
61
8
}
62

            
63
6
void ClientImpl::start() {
64
6
  Upstream::Host::CreateConnectionData conn_data = parent_.createConnection();
65
6
  connection_ = std::move(conn_data.connection_);
66
6
  host_description_ = std::move(conn_data.host_description_);
67
6
  session_callbacks_ = std::make_unique<ThriftSessionCallbacks>(*this);
68
6
  connection_->addConnectionCallbacks(*session_callbacks_);
69
6
  connection_->addReadFilter(session_callbacks_);
70

            
71
6
  connection_->connect();
72
6
  connection_->noDelay(true);
73

            
74
6
  ENVOY_CONN_LOG(trace, "ThriftHealthChecker ClientImpl start", *connection_);
75
6
}
76

            
77
8
bool ClientImpl::sendRequest() {
78
8
  ENVOY_CONN_LOG(trace, "ThriftHealthChecker ClientImpl sendRequest", *connection_);
79
8
  ASSERT(connection_->state() == Network::Connection::State::Open);
80

            
81
8
  Buffer::OwnedImpl request_buffer;
82
8
  ProtocolConverterSharedPtr protocol_converter = std::make_shared<ProtocolConverter>();
83
8
  ProtocolPtr protocol = createProtocol();
84
8
  protocol_converter->initProtocolConverter(*protocol, request_buffer);
85

            
86
8
  MessageMetadataSharedPtr metadata = std::make_shared<MessageMetadata>();
87
8
  metadata->setProtocol(protocol_);
88
8
  metadata->setMethodName(method_name_);
89
8
  metadata->setMessageType(MessageType::Call);
90
8
  metadata->setSequenceId(sequenceId());
91

            
92
8
  protocol_converter->messageBegin(metadata);
93
8
  protocol_converter->structBegin("");
94
8
  FieldType field_type_stop = FieldType::Stop;
95
8
  int16_t field_id = 0;
96
8
  protocol_converter->fieldBegin("", field_type_stop, field_id);
97
8
  protocol_converter->structEnd();
98
8
  protocol_converter->messageEnd();
99

            
100
8
  TransportPtr transport = createTransport();
101
8
  Buffer::OwnedImpl transport_buffer;
102
8
  transport->encodeFrame(transport_buffer, *metadata, request_buffer);
103

            
104
8
  connection_->write(transport_buffer, false);
105
8
  ENVOY_CONN_LOG(trace, "ThriftHealthChecker ClientImpl sendRequest success id={}", *connection_,
106
8
                 metadata->sequenceId());
107
8
  return true;
108
8
}
109

            
110
6
void ClientImpl::close() {
111
6
  ENVOY_CONN_LOG(trace, "ThriftHealthChecker ClientImpl close", *connection_);
112
6
  connection_->close(Network::ConnectionCloseType::Abort);
113
6
}
114

            
115
8
void ClientImpl::onData(Buffer::Instance& data) {
116
8
  ENVOY_CONN_LOG(trace, "ThriftHealthChecker ClientImpl onData. total pending buffer={}",
117
8
                 *connection_, data.length());
118
8
  if (!response_decoder_) {
119
6
    response_decoder_ =
120
6
        std::make_unique<SimpleResponseDecoder>(createTransport(), createProtocol());
121
6
  }
122
8
  if (response_decoder_->onData(data)) {
123
8
    ENVOY_CONN_LOG(trace, "Response complete. Result={} ", *connection_,
124
8
                   response_decoder_->responseSuccess());
125
8
    parent_.onResponseResult(response_decoder_->responseSuccess());
126
8
  }
127
8
}
128

            
129
ClientFactoryImpl ClientFactoryImpl::instance_;
130

            
131
ClientPtr ClientFactoryImpl::create(ClientCallback& callbacks, TransportType transport,
132
                                    ProtocolType protocol, const std::string& method_name,
133
                                    Upstream::HostSharedPtr host, int32_t seq_id,
134
6
                                    bool fixed_seq_id) {
135
6
  auto client = std::make_unique<ClientImpl>(callbacks, transport, protocol, method_name,
136
6
                                             std::move(host), seq_id, fixed_seq_id);
137
6
  return client;
138
6
}
139

            
140
} // namespace ThriftHealthChecker
141
} // namespace HealthCheckers
142
} // namespace Extensions
143
} // namespace Envoy