/proc/self/cwd/source/extensions/filters/network/thrift_proxy/router/router.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include <memory> |
4 | | #include <string> |
5 | | #include <vector> |
6 | | |
7 | | #include "envoy/buffer/buffer.h" |
8 | | #include "envoy/common/optref.h" |
9 | | #include "envoy/local_info/local_info.h" |
10 | | #include "envoy/rds/config.h" |
11 | | #include "envoy/router/router.h" |
12 | | #include "envoy/tcp/conn_pool.h" |
13 | | |
14 | | #include "source/common/buffer/buffer_impl.h" |
15 | | #include "source/common/common/logger.h" |
16 | | #include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h" |
17 | | #include "source/extensions/filters/network/thrift_proxy/metadata.h" |
18 | | #include "source/extensions/filters/network/thrift_proxy/protocol_converter.h" |
19 | | #include "source/extensions/filters/network/thrift_proxy/protocol_options_config.h" |
20 | | #include "source/extensions/filters/network/well_known_names.h" |
21 | | |
22 | | namespace Envoy { |
23 | | namespace Extensions { |
24 | | namespace NetworkFilters { |
25 | | namespace ThriftProxy { |
26 | | namespace Router { |
27 | | |
28 | | class RateLimitPolicy; |
29 | | class RequestMirrorPolicy; |
30 | | |
/**
 * RouteEntry is an individual resolved route entry.
 */
class RouteEntry {
public:
  virtual ~RouteEntry() = default;

  /**
   * @return const std::string& the upstream cluster that owns the route.
   */
  virtual const std::string& clusterName() const PURE;

  /**
   * @return MetadataMatchCriteria* the metadata that a subset load balancer should match when
   * selecting an upstream host
   */
  virtual const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() const PURE;

  /**
   * @return const RateLimitPolicy& the rate limit policy for the route.
   */
  virtual const RateLimitPolicy& rateLimitPolicy() const PURE;

  /**
   * @return bool should the service name prefix be stripped from the method.
   */
  virtual bool stripServiceName() const PURE;

  /**
   * @return const Http::LowerCaseString& the header used to determine the cluster.
   */
  virtual const Http::LowerCaseString& clusterHeader() const PURE;

  /**
   * @return const std::vector<std::shared_ptr<RequestMirrorPolicy>>& the mirror policies
   * associated with this route, if any.
   */
  virtual const std::vector<std::shared_ptr<RequestMirrorPolicy>>&
  requestMirrorPolicies() const PURE;
};
71 | | |
/**
 * Route holds the RouteEntry for a request.
 */
class Route {
public:
  virtual ~Route() = default;

  /**
   * @return the route entry or nullptr if there is no matching route for the request.
   */
  virtual const RouteEntry* routeEntry() const PURE;
};

using RouteConstSharedPtr = std::shared_ptr<const Route>;
86 | | |
/**
 * The router configuration, a routable view over an RDS-provided config.
 */
class Config : public Rds::Config {
public:
  /**
   * Based on the incoming Thrift request transport and/or protocol data, determine the target
   * route for the request.
   * @param metadata MessageMetadata for the message to route
   * @param random_value uint64_t used to select cluster affinity
   * @return the route or nullptr if there is no matching route for the request.
   */
  virtual RouteConstSharedPtr route(const MessageMetadata& metadata,
                                    uint64_t random_value) const PURE;
};

using ConfigConstSharedPtr = std::shared_ptr<const Config>;
104 | | |
// Router-level stats: routing failures (route_missing, unknown_cluster), upstream
// selection failures (upstream_rq_maintenance_mode, no_healthy_upstream), and shadow
// request submission failures. Only counters are declared; the GAUGE/HISTOGRAM
// parameters are accepted for macro-signature uniformity.
#define ALL_THRIFT_ROUTER_STATS(COUNTER, GAUGE, HISTOGRAM)                                         \
  COUNTER(route_missing)                                                                           \
  COUNTER(unknown_cluster)                                                                         \
  COUNTER(upstream_rq_maintenance_mode)                                                            \
  COUNTER(no_healthy_upstream)                                                                     \
  COUNTER(shadow_request_submit_failure)
111 | | |
/**
 * Struct containing named stats for the router.
 */
struct RouterNamedStats {
  ALL_THRIFT_ROUTER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT, GENERATE_HISTOGRAM_STRUCT)

  // Instantiates every stat declared by ALL_THRIFT_ROUTER_STATS in `scope`,
  // with each stat name prepended by `prefix`.
  static RouterNamedStats generateStats(const std::string& prefix, Stats::Scope& scope) {
    return RouterNamedStats{ALL_THRIFT_ROUTER_STATS(POOL_COUNTER_PREFIX(scope, prefix),
                                                    POOL_GAUGE_PREFIX(scope, prefix),
                                                    POOL_HISTOGRAM_PREFIX(scope, prefix))};
  }
};
124 | | |
/**
 * Stats for use in the router.
 *
 * Named stats live in the router's own scope (see RouterNamedStats); the per-request
 * counters/histograms below are emitted dynamically into the target cluster's stats
 * scope, optionally with a zone-qualified variant when both the local zone and the
 * upstream host's zone are known.
 */
class RouterStats {
public:
  // Pre-registers all dynamic stat names in a StatNameSet so per-request stat
  // emission does not need to encode strings on the hot path.
  RouterStats(const std::string& stat_prefix, Stats::Scope& scope,
              const LocalInfo::LocalInfo& local_info)
      : named_(RouterNamedStats::generateStats(stat_prefix, scope)),
        stat_name_set_(scope.symbolTable().makeSet("thrift_proxy")),
        symbol_table_(scope.symbolTable()),
        upstream_rq_call_(stat_name_set_->add("thrift.upstream_rq_call")),
        upstream_rq_oneway_(stat_name_set_->add("thrift.upstream_rq_oneway")),
        upstream_rq_invalid_type_(stat_name_set_->add("thrift.upstream_rq_invalid_type")),
        upstream_resp_reply_(stat_name_set_->add("thrift.upstream_resp_reply")),
        upstream_resp_reply_success_(stat_name_set_->add("thrift.upstream_resp_success")),
        upstream_resp_reply_error_(stat_name_set_->add("thrift.upstream_resp_error")),
        upstream_resp_exception_(stat_name_set_->add("thrift.upstream_resp_exception")),
        upstream_resp_exception_local_(stat_name_set_->add("thrift.upstream_resp_exception_local")),
        upstream_resp_exception_remote_(
            stat_name_set_->add("thrift.upstream_resp_exception_remote")),
        upstream_resp_invalid_type_(stat_name_set_->add("thrift.upstream_resp_invalid_type")),
        upstream_resp_decoding_error_(stat_name_set_->add("thrift.upstream_resp_decoding_error")),
        upstream_rq_time_(stat_name_set_->add("thrift.upstream_rq_time")),
        upstream_rq_size_(stat_name_set_->add("thrift.upstream_rq_size")),
        upstream_resp_size_(stat_name_set_->add("thrift.upstream_resp_size")),
        zone_(stat_name_set_->add("zone")), local_zone_name_(local_info.zoneStatName()),
        upstream_cx_drain_close_(stat_name_set_->add("thrift.upstream_cx_drain_close")),
        downstream_cx_partial_response_close_(
            stat_name_set_->add("thrift.downstream_cx_partial_response_close")),
        downstream_cx_underflow_response_close_(
            stat_name_set_->add("thrift.downstream_cx_underflow_response_close")) {}

  /**
   * Increment counter for request calls.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   */
  void incRequestCall(const Upstream::ClusterInfo& cluster) const {
    incClusterScopeCounter(cluster, nullptr, upstream_rq_call_);
  }

  /**
   * Increment counter for requests that are one way only.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   */
  void incRequestOneWay(const Upstream::ClusterInfo& cluster) const {
    incClusterScopeCounter(cluster, nullptr, upstream_rq_oneway_);
  }

  /**
   * Increment counter for requests that are invalid.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   */
  void incRequestInvalid(const Upstream::ClusterInfo& cluster) const {
    incClusterScopeCounter(cluster, nullptr, upstream_rq_invalid_type_);
  }

  /**
   * Increment counter for connections that were closed due to draining.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   */
  void incCloseDrain(const Upstream::ClusterInfo& cluster) const {
    incClusterScopeCounter(cluster, nullptr, upstream_cx_drain_close_);
  }

  /**
   * Increment counter for downstream connections that were closed due to partial responses.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   */
  void incClosePartialResponse(const Upstream::ClusterInfo& cluster) const {
    incClusterScopeCounter(cluster, nullptr, downstream_cx_partial_response_close_);
  }

  /**
   * Increment counter for downstream connections that were closed due to underflow responses.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   */
  void incCloseUnderflowResponse(const Upstream::ClusterInfo& cluster) const {
    incClusterScopeCounter(cluster, nullptr, downstream_cx_underflow_response_close_);
  }

  /**
   * Increment counter for received responses that are replies that are successful.
   * Also records a per-host rq_success. upstream_host must not be null.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host
   */
  void incResponseReplySuccess(const Upstream::ClusterInfo& cluster,
                               Upstream::HostDescriptionConstSharedPtr upstream_host) const {
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_);
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_success_);
    ASSERT(upstream_host != nullptr);
    upstream_host->stats().rq_success_.inc();
  }

  /**
   * Increment counter for received responses that are replies that are an error.
   * Also records a per-host rq_error. upstream_host must not be null.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host
   */
  void incResponseReplyError(const Upstream::ClusterInfo& cluster,
                             Upstream::HostDescriptionConstSharedPtr upstream_host) const {
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_);
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_error_);
    ASSERT(upstream_host != nullptr);
    // Currently IDL exceptions are always considered endpoint error but it's possible for an error
    // to have semantics matching HTTP 4xx, rather than 5xx. rq_error classification chosen
    // here to match outlier detection external failure in upstream_request.cc.
    upstream_host->stats().rq_error_.inc();
  }

  /**
   * Increment counter for received remote responses that are exceptions.
   * Also records a per-host rq_error. upstream_host must not be null.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host
   */
  void incResponseRemoteException(const Upstream::ClusterInfo& cluster,
                                  Upstream::HostDescriptionConstSharedPtr upstream_host) const {
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_exception_);
    ASSERT(upstream_host != nullptr);
    // NOTE(review): nullptr here means the `_remote` counter never gets a zone-qualified
    // variant, unlike upstream_resp_exception_ above — presumably intentional to keep
    // zone stat cardinality down, but worth confirming.
    incClusterScopeCounter(cluster, nullptr, upstream_resp_exception_remote_);
    upstream_host->stats().rq_error_.inc();
  }

  /**
   * Increment counter for responses that are local exceptions, without forwarding a request
   * upstream.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   */
  void incResponseLocalException(const Upstream::ClusterInfo& cluster) const {
    incClusterScopeCounter(cluster, nullptr, upstream_resp_exception_);
    incClusterScopeCounter(cluster, nullptr, upstream_resp_exception_local_);
  }

  /**
   * Increment counter for received responses that are invalid.
   * Also records a per-host rq_error. upstream_host must not be null.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host
   */
  void incResponseInvalidType(const Upstream::ClusterInfo& cluster,
                              Upstream::HostDescriptionConstSharedPtr upstream_host) const {
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_invalid_type_);
    ASSERT(upstream_host != nullptr);
    upstream_host->stats().rq_error_.inc();
  }

  /**
   * Increment counter for decoding errors during responses.
   * Also records a per-host rq_error. upstream_host must not be null.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host
   */
  void incResponseDecodingError(const Upstream::ClusterInfo& cluster,
                                Upstream::HostDescriptionConstSharedPtr upstream_host) const {
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_decoding_error_);
    ASSERT(upstream_host != nullptr);
    upstream_host->stats().rq_error_.inc();
  }

  /**
   * Record a value for the request size histogram.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   * @param value uint64_t size in bytes of the full request
   */
  void recordUpstreamRequestSize(const Upstream::ClusterInfo& cluster, uint64_t value) const {
    recordClusterScopeHistogram(cluster, nullptr, upstream_rq_size_, Stats::Histogram::Unit::Bytes,
                                value);
  }

  /**
   * Record a value for the response size histogram.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   * @param value uint64_t size in bytes of the full response
   */
  void recordUpstreamResponseSize(const Upstream::ClusterInfo& cluster, uint64_t value) const {
    recordClusterScopeHistogram(cluster, nullptr, upstream_resp_size_,
                                Stats::Histogram::Unit::Bytes, value);
  }

  /**
   * Record a value for the response time duration histogram.
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host
   * @param value uint64_t duration in milliseconds to receive the complete response
   */
  void recordUpstreamResponseTime(const Upstream::ClusterInfo& cluster,
                                  Upstream::HostDescriptionConstSharedPtr upstream_host,
                                  uint64_t value) const {
    recordClusterScopeHistogram(cluster, upstream_host, upstream_rq_time_,
                                Stats::Histogram::Unit::Milliseconds, value);
  }

  // Router-level named stats (route_missing, unknown_cluster, etc.).
  const RouterNamedStats named_;

private:
  // Increments `stat_name` in the cluster's stats scope. If a zone-qualified
  // name can be formed (see upstreamZoneStatName), increments that counter too.
  void incClusterScopeCounter(const Upstream::ClusterInfo& cluster,
                              Upstream::HostDescriptionConstSharedPtr upstream_host,
                              const Stats::StatName& stat_name) const {
    const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join({stat_name});
    cluster.statsScope().counterFromStatName(Stats::StatName(stat_name_storage.get())).inc();
    const Stats::SymbolTable::StoragePtr zone_stat_name_storage =
        upstreamZoneStatName(upstream_host, stat_name);
    if (zone_stat_name_storage) {
      cluster.statsScope().counterFromStatName(Stats::StatName(zone_stat_name_storage.get())).inc();
    }
  }

  // Records `value` in the `stat_name` histogram in the cluster's stats scope,
  // plus the zone-qualified variant when available.
  void recordClusterScopeHistogram(const Upstream::ClusterInfo& cluster,
                                   Upstream::HostDescriptionConstSharedPtr upstream_host,
                                   const Stats::StatName& stat_name, Stats::Histogram::Unit unit,
                                   uint64_t value) const {
    const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join({stat_name});
    cluster.statsScope()
        .histogramFromStatName(Stats::StatName(stat_name_storage.get()), unit)
        .recordValue(value);
    const Stats::SymbolTable::StoragePtr zone_stat_name_storage =
        upstreamZoneStatName(upstream_host, stat_name);
    if (zone_stat_name_storage) {
      cluster.statsScope()
          .histogramFromStatName(Stats::StatName(zone_stat_name_storage.get()), unit)
          .recordValue(value);
    }
  }

  // Builds "zone.<local_zone>.<upstream_zone>.<stat_name>", or returns nullptr
  // when there is no upstream host or either zone name is unknown.
  Stats::SymbolTable::StoragePtr
  upstreamZoneStatName(Upstream::HostDescriptionConstSharedPtr upstream_host,
                       const Stats::StatName& stat_name) const {
    if (!upstream_host || local_zone_name_.empty()) {
      return nullptr;
    }
    const auto& upstream_zone_name = upstream_host->localityZoneStatName();
    if (upstream_zone_name.empty()) {
      return nullptr;
    }
    return symbol_table_.join({zone_, local_zone_name_, upstream_zone_name, stat_name});
  }

  Stats::StatNameSetPtr stat_name_set_;
  Stats::SymbolTable& symbol_table_;
  const Stats::StatName upstream_rq_call_;
  const Stats::StatName upstream_rq_oneway_;
  const Stats::StatName upstream_rq_invalid_type_;
  const Stats::StatName upstream_resp_reply_;
  const Stats::StatName upstream_resp_reply_success_;
  const Stats::StatName upstream_resp_reply_error_;
  const Stats::StatName upstream_resp_exception_;
  const Stats::StatName upstream_resp_exception_local_;
  const Stats::StatName upstream_resp_exception_remote_;
  const Stats::StatName upstream_resp_invalid_type_;
  const Stats::StatName upstream_resp_decoding_error_;
  const Stats::StatName upstream_rq_time_;
  const Stats::StatName upstream_rq_size_;
  const Stats::StatName upstream_resp_size_;
  const Stats::StatName zone_;
  const Stats::StatName local_zone_name_;
  const Stats::StatName upstream_cx_drain_close_;
  const Stats::StatName downstream_cx_partial_response_close_;
  const Stats::StatName downstream_cx_underflow_response_close_;
};
381 | | |
/**
 * This interface is used by an upstream request to communicate its state.
 */
class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::Id::thrift> {
public:
  RequestOwner(Upstream::ClusterManager& cluster_manager, const RouterStats& stats)
      : cluster_manager_(cluster_manager), stats_(stats) {}
  ~RequestOwner() override = default;

  /**
   * @return ConnectionPool::UpstreamCallbacks& the handler for upstream data.
   */
  virtual Tcp::ConnectionPool::UpstreamCallbacks& upstreamCallbacks() PURE;

  /**
   * @return Buffer::OwnedImpl& the buffer used to serialize the upstream request.
   */
  virtual Buffer::OwnedImpl& buffer() PURE;

  /**
   * @return Event::Dispatcher& the dispatcher used for timers, etc.
   */
  virtual Event::Dispatcher& dispatcher() PURE;

  /**
   * Converts message begin into the right protocol.
   */
  void convertMessageBegin(MessageMetadataSharedPtr metadata) {
    ProtocolConverter::messageBegin(metadata);
  }

  /**
   * Used to update the request size every time bytes are pushed out.
   *
   * @param size uint64_t the value of the increment.
   */
  virtual void addSize(uint64_t size) PURE;

  /**
   * Used to continue decoding if it was previously stopped.
   */
  virtual void continueDecoding() PURE;

  /**
   * Used to reset the downstream connection after an error.
   */
  virtual void resetDownstreamConnection() PURE;

  /**
   * Sends a locally generated response using the provided response object.
   *
   * @param response DirectResponse the response to send to the downstream client
   * @param end_stream if true, the downstream connection should be closed after this response
   */
  virtual void sendLocalReply(const ThriftProxy::DirectResponse& response, bool end_stream) PURE;

  /**
   * @return Upstream::ClusterManager& the cluster manager.
   */
  Upstream::ClusterManager& clusterManager() { return cluster_manager_; }

  /**
   * @return Upstream::Cluster& the upstream cluster associated with the request.
   * Only valid after a successful prepareUpstreamRequest() has set cluster_.
   */
  const Upstream::ClusterInfo& cluster() const { return *cluster_; }

  /**
   * @return RouterStats the common router stats.
   */
  const RouterStats& stats() { return stats_; }

  // Hook invoked when the request is reset; default is a no-op.
  virtual void onReset() {}

protected:
  // Everything needed to start the upstream request once a cluster and
  // connection pool have been selected.
  struct UpstreamRequestInfo {
    // True when the bytes can be proxied without re-encoding (see the
    // passthrough_supported computation below).
    bool passthrough_supported;
    // Final upstream transport, after applying per-cluster protocol options.
    TransportType transport;
    // Final upstream protocol, after applying per-cluster protocol options.
    ProtocolType protocol;
    absl::optional<Upstream::TcpPoolData> conn_pool_data;
  };

  // Exactly one of the two fields is populated: `exception` on failure,
  // `upstream_request_info` on success.
  struct PrepareUpstreamRequestResult {
    absl::optional<AppException> exception;
    absl::optional<UpstreamRequestInfo> upstream_request_info;
  };

  /**
   * Resolves `cluster_name` and prepares the upstream request: records request-type
   * stats, applies the cluster's thrift protocol options to pick the final transport
   * and protocol, and obtains a TCP connection pool handle. On failure (unknown
   * cluster, maintenance mode, no healthy upstream) the relevant counters are
   * incremented and an AppException is returned instead of request info.
   * Side effect: sets cluster_ when the cluster is found.
   */
  PrepareUpstreamRequestResult prepareUpstreamRequest(const std::string& cluster_name,
                                                      MessageMetadataSharedPtr& metadata,
                                                      TransportType transport,
                                                      ProtocolType protocol,
                                                      Upstream::LoadBalancerContext* lb_context) {
    Upstream::ThreadLocalCluster* cluster = clusterManager().getThreadLocalCluster(cluster_name);
    if (!cluster) {
      ENVOY_LOG(debug, "unknown cluster '{}'", cluster_name);
      stats().named_.unknown_cluster_.inc();
      return {AppException(AppExceptionType::InternalError,
                           fmt::format("unknown cluster '{}'", cluster_name)),
              absl::nullopt};
    }

    cluster_ = cluster->info();
    ENVOY_LOG(debug, "cluster '{}' match for method '{}'", cluster_name, metadata->methodName());

    switch (metadata->messageType()) {
    case MessageType::Call:
      stats().incRequestCall(*cluster_);
      break;

    case MessageType::Oneway:
      stats().incRequestOneWay(*cluster_);
      break;

    default:
      stats().incRequestInvalid(*cluster_);
      break;
    }

    if (cluster_->maintenanceMode()) {
      stats().named_.upstream_rq_maintenance_mode_.inc();
      // Only Call expects a response, so only Call counts a local exception response.
      if (metadata->messageType() == MessageType::Call) {
        stats().incResponseLocalException(*cluster_);
      }
      return {AppException(AppExceptionType::InternalError,
                           fmt::format("maintenance mode for cluster '{}'", cluster_name)),
              absl::nullopt};
    }

    const std::shared_ptr<const ProtocolOptionsConfig> options =
        cluster_->extensionProtocolOptionsTyped<ProtocolOptionsConfig>(
            NetworkFilterNames::get().ThriftProxy);

    // Per-cluster protocol options may override the downstream transport/protocol.
    const TransportType final_transport = options ? options->transport(transport) : transport;
    ASSERT(final_transport != TransportType::Auto);

    const ProtocolType final_protocol = options ? options->protocol(protocol) : protocol;
    ASSERT(final_protocol != ProtocolType::Auto);

    auto conn_pool_data = cluster->tcpConnPool(Upstream::ResourcePriority::Default, lb_context);
    if (!conn_pool_data) {
      stats().named_.no_healthy_upstream_.inc();
      if (metadata->messageType() == MessageType::Call) {
        stats().incResponseLocalException(*cluster_);
      }
      return {AppException(AppExceptionType::InternalError,
                           fmt::format("no healthy upstream for '{}'", cluster_name)),
              absl::nullopt};
    }

    // Passthrough requires framed/header transports on both sides, an unchanged
    // protocol, and excludes the Twitter protocol.
    const auto passthrough_supported =
        (transport == TransportType::Framed || transport == TransportType::Header) &&
        (final_transport == TransportType::Framed || final_transport == TransportType::Header) &&
        protocol == final_protocol && final_protocol != ProtocolType::Twitter;
    UpstreamRequestInfo result = {passthrough_supported, final_transport, final_protocol,
                                  conn_pool_data};
    return {absl::nullopt, result};
  }

  // Info for the cluster selected by the last successful prepareUpstreamRequest().
  Upstream::ClusterInfoConstSharedPtr cluster_;

private:
  Upstream::ClusterManager& cluster_manager_;
  const RouterStats& stats_;
};
545 | | |
/**
 * RequestMirrorPolicy is an individual mirroring rule for a route entry.
 */
class RequestMirrorPolicy {
public:
  virtual ~RequestMirrorPolicy() = default;

  /**
   * @return const std::string& the upstream cluster that should be used for the mirrored request.
   */
  virtual const std::string& clusterName() const PURE;

  /**
   * @return bool whether this policy is currently enabled, evaluated against `runtime`.
   */
  virtual bool enabled(Runtime::Loader& runtime) const PURE;
};
563 | | |
/**
 * ShadowRouterHandle is used to write a request or release a connection early if needed.
 */
class ShadowRouterHandle {
public:
  virtual ~ShadowRouterHandle() = default;

  /**
   * Called after the Router is destroyed.
   */
  virtual void onRouterDestroy() PURE;

  /**
   * Checks if the request is currently waiting for an upstream connection to become available.
   */
  virtual bool waitingForConnection() const PURE;

  /**
   * @return RequestOwner& the interface associated with this ShadowRouter.
   */
  virtual RequestOwner& requestOwner() PURE;
};
586 | | |
/**
 * ShadowWriter is used for submitting requests and ignoring the response.
 */
class ShadowWriter {
public:
  virtual ~ShadowWriter() = default;

  /**
   * @return Upstream::ClusterManager& the cluster manager.
   */
  virtual Upstream::ClusterManager& clusterManager() PURE;

  /**
   * @return Dispatcher& the dispatcher.
   */
  virtual Event::Dispatcher& dispatcher() PURE;

  /**
   * Starts the shadow request by requesting an upstream connection.
   *
   * @return an optional handle for the in-flight shadow request; empty when the
   *         request could not be submitted.
   */
  virtual OptRef<ShadowRouterHandle> submit(const std::string& cluster_name,
                                            MessageMetadataSharedPtr metadata,
                                            TransportType original_transport,
                                            ProtocolType original_protocol) PURE;
};
612 | | |
613 | | } // namespace Router |
614 | | } // namespace ThriftProxy |
615 | | } // namespace NetworkFilters |
616 | | } // namespace Extensions |
617 | | } // namespace Envoy |