/proc/self/cwd/source/extensions/filters/network/dubbo_proxy/decoder.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include "envoy/buffer/buffer.h" |
4 | | |
5 | | #include "source/common/buffer/buffer_impl.h" |
6 | | #include "source/common/common/logger.h" |
7 | | #include "source/extensions/filters/network/dubbo_proxy/decoder_event_handler.h" |
8 | | #include "source/extensions/filters/network/dubbo_proxy/protocol.h" |
9 | | #include "source/extensions/filters/network/dubbo_proxy/serializer.h" |
10 | | |
11 | | namespace Envoy { |
12 | | namespace Extensions { |
13 | | namespace NetworkFilters { |
14 | | namespace DubboProxy { |
15 | | |
16 | | #define ALL_PROTOCOL_STATES(FUNCTION) \ |
17 | | FUNCTION(StopIteration) \ |
18 | | FUNCTION(WaitForData) \ |
19 | | FUNCTION(OnDecodeStreamHeader) \ |
20 | | FUNCTION(OnDecodeStreamData) \ |
21 | | FUNCTION(Done) |
22 | | |
23 | | /** |
24 | | * ProtocolState represents a set of states used in a state machine to decode Dubbo requests |
25 | | * and responses. |
26 | | */ |
27 | | enum class ProtocolState { ALL_PROTOCOL_STATES(GENERATE_ENUM) }; |
28 | | |
29 | | class ProtocolStateNameValues { |
30 | | public: |
31 | 0 | static const std::string& name(ProtocolState state) { |
32 | 0 | size_t i = static_cast<size_t>(state); |
33 | 0 | ASSERT(i < names().size()); |
34 | 0 | return names()[i]; |
35 | 0 | } |
36 | | |
37 | | private: |
38 | 0 | static const std::vector<std::string>& names() { |
39 | 0 | CONSTRUCT_ON_FIRST_USE(std::vector<std::string>, {ALL_PROTOCOL_STATES(GENERATE_STRING)}); |
40 | 0 | } |
41 | | }; |
42 | | |
43 | | struct ActiveStream { |
44 | | ActiveStream(StreamHandler& handler, MessageMetadataSharedPtr metadata, ContextSharedPtr context) |
45 | 0 | : handler_(handler), metadata_(metadata), context_(context) {} |
46 | 0 | ~ActiveStream() { |
47 | 0 | metadata_.reset(); |
48 | 0 | context_.reset(); |
49 | 0 | } |
50 | | |
51 | 0 | void onStreamDecoded() { |
52 | 0 | ASSERT(metadata_ && context_); |
53 | 0 | handler_.onStreamDecoded(metadata_, context_); |
54 | 0 | } |
55 | | |
56 | | StreamHandler& handler_; |
57 | | MessageMetadataSharedPtr metadata_; |
58 | | ContextSharedPtr context_; |
59 | | }; |
60 | | |
61 | | using ActiveStreamPtr = std::unique_ptr<ActiveStream>; |
62 | | |
63 | | class DecoderStateMachine : public Logger::Loggable<Logger::Id::dubbo> { |
64 | | public: |
65 | | class Delegate { |
66 | | public: |
67 | 0 | virtual ~Delegate() = default; |
68 | | virtual ActiveStream* newStream(MessageMetadataSharedPtr metadata, |
69 | | ContextSharedPtr context) PURE; |
70 | | virtual void onHeartbeat(MessageMetadataSharedPtr metadata) PURE; |
71 | | }; |
72 | | |
73 | | DecoderStateMachine(Protocol& protocol, Delegate& delegate) |
74 | 0 | : protocol_(protocol), delegate_(delegate) {} |
75 | | |
76 | | /** |
77 | | * Consumes as much data from the configured Buffer as possible and executes the decoding state |
78 | | * machine. Returns ProtocolState::WaitForData if more data is required to complete processing of |
79 | | * a message. Returns ProtocolState::Done when the end of a message is successfully processed. |
80 | | * Once the Done state is reached, further invocations of run return immediately with Done. |
81 | | * |
82 | | * @param buffer a buffer containing the remaining data to be processed |
83 | | * @return ProtocolState returns with ProtocolState::WaitForData or ProtocolState::Done |
84 | | * @throw Envoy Exception if thrown by the underlying Protocol |
85 | | */ |
86 | | ProtocolState run(Buffer::Instance& buffer); |
87 | | |
88 | | /** |
89 | | * @return the current ProtocolState |
90 | | */ |
91 | 0 | ProtocolState currentState() const { return state_; } |
92 | | |
93 | | private: |
94 | | struct DecoderStatus { |
95 | | DecoderStatus() = default; |
96 | 0 | DecoderStatus(ProtocolState next_state) : next_state_(next_state){}; |
97 | | DecoderStatus(ProtocolState next_state, FilterStatus filter_status) |
98 | 0 | : next_state_(next_state), filter_status_(filter_status){}; |
99 | | |
100 | | ProtocolState next_state_; |
101 | | absl::optional<FilterStatus> filter_status_; |
102 | | }; |
103 | | |
104 | | // These functions map directly to the matching ProtocolState values. Each returns the next state |
105 | | // or ProtocolState::WaitForData if more data is required. |
106 | | DecoderStatus onDecodeStreamHeader(Buffer::Instance& buffer); |
107 | | DecoderStatus onDecodeStreamData(Buffer::Instance& buffer); |
108 | | |
109 | | // handleState delegates to the appropriate method based on state_. |
110 | | DecoderStatus handleState(Buffer::Instance& buffer); |
111 | | |
112 | | Protocol& protocol_; |
113 | | Delegate& delegate_; |
114 | | |
115 | | ProtocolState state_{ProtocolState::OnDecodeStreamHeader}; |
116 | | ActiveStream* active_stream_{nullptr}; |
117 | | }; |
118 | | |
119 | | using DecoderStateMachinePtr = std::unique_ptr<DecoderStateMachine>; |
120 | | |
121 | | class DecoderBase : public DecoderStateMachine::Delegate, |
122 | | public Logger::Loggable<Logger::Id::dubbo> { |
123 | | public: |
124 | | DecoderBase(Protocol& protocol); |
125 | | ~DecoderBase() override; |
126 | | |
127 | | /** |
128 | | * Drains data from the given buffer |
129 | | * |
130 | | * @param data a Buffer containing Dubbo protocol data |
131 | | * @throw EnvoyException on Dubbo protocol errors |
132 | | */ |
133 | | FilterStatus onData(Buffer::Instance& data, bool& buffer_underflow); |
134 | | |
135 | 0 | const Protocol& protocol() { return protocol_; } |
136 | | |
137 | | // It is assumed that all of the protocol parsing are stateless, |
138 | | // if there is a state of the need to provide the reset interface call here. |
139 | | void reset(); |
140 | | |
141 | | protected: |
142 | | void start(); |
143 | | void complete(); |
144 | | |
145 | | Protocol& protocol_; |
146 | | |
147 | | ActiveStreamPtr stream_; |
148 | | DecoderStateMachinePtr state_machine_; |
149 | | |
150 | | bool decode_started_{false}; |
151 | | }; |
152 | | |
153 | | /** |
154 | | * Decoder encapsulates a configured and ProtocolPtr and SerializationPtr. |
155 | | */ |
156 | | template <typename T> class Decoder : public DecoderBase { |
157 | | public: |
158 | 0 | Decoder(Protocol& protocol, T& callbacks) : DecoderBase(protocol), callbacks_(callbacks) {} Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::Decoder<Envoy::Extensions::NetworkFilters::DubboProxy::ResponseDecoderCallbacks>::Decoder(Envoy::Extensions::NetworkFilters::DubboProxy::Protocol&, Envoy::Extensions::NetworkFilters::DubboProxy::ResponseDecoderCallbacks&) Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::Decoder<Envoy::Extensions::NetworkFilters::DubboProxy::RequestDecoderCallbacks>::Decoder(Envoy::Extensions::NetworkFilters::DubboProxy::Protocol&, Envoy::Extensions::NetworkFilters::DubboProxy::RequestDecoderCallbacks&) |
159 | | |
160 | 0 | ActiveStream* newStream(MessageMetadataSharedPtr metadata, ContextSharedPtr context) override { |
161 | 0 | ASSERT(!stream_); |
162 | 0 | stream_ = std::make_unique<ActiveStream>(callbacks_.newStream(), metadata, context); |
163 | 0 | return stream_.get(); |
164 | 0 | } Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::Decoder<Envoy::Extensions::NetworkFilters::DubboProxy::RequestDecoderCallbacks>::newStream(std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::MessageMetadata>, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::Context>) Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::Decoder<Envoy::Extensions::NetworkFilters::DubboProxy::ResponseDecoderCallbacks>::newStream(std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::MessageMetadata>, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::Context>) |
165 | | |
166 | 0 | void onHeartbeat(MessageMetadataSharedPtr metadata) override { callbacks_.onHeartbeat(metadata); } Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::Decoder<Envoy::Extensions::NetworkFilters::DubboProxy::RequestDecoderCallbacks>::onHeartbeat(std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::MessageMetadata>) Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::Decoder<Envoy::Extensions::NetworkFilters::DubboProxy::ResponseDecoderCallbacks>::onHeartbeat(std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::MessageMetadata>) |
167 | | |
168 | | private: |
169 | | T& callbacks_; |
170 | | }; |
171 | | |
172 | | class RequestDecoder : public Decoder<RequestDecoderCallbacks> { |
173 | | public: |
174 | | RequestDecoder(Protocol& protocol, RequestDecoderCallbacks& callbacks) |
175 | 0 | : Decoder(protocol, callbacks) {} |
176 | | }; |
177 | | |
178 | | using RequestDecoderPtr = std::unique_ptr<RequestDecoder>; |
179 | | |
180 | | class ResponseDecoder : public Decoder<ResponseDecoderCallbacks> { |
181 | | public: |
182 | | ResponseDecoder(Protocol& protocol, ResponseDecoderCallbacks& callbacks) |
183 | 0 | : Decoder(protocol, callbacks) {} |
184 | | }; |
185 | | |
186 | | using ResponseDecoderPtr = std::unique_ptr<ResponseDecoder>; |
187 | | |
188 | | } // namespace DubboProxy |
189 | | } // namespace NetworkFilters |
190 | | } // namespace Extensions |
191 | | } // namespace Envoy |