/proc/self/cwd/source/extensions/filters/network/dubbo_proxy/active_message.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include "envoy/event/deferred_deletable.h" |
4 | | #include "envoy/network/connection.h" |
5 | | #include "envoy/network/filter.h" |
6 | | #include "envoy/stats/timespan.h" |
7 | | |
8 | | #include "source/common/buffer/buffer_impl.h" |
9 | | #include "source/common/common/linked_object.h" |
10 | | #include "source/common/common/logger.h" |
11 | | #include "source/common/stream_info/stream_info_impl.h" |
12 | | #include "source/extensions/filters/network/dubbo_proxy/decoder.h" |
13 | | #include "source/extensions/filters/network/dubbo_proxy/decoder_event_handler.h" |
14 | | #include "source/extensions/filters/network/dubbo_proxy/filters/filter.h" |
15 | | #include "source/extensions/filters/network/dubbo_proxy/metadata.h" |
16 | | #include "source/extensions/filters/network/dubbo_proxy/router/router.h" |
17 | | #include "source/extensions/filters/network/dubbo_proxy/stats.h" |
18 | | |
19 | | #include "absl/types/optional.h" |
20 | | |
21 | | namespace Envoy { |
22 | | namespace Extensions { |
23 | | namespace NetworkFilters { |
24 | | namespace DubboProxy { |
25 | | |
26 | | class ConnectionManager; |
27 | | class ActiveMessage; |
28 | | |
29 | | class ActiveResponseDecoder : public ResponseDecoderCallbacks, |
30 | | public StreamHandler, |
31 | | Logger::Loggable<Logger::Id::dubbo> { |
32 | | public: |
33 | | ActiveResponseDecoder(ActiveMessage& parent, DubboFilterStats& stats, |
34 | | Network::Connection& connection, ProtocolPtr&& protocol); |
35 | 0 | ~ActiveResponseDecoder() override = default; |
36 | | |
37 | | DubboFilters::UpstreamResponseStatus onData(Buffer::Instance& data); |
38 | | |
39 | | // StreamHandler |
40 | | void onStreamDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) override; |
41 | | |
42 | | // ResponseDecoderCallbacks |
43 | 0 | StreamHandler& newStream() override { return *this; } |
44 | 0 | void onHeartbeat(MessageMetadataSharedPtr) override {} |
45 | | |
46 | 0 | uint64_t requestId() const { return metadata_ ? metadata_->requestId() : 0; } |
47 | | |
48 | | private: |
49 | | FilterStatus applyMessageEncodedFilters(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx); |
50 | | |
51 | | ActiveMessage& parent_; |
52 | | DubboFilterStats& stats_; |
53 | | Network::Connection& response_connection_; |
54 | | ProtocolPtr protocol_; |
55 | | ResponseDecoderPtr decoder_; |
56 | | MessageMetadataSharedPtr metadata_; |
57 | | bool complete_ : 1; |
58 | | DubboFilters::UpstreamResponseStatus response_status_{ |
59 | | DubboFilters::UpstreamResponseStatus::MoreData}; |
60 | | }; |
61 | | |
62 | | using ActiveResponseDecoderPtr = std::unique_ptr<ActiveResponseDecoder>; |
63 | | |
64 | | class ActiveMessageFilterBase : public virtual DubboFilters::FilterCallbacksBase { |
65 | | public: |
66 | | ActiveMessageFilterBase(ActiveMessage& parent, bool dual_filter) |
67 | 0 | : parent_(parent), dual_filter_(dual_filter) {} |
68 | 0 | ~ActiveMessageFilterBase() override = default; |
69 | | |
70 | | // DubboFilters::FilterCallbacksBase |
71 | | uint64_t requestId() const override; |
72 | | uint64_t streamId() const override; |
73 | | const Network::Connection* connection() const override; |
74 | | DubboProxy::Router::RouteConstSharedPtr route() override; |
75 | | SerializationType serializationType() const override; |
76 | | ProtocolType protocolType() const override; |
77 | | StreamInfo::StreamInfo& streamInfo() override; |
78 | | Event::Dispatcher& dispatcher() override; |
79 | | void resetStream() override; |
80 | | |
81 | | protected: |
82 | | ActiveMessage& parent_; |
83 | | const bool dual_filter_ : 1; |
84 | | }; |
85 | | |
86 | | // Wraps a DecoderFilter and acts as the DecoderFilterCallbacks for the filter, enabling filter |
87 | | // chain continuation. |
88 | | class ActiveMessageDecoderFilter : public DubboFilters::DecoderFilterCallbacks, |
89 | | public ActiveMessageFilterBase, |
90 | | public LinkedObject<ActiveMessageDecoderFilter>, |
91 | | Logger::Loggable<Logger::Id::dubbo> { |
92 | | public: |
93 | | ActiveMessageDecoderFilter(ActiveMessage& parent, DubboFilters::DecoderFilterSharedPtr filter, |
94 | | bool dual_filter); |
95 | 0 | ~ActiveMessageDecoderFilter() override = default; |
96 | | |
97 | | void continueDecoding() override; |
98 | | void sendLocalReply(const DubboFilters::DirectResponse& response, bool end_stream) override; |
99 | | void startUpstreamResponse() override; |
100 | | DubboFilters::UpstreamResponseStatus upstreamData(Buffer::Instance& buffer) override; |
101 | | void resetDownstreamConnection() override; |
102 | | |
103 | 0 | DubboFilters::DecoderFilterSharedPtr handler() { return handle_; } |
104 | | |
105 | | private: |
106 | | DubboFilters::DecoderFilterSharedPtr handle_; |
107 | | }; |
108 | | |
109 | | using ActiveMessageDecoderFilterPtr = std::unique_ptr<ActiveMessageDecoderFilter>; |
110 | | |
111 | | // Wraps a EncoderFilter and acts as the EncoderFilterCallbacks for the filter, enabling filter |
112 | | // chain continuation. |
113 | | class ActiveMessageEncoderFilter : public ActiveMessageFilterBase, |
114 | | public DubboFilters::EncoderFilterCallbacks, |
115 | | public LinkedObject<ActiveMessageEncoderFilter>, |
116 | | Logger::Loggable<Logger::Id::dubbo> { |
117 | | public: |
118 | | ActiveMessageEncoderFilter(ActiveMessage& parent, DubboFilters::EncoderFilterSharedPtr filter, |
119 | | bool dual_filter); |
120 | 0 | ~ActiveMessageEncoderFilter() override = default; |
121 | | |
122 | | void continueEncoding() override; |
123 | 0 | DubboFilters::EncoderFilterSharedPtr handler() { return handle_; } |
124 | | |
125 | | private: |
126 | | DubboFilters::EncoderFilterSharedPtr handle_; |
127 | | |
128 | | friend class ActiveMessage; |
129 | | }; |
130 | | |
131 | | using ActiveMessageEncoderFilterPtr = std::unique_ptr<ActiveMessageEncoderFilter>; |
132 | | |
133 | | // ActiveMessage tracks downstream requests for which no response has been received. |
134 | | class ActiveMessage : public LinkedObject<ActiveMessage>, |
135 | | public Event::DeferredDeletable, |
136 | | public StreamHandler, |
137 | | public DubboFilters::FilterChainFactoryCallbacks, |
138 | | Logger::Loggable<Logger::Id::dubbo> { |
139 | | public: |
140 | | ActiveMessage(ConnectionManager& parent); |
141 | | ~ActiveMessage() override; |
142 | | |
143 | | // Indicates which filter to start the iteration with. |
144 | | enum class FilterIterationStartState { AlwaysStartFromNext, CanStartFromCurrent }; |
145 | | |
146 | | // Returns the encoder filter to start iteration with. |
147 | | std::list<ActiveMessageEncoderFilterPtr>::iterator |
148 | | commonEncodePrefix(ActiveMessageEncoderFilter* filter, FilterIterationStartState state); |
149 | | // Returns the decoder filter to start iteration with. |
150 | | std::list<ActiveMessageDecoderFilterPtr>::iterator |
151 | | commonDecodePrefix(ActiveMessageDecoderFilter* filter, FilterIterationStartState state); |
152 | | |
153 | | // Dubbo::FilterChainFactoryCallbacks |
154 | | void addDecoderFilter(DubboFilters::DecoderFilterSharedPtr filter) override; |
155 | | void addEncoderFilter(DubboFilters::EncoderFilterSharedPtr filter) override; |
156 | | void addFilter(DubboFilters::CodecFilterSharedPtr filter) override; |
157 | | |
158 | | // StreamHandler |
159 | | void onStreamDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) override; |
160 | | |
161 | | uint64_t requestId() const; |
162 | | uint64_t streamId() const; |
163 | | const Network::Connection* connection() const; |
164 | | SerializationType serializationType() const; |
165 | | ProtocolType protocolType() const; |
166 | | StreamInfo::StreamInfo& streamInfo(); |
167 | | Router::RouteConstSharedPtr route(); |
168 | | void sendLocalReply(const DubboFilters::DirectResponse& response, bool end_stream); |
169 | | void startUpstreamResponse(); |
170 | | DubboFilters::UpstreamResponseStatus upstreamData(Buffer::Instance& buffer); |
171 | | void resetDownstreamConnection(); |
172 | | Event::Dispatcher& dispatcher(); |
173 | | void resetStream(); |
174 | | |
175 | | void createFilterChain(); |
176 | | FilterStatus applyDecoderFilters(ActiveMessageDecoderFilter* filter, |
177 | | FilterIterationStartState state); |
178 | | FilterStatus applyEncoderFilters(ActiveMessageEncoderFilter* filter, |
179 | | FilterIterationStartState state); |
180 | | void finalizeRequest(); |
181 | | void onReset(); |
182 | | void onError(const std::string& what); |
183 | 0 | MessageMetadataSharedPtr metadata() const { return metadata_; } |
184 | 0 | ContextSharedPtr context() const { return context_; } |
185 | 0 | bool pendingStreamDecoded() const { return pending_stream_decoded_; } |
186 | | |
187 | | private: |
188 | | void addDecoderFilterWorker(DubboFilters::DecoderFilterSharedPtr filter, bool dual_filter); |
189 | | void addEncoderFilterWorker(DubboFilters::EncoderFilterSharedPtr, bool dual_filter); |
190 | | |
191 | | ConnectionManager& parent_; |
192 | | |
193 | | ContextSharedPtr context_; |
194 | | MessageMetadataSharedPtr metadata_; |
195 | | Stats::TimespanPtr request_timer_; |
196 | | ActiveResponseDecoderPtr response_decoder_; |
197 | | |
198 | | absl::optional<Router::RouteConstSharedPtr> cached_route_; |
199 | | |
200 | | std::list<ActiveMessageDecoderFilterPtr> decoder_filters_; |
201 | | std::function<FilterStatus(DubboFilters::DecoderFilter*)> filter_action_; |
202 | | |
203 | | std::list<ActiveMessageEncoderFilterPtr> encoder_filters_; |
204 | | std::function<FilterStatus(DubboFilters::EncoderFilter*)> encoder_filter_action_; |
205 | | |
206 | | // This value is used in the calculation of the weighted cluster. |
207 | | uint64_t stream_id_; |
208 | | StreamInfo::StreamInfoImpl stream_info_; |
209 | | |
210 | | Buffer::OwnedImpl response_buffer_; |
211 | | |
212 | | bool pending_stream_decoded_ : 1; |
213 | | bool local_response_sent_ : 1; |
214 | | |
215 | | friend class ActiveResponseDecoder; |
216 | | }; |
217 | | |
218 | | using ActiveMessagePtr = std::unique_ptr<ActiveMessage>; |
219 | | |
220 | | } // namespace DubboProxy |
221 | | } // namespace NetworkFilters |
222 | | } // namespace Extensions |
223 | | } // namespace Envoy |