/proc/self/cwd/source/extensions/filters/network/thrift_proxy/router/router_impl.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include <memory> |
4 | | #include <string> |
5 | | #include <vector> |
6 | | |
7 | | #include "envoy/extensions/filters/network/thrift_proxy/v3/route.pb.h" |
8 | | #include "envoy/router/router.h" |
9 | | #include "envoy/tcp/conn_pool.h" |
10 | | #include "envoy/upstream/load_balancer.h" |
11 | | |
12 | | #include "source/common/http/header_utility.h" |
13 | | #include "source/common/router/metadatamatchcriteria_impl.h" |
14 | | #include "source/common/upstream/load_balancer_impl.h" |
15 | | #include "source/extensions/filters/network/thrift_proxy/conn_manager.h" |
16 | | #include "source/extensions/filters/network/thrift_proxy/filters/filter.h" |
17 | | #include "source/extensions/filters/network/thrift_proxy/router/router.h" |
18 | | #include "source/extensions/filters/network/thrift_proxy/router/router_ratelimit_impl.h" |
19 | | #include "source/extensions/filters/network/thrift_proxy/router/upstream_request.h" |
20 | | #include "source/extensions/filters/network/thrift_proxy/thrift_object.h" |
21 | | |
22 | | #include "absl/types/optional.h" |
23 | | |
24 | | namespace Envoy { |
25 | | namespace Extensions { |
26 | | namespace NetworkFilters { |
27 | | namespace ThriftProxy { |
28 | | namespace Router { |
29 | | |
30 | | class RequestMirrorPolicyImpl : public RequestMirrorPolicy { |
31 | | public: |
32 | | RequestMirrorPolicyImpl(const std::string& cluster_name, const std::string& runtime_key, |
33 | | const envoy::type::v3::FractionalPercent& default_value) |
34 | 0 | : cluster_name_(cluster_name), runtime_key_(runtime_key), default_value_(default_value) {} |
35 | | |
36 | | // Router::RequestMirrorPolicy |
37 | 0 | const std::string& clusterName() const override { return cluster_name_; } |
38 | 0 | bool enabled(Runtime::Loader& runtime) const override { |
39 | 0 | return runtime_key_.empty() ? true |
40 | 0 | : runtime.snapshot().featureEnabled(runtime_key_, default_value_); |
41 | 0 | } |
42 | | |
43 | | private: |
44 | | const std::string cluster_name_; |
45 | | const std::string runtime_key_; |
46 | | const envoy::type::v3::FractionalPercent default_value_; |
47 | | }; |
48 | | |
49 | | class RouteEntryImplBase : public RouteEntry, |
50 | | public Route, |
51 | | public std::enable_shared_from_this<RouteEntryImplBase> { |
52 | | public: |
53 | | RouteEntryImplBase(const envoy::extensions::filters::network::thrift_proxy::v3::Route& route); |
54 | | |
55 | | void validateClusters(const Upstream::ClusterManager::ClusterInfoMaps& cluster_info_maps) const; |
56 | | // Router::RouteEntry |
57 | | const std::string& clusterName() const override; |
58 | 0 | const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() const override { |
59 | 0 | return metadata_match_criteria_.get(); |
60 | 0 | } |
61 | 0 | const RateLimitPolicy& rateLimitPolicy() const override { return rate_limit_policy_; } |
62 | 0 | bool stripServiceName() const override { return strip_service_name_; }; |
63 | 0 | const Http::LowerCaseString& clusterHeader() const override { return cluster_header_; } |
64 | 0 | const std::vector<std::shared_ptr<RequestMirrorPolicy>>& requestMirrorPolicies() const override { |
65 | 0 | return mirror_policies_; |
66 | 0 | } |
67 | | |
68 | | // Router::Route |
69 | | const RouteEntry* routeEntry() const override; |
70 | | |
71 | | virtual RouteConstSharedPtr matches(const MessageMetadata& metadata, |
72 | | uint64_t random_value) const PURE; |
73 | | |
74 | | protected: |
75 | | RouteConstSharedPtr clusterEntry(uint64_t random_value, const MessageMetadata& metadata) const; |
76 | | bool headersMatch(const Http::HeaderMap& headers) const; |
77 | | |
78 | | private: |
79 | | class WeightedClusterEntry : public RouteEntry, public Route { |
80 | | public: |
81 | | WeightedClusterEntry( |
82 | | const RouteEntryImplBase& parent, |
83 | | const envoy::extensions::filters::network::thrift_proxy::v3::WeightedCluster::ClusterWeight& |
84 | | cluster); |
85 | | |
86 | 0 | uint64_t clusterWeight() const { return cluster_weight_; } |
87 | | |
88 | | // Router::RouteEntry |
89 | 0 | const std::string& clusterName() const override { return cluster_name_; } |
90 | 0 | const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() const override { |
91 | 0 | if (metadata_match_criteria_) { |
92 | 0 | return metadata_match_criteria_.get(); |
93 | 0 | } |
94 | | |
95 | 0 | return parent_.metadataMatchCriteria(); |
96 | 0 | } |
97 | 0 | const RateLimitPolicy& rateLimitPolicy() const override { return parent_.rateLimitPolicy(); } |
98 | 0 | bool stripServiceName() const override { return parent_.stripServiceName(); } |
99 | 0 | const Http::LowerCaseString& clusterHeader() const override { |
100 | | // Weighted cluster entries don't have a cluster header based on proto. |
101 | 0 | ASSERT(parent_.clusterHeader().get().empty()); |
102 | 0 | return parent_.clusterHeader(); |
103 | 0 | } |
104 | | const std::vector<std::shared_ptr<RequestMirrorPolicy>>& |
105 | 0 | requestMirrorPolicies() const override { |
106 | 0 | return parent_.requestMirrorPolicies(); |
107 | 0 | } |
108 | | |
109 | | // Router::Route |
110 | 0 | const RouteEntry* routeEntry() const override { return this; } |
111 | | |
112 | | private: |
113 | | const RouteEntryImplBase& parent_; |
114 | | const std::string cluster_name_; |
115 | | const uint64_t cluster_weight_; |
116 | | Envoy::Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_; |
117 | | }; |
118 | | using WeightedClusterEntrySharedPtr = std::shared_ptr<WeightedClusterEntry>; |
119 | | |
120 | | class DynamicRouteEntry : public RouteEntry, public Route { |
121 | | public: |
122 | | DynamicRouteEntry(const RouteEntryImplBase& parent, absl::string_view cluster_name) |
123 | 0 | : parent_(parent), cluster_name_(std::string(cluster_name)) {} |
124 | | |
125 | | // Router::RouteEntry |
126 | 0 | const std::string& clusterName() const override { return cluster_name_; } |
127 | 0 | const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() const override { |
128 | 0 | return parent_.metadataMatchCriteria(); |
129 | 0 | } |
130 | 0 | const RateLimitPolicy& rateLimitPolicy() const override { return parent_.rateLimitPolicy(); } |
131 | 0 | bool stripServiceName() const override { return parent_.stripServiceName(); } |
132 | 0 | const Http::LowerCaseString& clusterHeader() const override { return parent_.clusterHeader(); } |
133 | | const std::vector<std::shared_ptr<RequestMirrorPolicy>>& |
134 | 0 | requestMirrorPolicies() const override { |
135 | 0 | return parent_.requestMirrorPolicies(); |
136 | 0 | } |
137 | | |
138 | | // Router::Route |
139 | 0 | const RouteEntry* routeEntry() const override { return this; } |
140 | | |
141 | | private: |
142 | | const RouteEntryImplBase& parent_; |
143 | | const std::string cluster_name_; |
144 | | }; |
145 | | |
146 | | static std::vector<std::shared_ptr<RequestMirrorPolicy>> buildMirrorPolicies( |
147 | | const envoy::extensions::filters::network::thrift_proxy::v3::RouteAction& route); |
148 | | |
149 | | const std::string cluster_name_; |
150 | | const std::vector<Http::HeaderUtility::HeaderDataPtr> config_headers_; |
151 | | std::vector<WeightedClusterEntrySharedPtr> weighted_clusters_; |
152 | | uint64_t total_cluster_weight_; |
153 | | Envoy::Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_; |
154 | | const RateLimitPolicyImpl rate_limit_policy_; |
155 | | const bool strip_service_name_; |
156 | | const Http::LowerCaseString cluster_header_; |
157 | | const std::vector<std::shared_ptr<RequestMirrorPolicy>> mirror_policies_; |
158 | | }; |
159 | | |
160 | | using RouteEntryImplBaseConstSharedPtr = std::shared_ptr<const RouteEntryImplBase>; |
161 | | |
162 | | class MethodNameRouteEntryImpl : public RouteEntryImplBase { |
163 | | public: |
164 | | MethodNameRouteEntryImpl( |
165 | | const envoy::extensions::filters::network::thrift_proxy::v3::Route& route); |
166 | | |
167 | | // RouteEntryImplBase |
168 | | RouteConstSharedPtr matches(const MessageMetadata& metadata, |
169 | | uint64_t random_value) const override; |
170 | | |
171 | | private: |
172 | | const std::string method_name_; |
173 | | const bool invert_; |
174 | | }; |
175 | | |
176 | | class ServiceNameRouteEntryImpl : public RouteEntryImplBase { |
177 | | public: |
178 | | ServiceNameRouteEntryImpl( |
179 | | const envoy::extensions::filters::network::thrift_proxy::v3::Route& route); |
180 | | |
181 | | // RouteEntryImplBase |
182 | | RouteConstSharedPtr matches(const MessageMetadata& metadata, |
183 | | uint64_t random_value) const override; |
184 | | |
185 | | private: |
186 | | std::string service_name_; |
187 | | const bool invert_; |
188 | | }; |
189 | | |
190 | | class RouteMatcher { |
191 | | public: |
192 | | // validation_clusters = absl::nullopt means that clusters are not validated. |
193 | | RouteMatcher( |
194 | | const envoy::extensions::filters::network::thrift_proxy::v3::RouteConfiguration& config, |
195 | | const absl::optional<Upstream::ClusterManager::ClusterInfoMaps>& validation_clusters); |
196 | | |
197 | | RouteConstSharedPtr route(const MessageMetadata& metadata, uint64_t random_value) const; |
198 | | |
199 | | private: |
200 | | std::vector<RouteEntryImplBaseConstSharedPtr> routes_; |
201 | | }; |
202 | | |
203 | | // Adapter from DecoderFilterCallbacks to UpstreamResponseCallbacks. |
204 | | class UpstreamResponseCallbacksImpl : public UpstreamResponseCallbacks { |
205 | | public: |
206 | | UpstreamResponseCallbacksImpl(ThriftFilters::DecoderFilterCallbacks* callbacks) |
207 | 0 | : callbacks_(callbacks) {} |
208 | | |
209 | 0 | void startUpstreamResponse(Transport& transport, Protocol& protocol) override { |
210 | 0 | callbacks_->startUpstreamResponse(transport, protocol); |
211 | 0 | } |
212 | 0 | ThriftFilters::ResponseStatus upstreamData(Buffer::Instance& buffer) override { |
213 | 0 | callbacks_->streamInfo().addBytesSent(buffer.length()); |
214 | 0 | return callbacks_->upstreamData(buffer); |
215 | 0 | } |
216 | 0 | MessageMetadataSharedPtr responseMetadata() override { return callbacks_->responseMetadata(); } |
217 | 0 | bool responseSuccess() override { return callbacks_->responseSuccess(); } |
218 | | |
219 | | private: |
220 | | ThriftFilters::DecoderFilterCallbacks* callbacks_{}; |
221 | | }; |
222 | | |
223 | | class Router : public Tcp::ConnectionPool::UpstreamCallbacks, |
224 | | public Upstream::LoadBalancerContextBase, |
225 | | public RequestOwner, |
226 | | public ThriftFilters::DecoderFilter { |
227 | | public: |
228 | | Router(Upstream::ClusterManager& cluster_manager, const RouterStats& stats, |
229 | | Runtime::Loader& runtime, ShadowWriter& shadow_writer, bool close_downstream_on_error) |
230 | | : RequestOwner(cluster_manager, stats), passthrough_supported_(false), runtime_(runtime), |
231 | 0 | shadow_writer_(shadow_writer), close_downstream_on_error_(close_downstream_on_error) {} Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::Router::Router::Router(Envoy::Upstream::ClusterManager&, Envoy::Extensions::NetworkFilters::ThriftProxy::Router::RouterStats const&, Envoy::Runtime::Loader&, Envoy::Extensions::NetworkFilters::ThriftProxy::Router::ShadowWriter&, bool) Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::Router::Router::Router(Envoy::Upstream::ClusterManager&, Envoy::Extensions::NetworkFilters::ThriftProxy::Router::RouterStats const&, Envoy::Runtime::Loader&, Envoy::Extensions::NetworkFilters::ThriftProxy::Router::ShadowWriter&, bool) |
232 | | |
233 | 0 | ~Router() override = default; |
234 | | |
235 | | // ThriftFilters::DecoderFilter |
236 | | void onDestroy() override; |
237 | | void setDecoderFilterCallbacks(ThriftFilters::DecoderFilterCallbacks& callbacks) override; |
238 | 0 | bool passthroughSupported() const override { return passthrough_supported_; } |
239 | | |
240 | | // RequestOwner |
241 | 0 | Tcp::ConnectionPool::UpstreamCallbacks& upstreamCallbacks() override { |
242 | 0 | ASSERT(callbacks_ != nullptr); |
243 | 0 | ASSERT(upstream_request_ != nullptr); |
244 | | |
245 | 0 | auto upstream_info = std::make_shared<StreamInfo::UpstreamInfoImpl>(); |
246 | 0 | upstream_info->setUpstreamHost(upstream_request_->upstream_host_); |
247 | 0 | callbacks_->streamInfo().setUpstreamInfo(std::move(upstream_info)); |
248 | |
|
249 | 0 | return *this; |
250 | 0 | } |
251 | 0 | Buffer::OwnedImpl& buffer() override { return upstream_request_buffer_; } |
252 | 0 | Event::Dispatcher& dispatcher() override { return callbacks_->dispatcher(); } |
253 | 0 | void addSize(uint64_t size) override { request_size_ += size; } |
254 | 0 | void continueDecoding() override { callbacks_->continueDecoding(); } |
255 | 0 | void resetDownstreamConnection() override { callbacks_->resetDownstreamConnection(); } |
256 | 0 | void sendLocalReply(const ThriftProxy::DirectResponse& response, bool end_stream) override { |
257 | 0 | callbacks_->sendLocalReply(response, end_stream); |
258 | 0 | } |
259 | 0 | void onReset() override { callbacks_->onReset(); } |
260 | | |
261 | | // RequestOwner::ProtocolConverter |
262 | | FilterStatus transportBegin(MessageMetadataSharedPtr metadata) override; |
263 | | FilterStatus transportEnd() override; |
264 | | FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override; |
265 | | FilterStatus messageEnd() override; |
266 | | FilterStatus passthroughData(Buffer::Instance& data) override; |
267 | | FilterStatus structBegin(absl::string_view name) override; |
268 | | FilterStatus structEnd() override; |
269 | | FilterStatus fieldBegin(absl::string_view name, FieldType& field_type, |
270 | | int16_t& field_id) override; |
271 | | FilterStatus fieldEnd() override; |
272 | | FilterStatus boolValue(bool& value) override; |
273 | | FilterStatus byteValue(uint8_t& value) override; |
274 | | FilterStatus int16Value(int16_t& value) override; |
275 | | FilterStatus int32Value(int32_t& value) override; |
276 | | FilterStatus int64Value(int64_t& value) override; |
277 | | FilterStatus doubleValue(double& value) override; |
278 | | FilterStatus stringValue(absl::string_view value) override; |
279 | | FilterStatus mapBegin(FieldType& key_type, FieldType& value_type, uint32_t& size) override; |
280 | | FilterStatus mapEnd() override; |
281 | | FilterStatus listBegin(FieldType& elem_type, uint32_t& size) override; |
282 | | FilterStatus listEnd() override; |
283 | | FilterStatus setBegin(FieldType& elem_type, uint32_t& size) override; |
284 | | FilterStatus setEnd() override; |
285 | | |
286 | | // Upstream::LoadBalancerContext |
287 | | const Network::Connection* downstreamConnection() const override; |
288 | 0 | const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() override { |
289 | 0 | const Envoy::Router::MetadataMatchCriteria* route_criteria = |
290 | 0 | (route_entry_ != nullptr) ? route_entry_->metadataMatchCriteria() : nullptr; |
291 | | |
292 | | // Support getting metadata match criteria from thrift request. |
293 | 0 | const auto& request_metadata = callbacks_->streamInfo().dynamicMetadata().filter_metadata(); |
294 | 0 | const auto filter_it = request_metadata.find(Envoy::Config::MetadataFilters::get().ENVOY_LB); |
295 | |
|
296 | 0 | if (filter_it == request_metadata.end()) { |
297 | 0 | return route_criteria; |
298 | 0 | } |
299 | | |
300 | 0 | if (route_criteria != nullptr) { |
301 | 0 | metadata_match_criteria_ = route_criteria->mergeMatchCriteria(filter_it->second); |
302 | 0 | } else { |
303 | 0 | metadata_match_criteria_ = |
304 | 0 | std::make_unique<Envoy::Router::MetadataMatchCriteriaImpl>(filter_it->second); |
305 | 0 | } |
306 | |
|
307 | 0 | return metadata_match_criteria_.get(); |
308 | 0 | } |
309 | | |
310 | | // Tcp::ConnectionPool::UpstreamCallbacks |
311 | | void onUpstreamData(Buffer::Instance& data, bool end_stream) override; |
312 | | void onEvent(Network::ConnectionEvent event) override; |
313 | 0 | void onAboveWriteBufferHighWatermark() override {} |
314 | 0 | void onBelowWriteBufferLowWatermark() override {} |
315 | | |
316 | | private: |
317 | | void cleanup(); |
318 | | |
319 | | ThriftFilters::DecoderFilterCallbacks* callbacks_{}; |
320 | | std::unique_ptr<UpstreamResponseCallbacksImpl> upstream_response_callbacks_{}; |
321 | | RouteConstSharedPtr route_{}; |
322 | | const RouteEntry* route_entry_{}; |
323 | | Envoy::Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_; |
324 | | |
325 | | std::unique_ptr<UpstreamRequest> upstream_request_; |
326 | | Buffer::OwnedImpl upstream_request_buffer_; |
327 | | |
328 | | bool passthrough_supported_ : 1; |
329 | | uint64_t request_size_{}; |
330 | | Runtime::Loader& runtime_; |
331 | | ShadowWriter& shadow_writer_; |
332 | | std::vector<std::reference_wrapper<ShadowRouterHandle>> shadow_routers_{}; |
333 | | |
334 | | bool close_downstream_on_error_; |
335 | | }; |
336 | | |
337 | | } // namespace Router |
338 | | } // namespace ThriftProxy |
339 | | } // namespace NetworkFilters |
340 | | } // namespace Extensions |
341 | | } // namespace Envoy |