/proc/self/cwd/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include <memory> |
4 | | |
5 | | #include "envoy/event/dispatcher.h" |
6 | | #include "envoy/router/router.h" |
7 | | #include "envoy/tcp/conn_pool.h" |
8 | | #include "envoy/upstream/load_balancer.h" |
9 | | |
10 | | #include "source/common/common/linked_object.h" |
11 | | #include "source/common/common/logger.h" |
12 | | #include "source/common/upstream/load_balancer_impl.h" |
13 | | #include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h" |
14 | | #include "source/extensions/filters/network/thrift_proxy/conn_manager.h" |
15 | | #include "source/extensions/filters/network/thrift_proxy/router/router.h" |
16 | | #include "source/extensions/filters/network/thrift_proxy/router/upstream_request.h" |
17 | | |
18 | | namespace Envoy { |
19 | | namespace Extensions { |
20 | | namespace NetworkFilters { |
21 | | namespace ThriftProxy { |
22 | | namespace Router { |
23 | | |
24 | | struct NullResponseDecoder : public DecoderCallbacks, public ProtocolConverter { |
25 | | NullResponseDecoder(Transport& transport, Protocol& protocol) |
26 | 0 | : decoder_(std::make_unique<Decoder>(transport, protocol, *this)) { |
27 | 0 | initProtocolConverter(protocol, response_buffer_); |
28 | 0 | } Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::Router::NullResponseDecoder::NullResponseDecoder(Envoy::Extensions::NetworkFilters::ThriftProxy::Transport&, Envoy::Extensions::NetworkFilters::ThriftProxy::Protocol&) Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::Router::NullResponseDecoder::NullResponseDecoder(Envoy::Extensions::NetworkFilters::ThriftProxy::Transport&, Envoy::Extensions::NetworkFilters::ThriftProxy::Protocol&) |
29 | | |
30 | 0 | virtual ThriftFilters::ResponseStatus upstreamData(Buffer::Instance& data) { |
31 | 0 | upstream_buffer_.move(data); |
32 | |
|
33 | 0 | bool underflow = false; |
34 | 0 | TRY_NEEDS_AUDIT { underflow = onData(); } |
35 | 0 | END_TRY catch (const AppException&) { return ThriftFilters::ResponseStatus::Reset; } |
36 | 0 | catch (const EnvoyException&) { |
37 | 0 | return ThriftFilters::ResponseStatus::Reset; |
38 | 0 | } |
39 | | |
40 | 0 | ASSERT(complete_ || underflow); |
41 | 0 | return complete_ ? ThriftFilters::ResponseStatus::Complete |
42 | 0 | : ThriftFilters::ResponseStatus::MoreData; |
43 | 0 | } |
44 | 0 | virtual bool onData() { |
45 | 0 | bool underflow = false; |
46 | 0 | decoder_->onData(upstream_buffer_, underflow); |
47 | 0 | return underflow; |
48 | 0 | } |
49 | 0 | MessageMetadataSharedPtr& responseMetadata() { |
50 | 0 | ASSERT(metadata_ != nullptr); |
51 | 0 | return metadata_; |
52 | 0 | } |
53 | 0 | bool responseSuccess() { return success_.value_or(false); } |
54 | | |
55 | | // ProtocolConverter |
56 | 0 | FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override { |
57 | 0 | metadata_ = metadata; |
58 | 0 | if (metadata_->hasReplyType()) { |
59 | 0 | success_ = metadata_->replyType() == ReplyType::Success; |
60 | 0 | } |
61 | 0 | return FilterStatus::Continue; |
62 | 0 | } |
63 | 0 | FilterStatus transportBegin(MessageMetadataSharedPtr metadata) override { |
64 | 0 | UNREFERENCED_PARAMETER(metadata); |
65 | 0 | return FilterStatus::Continue; |
66 | 0 | } |
67 | 0 | FilterStatus transportEnd() override { |
68 | 0 | ASSERT(metadata_ != nullptr); |
69 | 0 | complete_ = true; |
70 | 0 | return FilterStatus::Continue; |
71 | 0 | } |
72 | | |
73 | | // DecoderCallbacks |
74 | 0 | DecoderEventHandler& newDecoderEventHandler() override { return *this; } |
75 | 0 | bool passthroughEnabled() const override { return true; } |
76 | 0 | bool isRequest() const override { return false; } |
77 | 0 | bool headerKeysPreserveCase() const override { return false; } |
78 | | |
79 | | DecoderPtr decoder_; |
80 | | Buffer::OwnedImpl response_buffer_; |
81 | | Buffer::OwnedImpl upstream_buffer_; |
82 | | MessageMetadataSharedPtr metadata_; |
83 | | absl::optional<bool> success_; |
84 | | bool complete_ : 1; |
85 | | }; |
86 | | using NullResponseDecoderPtr = std::unique_ptr<NullResponseDecoder>; |
87 | | |
88 | | // Adapter from NullResponseDecoder to UpstreamResponseCallbacks. |
89 | | class ShadowUpstreamResponseCallbacksImpl : public UpstreamResponseCallbacks { |
90 | | public: |
91 | | ShadowUpstreamResponseCallbacksImpl(NullResponseDecoder& response_decoder) |
92 | 0 | : response_decoder_(response_decoder) {} |
93 | | |
94 | 0 | void startUpstreamResponse(Transport&, Protocol&) override {} |
95 | 0 | ThriftFilters::ResponseStatus upstreamData(Buffer::Instance& buffer) override { |
96 | 0 | return response_decoder_.upstreamData(buffer); |
97 | 0 | } |
98 | 0 | MessageMetadataSharedPtr responseMetadata() override { |
99 | 0 | return response_decoder_.responseMetadata(); |
100 | 0 | } |
101 | 0 | bool responseSuccess() override { return response_decoder_.responseSuccess(); } |
102 | | |
103 | | private: |
104 | | NullResponseDecoder& response_decoder_; |
105 | | }; |
106 | | using ShadowUpstreamResponseCallbacksImplPtr = std::unique_ptr<ShadowUpstreamResponseCallbacksImpl>; |
107 | | |
108 | | class ShadowWriterImpl; |
109 | | |
110 | | class ShadowRouterImpl : public ShadowRouterHandle, |
111 | | public RequestOwner, |
112 | | public Tcp::ConnectionPool::UpstreamCallbacks, |
113 | | public Upstream::LoadBalancerContextBase, |
114 | | public Event::DeferredDeletable, |
115 | | public LinkedObject<ShadowRouterImpl> { |
116 | | public: |
117 | | ShadowRouterImpl(ShadowWriterImpl& parent, const std::string& cluster_name, |
118 | | MessageMetadataSharedPtr& metadata, TransportType transport_type, |
119 | | ProtocolType protocol_type); |
120 | 0 | ~ShadowRouterImpl() override = default; |
121 | | |
122 | | bool createUpstreamRequest(); |
123 | | void maybeCleanup(); |
124 | 0 | void resetStream() { |
125 | 0 | if (upstream_request_ != nullptr) { |
126 | 0 | upstream_request_->releaseConnection(true); |
127 | 0 | } |
128 | 0 | } |
129 | | |
130 | | // ShadowRouterHandle |
131 | | void onRouterDestroy() override; |
132 | | bool waitingForConnection() const override; |
133 | 0 | RequestOwner& requestOwner() override { return *this; } |
134 | | |
135 | | // RequestOwner |
136 | 0 | Tcp::ConnectionPool::UpstreamCallbacks& upstreamCallbacks() override { return *this; } |
137 | 0 | Buffer::OwnedImpl& buffer() override { return upstream_request_buffer_; } |
138 | | Event::Dispatcher& dispatcher() override; |
139 | 0 | void addSize(uint64_t size) override { request_size_ += size; } |
140 | 0 | void continueDecoding() override { flushPendingCallbacks(); } |
141 | 0 | void resetDownstreamConnection() override {} |
142 | 0 | void sendLocalReply(const ThriftProxy::DirectResponse&, bool) override {} |
143 | | |
144 | | // RequestOwner::ProtocolConverter |
145 | 0 | FilterStatus transportBegin(MessageMetadataSharedPtr) override { return FilterStatus::Continue; } |
146 | 0 | FilterStatus transportEnd() override { return FilterStatus::Continue; } |
147 | | FilterStatus messageEnd() override; |
148 | | FilterStatus passthroughData(Buffer::Instance& data) override; |
149 | | FilterStatus structBegin(absl::string_view name) override; |
150 | | FilterStatus structEnd() override; |
151 | | FilterStatus fieldBegin(absl::string_view name, FieldType& field_type, |
152 | | int16_t& field_id) override; |
153 | | FilterStatus fieldEnd() override; |
154 | | FilterStatus boolValue(bool& value) override; |
155 | | FilterStatus byteValue(uint8_t& value) override; |
156 | | FilterStatus int16Value(int16_t& value) override; |
157 | | FilterStatus int32Value(int32_t& value) override; |
158 | | FilterStatus int64Value(int64_t& value) override; |
159 | | FilterStatus doubleValue(double& value) override; |
160 | | FilterStatus stringValue(absl::string_view value) override; |
161 | | FilterStatus mapBegin(FieldType& key_type, FieldType& value_type, uint32_t& size) override; |
162 | | FilterStatus mapEnd() override; |
163 | | FilterStatus listBegin(FieldType& elem_type, uint32_t& size) override; |
164 | | FilterStatus listEnd() override; |
165 | | FilterStatus setBegin(FieldType& elem_type, uint32_t& size) override; |
166 | | FilterStatus setEnd() override; |
167 | | |
168 | | // Tcp::ConnectionPool::UpstreamCallbacks |
169 | | void onUpstreamData(Buffer::Instance& data, bool end_stream) override; |
170 | | void onEvent(Network::ConnectionEvent event) override; |
171 | 0 | void onAboveWriteBufferHighWatermark() override {} |
172 | 0 | void onBelowWriteBufferLowWatermark() override {} |
173 | | |
174 | | // Upstream::LoadBalancerContextBase |
175 | 0 | const Network::Connection* downstreamConnection() const override { return nullptr; } |
176 | 0 | const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() override { return nullptr; } |
177 | | |
178 | | // Event::DeferredDeletable |
179 | 0 | void deleteIsPending() override { deferred_deleting_ = true; } |
180 | | |
181 | | private: |
182 | | friend class ShadowWriterTest; |
183 | | using ConverterCallback = std::function<FilterStatus()>; |
184 | | |
185 | | void writeRequest(); |
186 | | bool requestInProgress(); |
187 | | bool requestStarted() const; |
188 | | void flushPendingCallbacks(); |
189 | | FilterStatus runOrSave(std::function<FilterStatus()>&& cb, |
190 | | const std::function<void()>& on_save = {}); |
191 | | |
192 | | ShadowWriterImpl& parent_; |
193 | | const std::string cluster_name_; |
194 | | MessageMetadataSharedPtr metadata_; |
195 | | const TransportType transport_type_; |
196 | | const ProtocolType protocol_type_; |
197 | | TransportPtr transport_; |
198 | | ProtocolPtr protocol_; |
199 | | NullResponseDecoderPtr response_decoder_; |
200 | | ShadowUpstreamResponseCallbacksImplPtr upstream_response_callbacks_; |
201 | | bool router_destroyed_{}; |
202 | | bool request_sent_{}; |
203 | | Buffer::OwnedImpl upstream_request_buffer_; |
204 | | std::unique_ptr<UpstreamRequest> upstream_request_; |
205 | | uint64_t request_size_{}; |
206 | | uint64_t response_size_{}; |
207 | | bool request_ready_ : 1; |
208 | | |
209 | | std::list<ConverterCallback> pending_callbacks_; |
210 | | bool removed_{}; |
211 | | bool deferred_deleting_{}; |
212 | | }; |
213 | | |
214 | | class ActiveRouters : public ThreadLocal::ThreadLocalObject { |
215 | | public: |
216 | 0 | ActiveRouters(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} |
217 | 0 | ~ActiveRouters() override { |
218 | 0 | while (!active_routers_.empty()) { |
219 | 0 | auto& router = active_routers_.front(); |
220 | 0 | router->resetStream(); |
221 | 0 | remove(*router); |
222 | 0 | } |
223 | 0 | } |
224 | | |
225 | 0 | std::list<std::unique_ptr<ShadowRouterImpl>>& activeRouters() { return active_routers_; } |
226 | | |
227 | 0 | void remove(ShadowRouterImpl& router) { |
228 | 0 | dispatcher_.deferredDelete(router.removeFromList(active_routers_)); |
229 | 0 | } |
230 | | |
231 | | private: |
232 | | Event::Dispatcher& dispatcher_; |
233 | | std::list<std::unique_ptr<ShadowRouterImpl>> active_routers_; |
234 | | }; |
235 | | |
236 | | class ShadowWriterImpl : public ShadowWriter, Logger::Loggable<Logger::Id::thrift> { |
237 | | public: |
238 | | ShadowWriterImpl(Upstream::ClusterManager& cm, const RouterStats& stats, |
239 | | Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator& tls) |
240 | 0 | : cm_(cm), stats_(stats), dispatcher_(dispatcher), tls_(tls) { |
241 | 0 | tls_.set( |
242 | 0 | [](Event::Dispatcher& dispatcher) { return std::make_shared<ActiveRouters>(dispatcher); }); |
243 | 0 | } |
244 | | |
245 | 0 | ~ShadowWriterImpl() override = default; |
246 | | |
247 | 0 | void remove(ShadowRouterImpl& router) { |
248 | | // While router is destroyed but the request is in progress. The cleanup |
249 | | // is possibly deferred to tls shutdown, thus leading us to check if |
250 | | // the storage is valid. |
251 | 0 | if (tls_.get().has_value()) { |
252 | 0 | tls_->remove(router); |
253 | 0 | } |
254 | 0 | } |
255 | 0 | const RouterStats& stats() { return stats_; } |
256 | | |
257 | | // Router::ShadowWriter |
258 | 0 | Upstream::ClusterManager& clusterManager() override { return cm_; } |
259 | 0 | Event::Dispatcher& dispatcher() override { return dispatcher_; } |
260 | | OptRef<ShadowRouterHandle> submit(const std::string& cluster_name, |
261 | | MessageMetadataSharedPtr metadata, |
262 | | TransportType original_transport, |
263 | | ProtocolType original_protocol) override; |
264 | | |
265 | | private: |
266 | | friend class ShadowRouterImpl; |
267 | | friend class ShadowWriterTest; |
268 | | |
269 | | Upstream::ClusterManager& cm_; |
270 | | const RouterStats& stats_; |
271 | | Event::Dispatcher& dispatcher_; |
272 | | ThreadLocal::TypedSlot<ActiveRouters> tls_; |
273 | | }; |
274 | | |
275 | | } // namespace Router |
276 | | } // namespace ThriftProxy |
277 | | } // namespace NetworkFilters |
278 | | } // namespace Extensions |
279 | | } // namespace Envoy |