Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/dubbo_proxy/decoder.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include "envoy/buffer/buffer.h"
4
5
#include "source/common/buffer/buffer_impl.h"
6
#include "source/common/common/logger.h"
7
#include "source/extensions/filters/network/dubbo_proxy/decoder_event_handler.h"
8
#include "source/extensions/filters/network/dubbo_proxy/protocol.h"
9
#include "source/extensions/filters/network/dubbo_proxy/serializer.h"
10
11
namespace Envoy {
12
namespace Extensions {
13
namespace NetworkFilters {
14
namespace DubboProxy {
15
16
#define ALL_PROTOCOL_STATES(FUNCTION)                                                              \
17
  FUNCTION(StopIteration)                                                                          \
18
  FUNCTION(WaitForData)                                                                            \
19
  FUNCTION(OnDecodeStreamHeader)                                                                   \
20
  FUNCTION(OnDecodeStreamData)                                                                     \
21
  FUNCTION(Done)
22
23
/**
24
 * ProtocolState represents a set of states used in a state machine to decode Dubbo requests
25
 * and responses.
26
 */
27
enum class ProtocolState { ALL_PROTOCOL_STATES(GENERATE_ENUM) };
28
29
class ProtocolStateNameValues {
30
public:
31
0
  static const std::string& name(ProtocolState state) {
32
0
    size_t i = static_cast<size_t>(state);
33
0
    ASSERT(i < names().size());
34
0
    return names()[i];
35
0
  }
36
37
private:
38
0
  static const std::vector<std::string>& names() {
39
0
    CONSTRUCT_ON_FIRST_USE(std::vector<std::string>, {ALL_PROTOCOL_STATES(GENERATE_STRING)});
40
0
  }
41
};
42
43
struct ActiveStream {
44
  ActiveStream(StreamHandler& handler, MessageMetadataSharedPtr metadata, ContextSharedPtr context)
45
0
      : handler_(handler), metadata_(metadata), context_(context) {}
46
0
  ~ActiveStream() {
47
0
    metadata_.reset();
48
0
    context_.reset();
49
0
  }
50
51
0
  void onStreamDecoded() {
52
0
    ASSERT(metadata_ && context_);
53
0
    handler_.onStreamDecoded(metadata_, context_);
54
0
  }
55
56
  StreamHandler& handler_;
57
  MessageMetadataSharedPtr metadata_;
58
  ContextSharedPtr context_;
59
};
60
61
using ActiveStreamPtr = std::unique_ptr<ActiveStream>;
62
63
class DecoderStateMachine : public Logger::Loggable<Logger::Id::dubbo> {
64
public:
65
  class Delegate {
66
  public:
67
0
    virtual ~Delegate() = default;
68
    virtual ActiveStream* newStream(MessageMetadataSharedPtr metadata,
69
                                    ContextSharedPtr context) PURE;
70
    virtual void onHeartbeat(MessageMetadataSharedPtr metadata) PURE;
71
  };
72
73
  DecoderStateMachine(Protocol& protocol, Delegate& delegate)
74
0
      : protocol_(protocol), delegate_(delegate) {}
75
76
  /**
77
   * Consumes as much data from the configured Buffer as possible and executes the decoding state
78
   * machine. Returns ProtocolState::WaitForData if more data is required to complete processing of
79
   * a message. Returns ProtocolState::Done when the end of a message is successfully processed.
80
   * Once the Done state is reached, further invocations of run return immediately with Done.
81
   *
82
   * @param buffer a buffer containing the remaining data to be processed
83
   * @return ProtocolState returns with ProtocolState::WaitForData or ProtocolState::Done
84
   * @throw EnvoyException if thrown by the underlying Protocol
85
   */
86
  ProtocolState run(Buffer::Instance& buffer);
87
88
  /**
89
   * @return the current ProtocolState
90
   */
91
0
  ProtocolState currentState() const { return state_; }
92
93
private:
94
  struct DecoderStatus {
95
    DecoderStatus() = default;
96
0
    DecoderStatus(ProtocolState next_state) : next_state_(next_state){};
97
    DecoderStatus(ProtocolState next_state, FilterStatus filter_status)
98
0
        : next_state_(next_state), filter_status_(filter_status){};
99
100
    ProtocolState next_state_;
101
    absl::optional<FilterStatus> filter_status_;
102
  };
103
104
  // These functions map directly to the matching ProtocolState values. Each returns the next state
105
  // or ProtocolState::WaitForData if more data is required.
106
  DecoderStatus onDecodeStreamHeader(Buffer::Instance& buffer);
107
  DecoderStatus onDecodeStreamData(Buffer::Instance& buffer);
108
109
  // handleState delegates to the appropriate method based on state_.
110
  DecoderStatus handleState(Buffer::Instance& buffer);
111
112
  Protocol& protocol_;
113
  Delegate& delegate_;
114
115
  ProtocolState state_{ProtocolState::OnDecodeStreamHeader};
116
  ActiveStream* active_stream_{nullptr};
117
};
118
119
using DecoderStateMachinePtr = std::unique_ptr<DecoderStateMachine>;
120
121
class DecoderBase : public DecoderStateMachine::Delegate,
122
                    public Logger::Loggable<Logger::Id::dubbo> {
123
public:
124
  DecoderBase(Protocol& protocol);
125
  ~DecoderBase() override;
126
127
  /**
128
   * Drains data from the given buffer
129
   *
130
   * @param data a Buffer containing Dubbo protocol data
131
   * @throw EnvoyException on Dubbo protocol errors
132
   */
133
  FilterStatus onData(Buffer::Instance& data, bool& buffer_underflow);
134
135
0
  const Protocol& protocol() { return protocol_; }
136
137
  // It is assumed that all of the protocol parsing is stateless;
138
  // if any parsing state is introduced, its reset interface must be called here.
139
  void reset();
140
141
protected:
142
  void start();
143
  void complete();
144
145
  Protocol& protocol_;
146
147
  ActiveStreamPtr stream_;
148
  DecoderStateMachinePtr state_machine_;
149
150
  bool decode_started_{false};
151
};
152
153
/**
154
 * Decoder encapsulates a configured ProtocolPtr and SerializationPtr.
155
 */
156
template <typename T> class Decoder : public DecoderBase {
157
public:
158
0
  Decoder(Protocol& protocol, T& callbacks) : DecoderBase(protocol), callbacks_(callbacks) {}
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::Decoder<Envoy::Extensions::NetworkFilters::DubboProxy::ResponseDecoderCallbacks>::Decoder(Envoy::Extensions::NetworkFilters::DubboProxy::Protocol&, Envoy::Extensions::NetworkFilters::DubboProxy::ResponseDecoderCallbacks&)
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::Decoder<Envoy::Extensions::NetworkFilters::DubboProxy::RequestDecoderCallbacks>::Decoder(Envoy::Extensions::NetworkFilters::DubboProxy::Protocol&, Envoy::Extensions::NetworkFilters::DubboProxy::RequestDecoderCallbacks&)
159
160
0
  ActiveStream* newStream(MessageMetadataSharedPtr metadata, ContextSharedPtr context) override {
161
0
    ASSERT(!stream_);
162
0
    stream_ = std::make_unique<ActiveStream>(callbacks_.newStream(), metadata, context);
163
0
    return stream_.get();
164
0
  }
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::Decoder<Envoy::Extensions::NetworkFilters::DubboProxy::RequestDecoderCallbacks>::newStream(std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::MessageMetadata>, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::Context>)
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::Decoder<Envoy::Extensions::NetworkFilters::DubboProxy::ResponseDecoderCallbacks>::newStream(std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::MessageMetadata>, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::Context>)
165
166
0
  void onHeartbeat(MessageMetadataSharedPtr metadata) override { callbacks_.onHeartbeat(metadata); }
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::Decoder<Envoy::Extensions::NetworkFilters::DubboProxy::RequestDecoderCallbacks>::onHeartbeat(std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::MessageMetadata>)
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::Decoder<Envoy::Extensions::NetworkFilters::DubboProxy::ResponseDecoderCallbacks>::onHeartbeat(std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::MessageMetadata>)
167
168
private:
169
  T& callbacks_;
170
};
171
172
class RequestDecoder : public Decoder<RequestDecoderCallbacks> {
173
public:
174
  RequestDecoder(Protocol& protocol, RequestDecoderCallbacks& callbacks)
175
0
      : Decoder(protocol, callbacks) {}
176
};
177
178
using RequestDecoderPtr = std::unique_ptr<RequestDecoder>;
179
180
class ResponseDecoder : public Decoder<ResponseDecoderCallbacks> {
181
public:
182
  ResponseDecoder(Protocol& protocol, ResponseDecoderCallbacks& callbacks)
183
0
      : Decoder(protocol, callbacks) {}
184
};
185
186
using ResponseDecoderPtr = std::unique_ptr<ResponseDecoder>;
187
188
} // namespace DubboProxy
189
} // namespace NetworkFilters
190
} // namespace Extensions
191
} // namespace Envoy