Line data Source code
1 : #include "source/extensions/health_checkers/thrift/client_impl.h" 2 : 3 : #include "source/extensions/filters/network/thrift_proxy/thrift.h" 4 : 5 : namespace Envoy { 6 : namespace Extensions { 7 : namespace HealthCheckers { 8 : namespace ThriftHealthChecker { 9 : 10 0 : bool SimpleResponseDecoder::onData(Buffer::Instance& data) { 11 0 : buffer_.move(data); 12 : 13 0 : bool underflow = false; 14 0 : decoder_->onData(buffer_, underflow); 15 0 : ASSERT(complete_ || underflow); 16 : 17 0 : return complete_; 18 0 : } 19 : 20 0 : bool SimpleResponseDecoder::responseSuccess() { 21 0 : ENVOY_LOG(trace, "SimpleResponseDecoder responseSuccess complete={} success={}", complete_, 22 0 : success_.has_value() && success_.value()); 23 0 : return complete_ && success_.has_value() && success_.value(); 24 0 : } 25 : 26 0 : FilterStatus SimpleResponseDecoder::messageBegin(MessageMetadataSharedPtr metadata) { 27 0 : ENVOY_LOG(trace, "SimpleResponseDecoder messageBegin message_type={} reply_type={}", 28 0 : metadata->hasMessageType() ? MessageTypeNames::get().fromType(metadata->messageType()) 29 0 : : "-", 30 0 : metadata->hasReplyType() ? ReplyTypeNames::get().fromType(metadata->replyType()) : "-"); 31 : 32 0 : if (metadata->hasReplyType()) { 33 0 : success_ = metadata->replyType() == ReplyType::Success; 34 0 : } 35 : 36 0 : if (metadata->hasMessageType() && metadata->messageType() == MessageType::Exception) { 37 0 : success_ = false; 38 0 : } 39 0 : return FilterStatus::Continue; 40 0 : } 41 : 42 0 : FilterStatus SimpleResponseDecoder::messageEnd() { 43 0 : ENVOY_LOG(trace, "SimpleResponseDecoder messageEnd"); 44 0 : complete_ = true; 45 0 : return FilterStatus::Continue; 46 0 : } 47 : 48 0 : void ThriftSessionCallbacks::onEvent(Network::ConnectionEvent event) { parent_.onEvent(event); } 49 : 50 0 : void ThriftSessionCallbacks::onAboveWriteBufferHighWatermark() { 51 0 : parent_.onAboveWriteBufferHighWatermark(); 52 0 : } 53 : 54 0 : void ThriftSessionCallbacks::onBelowWriteBufferLowWatermark() { 55 0 : parent_.onBelowWriteBufferLowWatermark(); 56 0 : } 57 : 58 0 : Network::FilterStatus ThriftSessionCallbacks::onData(Buffer::Instance& data, bool) { 59 0 : parent_.onData(data); 60 0 : return Network::FilterStatus::StopIteration; 61 0 : } 62 : 63 0 : void ClientImpl::start() { 64 0 : Upstream::Host::CreateConnectionData conn_data = parent_.createConnection(); 65 0 : connection_ = std::move(conn_data.connection_); 66 0 : host_description_ = std::move(conn_data.host_description_); 67 0 : session_callbacks_ = std::make_unique<ThriftSessionCallbacks>(*this); 68 0 : connection_->addConnectionCallbacks(*session_callbacks_); 69 0 : connection_->addReadFilter(session_callbacks_); 70 : 71 0 : connection_->connect(); 72 0 : connection_->noDelay(true); 73 : 74 0 : ENVOY_CONN_LOG(trace, "ThriftHealthChecker ClientImpl start", *connection_); 75 0 : } 76 : 77 0 : bool ClientImpl::sendRequest() { 78 0 : ENVOY_CONN_LOG(trace, "ThriftHealthChecker ClientImpl sendRequest", *connection_); 79 0 : ASSERT(connection_->state() == Network::Connection::State::Open); 80 : 81 0 : Buffer::OwnedImpl request_buffer; 82 0 : ProtocolConverterSharedPtr protocol_converter = std::make_shared<ProtocolConverter>(); 83 0 : ProtocolPtr protocol = createProtocol(); 84 0 : protocol_converter->initProtocolConverter(*protocol, request_buffer); 85 : 86 0 : MessageMetadataSharedPtr metadata = std::make_shared<MessageMetadata>(); 87 0 : metadata->setProtocol(protocol_); 88 0 : metadata->setMethodName(method_name_); 89 0 : metadata->setMessageType(MessageType::Call); 90 0 : metadata->setSequenceId(sequenceId()); 91 : 92 0 : protocol_converter->messageBegin(metadata); 93 0 : protocol_converter->structBegin(""); 94 0 : FieldType field_type_stop = FieldType::Stop; 95 0 : int16_t field_id = 0; 96 0 : protocol_converter->fieldBegin("", field_type_stop, field_id); 97 0 : protocol_converter->structEnd(); 98 0 : protocol_converter->messageEnd(); 99 : 100 0 : TransportPtr transport = createTransport(); 101 0 : Buffer::OwnedImpl transport_buffer; 102 0 : transport->encodeFrame(transport_buffer, *metadata, request_buffer); 103 : 104 0 : connection_->write(transport_buffer, false); 105 0 : ENVOY_CONN_LOG(trace, "ThriftHealthChecker ClientImpl sendRequest success id={}", *connection_, 106 0 : metadata->sequenceId()); 107 0 : return true; 108 0 : } 109 : 110 0 : void ClientImpl::close() { 111 0 : ENVOY_CONN_LOG(trace, "ThriftHealthChecker ClientImpl close", *connection_); 112 0 : connection_->close(Network::ConnectionCloseType::Abort); 113 0 : } 114 : 115 0 : void ClientImpl::onData(Buffer::Instance& data) { 116 0 : ENVOY_CONN_LOG(trace, "ThriftHealthChecker ClientImpl onData. total pending buffer={}", 117 0 : *connection_, data.length()); 118 0 : if (!response_decoder_) { 119 0 : response_decoder_ = 120 0 : std::make_unique<SimpleResponseDecoder>(createTransport(), createProtocol()); 121 0 : } 122 0 : if (response_decoder_->onData(data)) { 123 0 : ENVOY_CONN_LOG(trace, "Response complete. Result={} ", *connection_, 124 0 : response_decoder_->responseSuccess()); 125 0 : parent_.onResponseResult(response_decoder_->responseSuccess()); 126 0 : } 127 0 : } 128 : 129 : ClientFactoryImpl ClientFactoryImpl::instance_; 130 : 131 : ClientPtr ClientFactoryImpl::create(ClientCallback& callbacks, TransportType transport, 132 : ProtocolType protocol, const std::string& method_name, 133 : Upstream::HostSharedPtr host, int32_t seq_id, 134 0 : bool fixed_seq_id) { 135 0 : auto client = std::make_unique<ClientImpl>(callbacks, transport, protocol, method_name, 136 0 : std::move(host), seq_id, fixed_seq_id); 137 0 : return client; 138 0 : } 139 : 140 : } // namespace ThriftHealthChecker 141 : } // namespace HealthCheckers 142 : } // namespace Extensions 143 : } // namespace Envoy