/proc/self/cwd/source/common/router/router.cc
Line | Count | Source |
1 | | #include "source/common/router/router.h" |
2 | | |
3 | | #include <algorithm> |
4 | | #include <chrono> |
5 | | #include <cstdint> |
6 | | #include <functional> |
7 | | #include <memory> |
8 | | #include <string> |
9 | | |
10 | | #include "envoy/event/dispatcher.h" |
11 | | #include "envoy/event/timer.h" |
12 | | #include "envoy/grpc/status.h" |
13 | | #include "envoy/http/conn_pool.h" |
14 | | #include "envoy/runtime/runtime.h" |
15 | | #include "envoy/upstream/cluster_manager.h" |
16 | | #include "envoy/upstream/health_check_host_monitor.h" |
17 | | #include "envoy/upstream/upstream.h" |
18 | | |
19 | | #include "source/common/common/assert.h" |
20 | | #include "source/common/common/cleanup.h" |
21 | | #include "source/common/common/empty_string.h" |
22 | | #include "source/common/common/enum_to_int.h" |
23 | | #include "source/common/common/scope_tracker.h" |
24 | | #include "source/common/common/utility.h" |
25 | | #include "source/common/config/utility.h" |
26 | | #include "source/common/grpc/common.h" |
27 | | #include "source/common/http/codes.h" |
28 | | #include "source/common/http/header_map_impl.h" |
29 | | #include "source/common/http/headers.h" |
30 | | #include "source/common/http/message_impl.h" |
31 | | #include "source/common/http/utility.h" |
32 | | #include "source/common/network/application_protocol.h" |
33 | | #include "source/common/network/socket_option_factory.h" |
34 | | #include "source/common/network/transport_socket_options_impl.h" |
35 | | #include "source/common/network/upstream_server_name.h" |
36 | | #include "source/common/network/upstream_socket_options_filter_state.h" |
37 | | #include "source/common/network/upstream_subject_alt_names.h" |
38 | | #include "source/common/orca/orca_load_metrics.h" |
39 | | #include "source/common/orca/orca_parser.h" |
40 | | #include "source/common/router/config_impl.h" |
41 | | #include "source/common/router/debug_config.h" |
42 | | #include "source/common/router/retry_state_impl.h" |
43 | | #include "source/common/runtime/runtime_features.h" |
44 | | #include "source/common/stream_info/uint32_accessor_impl.h" |
45 | | #include "source/common/tracing/http_tracer_impl.h" |
46 | | |
47 | | namespace Envoy { |
48 | | namespace Router { |
49 | | namespace { |
50 | | constexpr char NumInternalRedirectsFilterStateName[] = "num_internal_redirects"; |
51 | | |
52 | 0 | uint32_t getLength(const Buffer::Instance* instance) { return instance ? instance->length() : 0; } |
53 | | |
54 | | bool schemeIsHttp(const Http::RequestHeaderMap& downstream_headers, |
55 | 0 | OptRef<const Network::Connection> connection) { |
56 | 0 | if (Http::Utility::schemeIsHttp(downstream_headers.getSchemeValue())) { |
57 | 0 | return true; |
58 | 0 | } |
59 | 0 | if (connection.has_value() && !connection->ssl()) { |
60 | 0 | return true; |
61 | 0 | } |
62 | 0 | return false; |
63 | 0 | } |
64 | | |
65 | | constexpr uint64_t TimeoutPrecisionFactor = 100; |
66 | | |
67 | | } // namespace |
68 | | |
69 | | FilterConfig::FilterConfig(Stats::StatName stat_prefix, |
70 | | Server::Configuration::FactoryContext& context, |
71 | | ShadowWriterPtr&& shadow_writer, |
72 | | const envoy::extensions::filters::http::router::v3::Router& config) |
73 | | : FilterConfig( |
74 | | context.serverFactoryContext(), stat_prefix, context.serverFactoryContext().localInfo(), |
75 | | context.scope(), context.serverFactoryContext().clusterManager(), |
76 | | context.serverFactoryContext().runtime(), |
77 | | context.serverFactoryContext().api().randomGenerator(), std::move(shadow_writer), |
78 | | PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, dynamic_stats, true), config.start_child_span(), |
79 | | config.suppress_envoy_headers(), config.respect_expected_rq_timeout(), |
80 | | config.suppress_grpc_request_failure_code_stats(), |
81 | | config.has_upstream_log_options() |
82 | | ? config.upstream_log_options().flush_upstream_log_on_upstream_stream() |
83 | | : false, |
84 | | config.strict_check_headers(), context.serverFactoryContext().api().timeSource(), |
85 | | context.serverFactoryContext().httpContext(), |
86 | 2.06k | context.serverFactoryContext().routerContext()) { |
87 | 2.06k | for (const auto& upstream_log : config.upstream_log()) { |
88 | 12 | upstream_logs_.push_back(AccessLog::AccessLogFactory::fromProto(upstream_log, context)); |
89 | 12 | } |
90 | | |
91 | 2.06k | if (config.has_upstream_log_options() && |
92 | 2.06k | config.upstream_log_options().has_upstream_log_flush_interval()) { |
93 | 0 | upstream_log_flush_interval_ = std::chrono::milliseconds(DurationUtil::durationToMilliseconds( |
94 | 0 | config.upstream_log_options().upstream_log_flush_interval())); |
95 | 0 | } |
96 | | |
97 | 2.06k | if (config.upstream_http_filters_size() > 0) { |
98 | 2 | auto& server_factory_ctx = context.serverFactoryContext(); |
99 | 2 | const Http::FilterChainUtility::FiltersList& upstream_http_filters = |
100 | 2 | config.upstream_http_filters(); |
101 | 2 | std::shared_ptr<Http::UpstreamFilterConfigProviderManager> filter_config_provider_manager = |
102 | 2 | Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager( |
103 | 2 | server_factory_ctx); |
104 | 2 | std::string prefix = context.scope().symbolTable().toString(context.scope().prefix()); |
105 | 2 | upstream_ctx_ = std::make_unique<Upstream::UpstreamFactoryContextImpl>( |
106 | 2 | server_factory_ctx, context.initManager(), context.scope()); |
107 | 2 | Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext, |
108 | 2 | Server::Configuration::UpstreamHttpFilterConfigFactory> |
109 | 2 | helper(*filter_config_provider_manager, server_factory_ctx, |
110 | 2 | context.serverFactoryContext().clusterManager(), *upstream_ctx_, prefix); |
111 | 2 | THROW_IF_NOT_OK(helper.processFilters(upstream_http_filters, "router upstream http", |
112 | 2 | "router upstream http", upstream_http_filter_factories_)); |
113 | 2 | } |
114 | 2.06k | } |
115 | | |
116 | | // Express percentage as [0, TimeoutPrecisionFactor] because stats do not accept floating point |
117 | | // values, and getting multiple significant figures on the histogram would be nice. |
118 | | uint64_t FilterUtility::percentageOfTimeout(const std::chrono::milliseconds response_time, |
119 | 5 | const std::chrono::milliseconds timeout) { |
120 | | // Timeouts of 0 are considered infinite. Any portion of an infinite timeout used is still |
121 | | // none of it. |
122 | 5 | if (timeout.count() == 0) { |
123 | 5 | return 0; |
124 | 5 | } |
125 | | |
126 | 0 | return static_cast<uint64_t>(response_time.count() * TimeoutPrecisionFactor / timeout.count()); |
127 | 5 | } |
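
A worked example of the arithmetic above, as a self-contained sketch (an illustrative re-implementation, not the Envoy API): with TimeoutPrecisionFactor = 100, a 250 ms response against a 1000 ms timeout reports 25, while a 0 ms timeout is treated as infinite and always reports 0.

#include <chrono>
#include <cstdint>
#include <iostream>

// Illustrative standalone version of the ratio computed above.
constexpr uint64_t kTimeoutPrecisionFactor = 100;

uint64_t percentageOfTimeout(std::chrono::milliseconds response_time,
                             std::chrono::milliseconds timeout) {
  if (timeout.count() == 0) {
    return 0; // A timeout of 0 means infinite, so no portion of it is ever used.
  }
  return static_cast<uint64_t>(response_time.count() * kTimeoutPrecisionFactor / timeout.count());
}

int main() {
  std::cout << percentageOfTimeout(std::chrono::milliseconds(250),
                                   std::chrono::milliseconds(1000))
            << "\n"; // 25
  std::cout << percentageOfTimeout(std::chrono::milliseconds(250),
                                   std::chrono::milliseconds(0))
            << "\n"; // 0 (infinite timeout)
}
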
128 | | |
129 | | void FilterUtility::setUpstreamScheme(Http::RequestHeaderMap& headers, bool downstream_ssl, |
130 | 24.2k | bool upstream_ssl, bool use_upstream) { |
131 | 24.2k | if (use_upstream) { |
132 | 22.3k | if (upstream_ssl) { |
133 | 0 | headers.setReferenceScheme(Http::Headers::get().SchemeValues.Https); |
134 | 22.3k | } else { |
135 | 22.3k | headers.setReferenceScheme(Http::Headers::get().SchemeValues.Http); |
136 | 22.3k | } |
137 | 22.3k | return; |
138 | 22.3k | } |
139 | | |
140 | 1.81k | if (Http::Utility::schemeIsValid(headers.getSchemeValue())) { |
141 | 987 | return; |
142 | 987 | } |
143 | | // After all the changes in https://github.com/envoyproxy/envoy/issues/14587 |
144 | | // this path should only occur if a buggy filter has removed the :scheme |
145 | | // header. In that case, set the scheme best-effort from X-Forwarded-Proto. |
146 | 829 | absl::string_view xfp = headers.getForwardedProtoValue(); |
147 | 829 | if (Http::Utility::schemeIsValid(xfp)) { |
148 | 0 | headers.setScheme(xfp); |
149 | 0 | return; |
150 | 0 | } |
151 | | |
152 | 829 | if (downstream_ssl) { |
153 | 0 | headers.setReferenceScheme(Http::Headers::get().SchemeValues.Https); |
154 | 829 | } else { |
155 | 829 | headers.setReferenceScheme(Http::Headers::get().SchemeValues.Http); |
156 | 829 | } |
157 | 829 | } |
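
The precedence implemented by setUpstreamScheme() can be summarized with a small standalone sketch (illustrative plain strings rather than the Envoy header-map API): an explicit request to match the upstream transport wins, an already-valid :scheme is kept, X-Forwarded-Proto is the best-effort fallback, and otherwise the downstream transport decides.

#include <iostream>
#include <string>

// Illustrative decision order of setUpstreamScheme(), using plain strings.
std::string chooseScheme(const std::string& current_scheme, const std::string& xfp,
                         bool downstream_ssl, bool upstream_ssl, bool use_upstream) {
  auto valid = [](const std::string& s) { return s == "http" || s == "https"; };
  if (use_upstream) {
    return upstream_ssl ? "https" : "http"; // 1. Match the upstream transport.
  }
  if (valid(current_scheme)) {
    return current_scheme; // 2. Keep a :scheme that is already valid.
  }
  if (valid(xfp)) {
    return xfp; // 3. Best-effort fallback to X-Forwarded-Proto.
  }
  return downstream_ssl ? "https" : "http"; // 4. Fall back to the downstream transport.
}

int main() {
  // No :scheme, X-Forwarded-Proto says https, plaintext connections on both sides.
  std::cout << chooseScheme("", "https", false, false, false) << "\n"; // https
}
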
158 | | |
159 | | bool FilterUtility::shouldShadow(const ShadowPolicy& policy, Runtime::Loader& runtime, |
160 | 0 | uint64_t stable_random) { |
161 | | |
162 | | // The policy's default value is set correctly regardless of whether there is a runtime key |
163 | | // or not, thus this call is sufficient for all cases (100% if no runtime set, otherwise |
164 | | // using the default value within the runtime fractional percent setting). |
165 | 0 | return runtime.snapshot().featureEnabled(policy.runtimeKey(), policy.defaultValue(), |
166 | 0 | stable_random); |
167 | 0 | } |
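
A minimal sketch of how a fractional-percent gate keyed on a stable random value typically behaves (the struct and helper below are illustrative stand-ins, not Runtime::Snapshot::featureEnabled itself): the stable random value is reduced modulo the denominator and compared against the numerator, so the shadowing decision is deterministic for a given stream.

#include <cstdint>
#include <iostream>

// Illustrative fractional-percent gate; not the actual Envoy runtime implementation.
struct FractionalPercent {
  uint64_t numerator;
  uint64_t denominator; // e.g. 100 for percent, 10000 for ten-thousandths
};

bool featureEnabled(const FractionalPercent& fraction, uint64_t stable_random) {
  return stable_random % fraction.denominator < fraction.numerator;
}

int main() {
  const FractionalPercent ten_percent{10, 100};
  // The same stable_random always produces the same answer, so a given request is
  // consistently either shadowed or not for the lifetime of that decision.
  std::cout << featureEnabled(ten_percent, 1234567) << "\n"; // 0 here: 1234567 % 100 = 67 >= 10
}
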
168 | | |
169 | | TimeoutData FilterUtility::finalTimeout(const RouteEntry& route, |
170 | | Http::RequestHeaderMap& request_headers, |
171 | | bool insert_envoy_expected_request_timeout_ms, |
172 | | bool grpc_request, bool per_try_timeout_hedging_enabled, |
173 | 1.81k | bool respect_expected_rq_timeout) { |
174 | | // See if there is a user supplied timeout in a request header. If there is we take that. |
175 | | // Otherwise if the request is gRPC and a maximum gRPC timeout is configured we use the timeout |
176 | | // in the gRPC headers (or infinity when gRPC headers have no timeout), but cap that timeout to |
177 | | // the configured maximum gRPC timeout (which may also be infinity, represented by a 0 value), |
178 | | // or the default from the route config otherwise. |
179 | 1.81k | TimeoutData timeout; |
180 | 1.81k | if (!route.usingNewTimeouts()) { |
181 | 1.81k | if (grpc_request && route.maxGrpcTimeout()) { |
182 | 0 | const std::chrono::milliseconds max_grpc_timeout = route.maxGrpcTimeout().value(); |
183 | 0 | auto header_timeout = Grpc::Common::getGrpcTimeout(request_headers); |
184 | 0 | std::chrono::milliseconds grpc_timeout = |
185 | 0 | header_timeout ? header_timeout.value() : std::chrono::milliseconds(0); |
186 | 0 | if (route.grpcTimeoutOffset()) { |
187 | | // We only apply the offset if it won't result in grpc_timeout hitting 0 or below, as |
188 | | // setting it to 0 means infinity and a negative timeout makes no sense. |
189 | 0 | const auto offset = *route.grpcTimeoutOffset(); |
190 | 0 | if (offset < grpc_timeout) { |
191 | 0 | grpc_timeout -= offset; |
192 | 0 | } |
193 | 0 | } |
194 | | |
195 | | // Cap gRPC timeout to the configured maximum considering that 0 means infinity. |
196 | 0 | if (max_grpc_timeout != std::chrono::milliseconds(0) && |
197 | 0 | (grpc_timeout == std::chrono::milliseconds(0) || grpc_timeout > max_grpc_timeout)) { |
198 | 0 | grpc_timeout = max_grpc_timeout; |
199 | 0 | } |
200 | 0 | timeout.global_timeout_ = grpc_timeout; |
201 | 1.81k | } else { |
202 | 1.81k | timeout.global_timeout_ = route.timeout(); |
203 | 1.81k | } |
204 | 1.81k | } |
205 | 1.81k | timeout.per_try_timeout_ = route.retryPolicy().perTryTimeout(); |
206 | 1.81k | timeout.per_try_idle_timeout_ = route.retryPolicy().perTryIdleTimeout(); |
207 | | |
208 | 1.81k | uint64_t header_timeout; |
209 | | |
210 | 1.81k | if (respect_expected_rq_timeout) { |
211 | | // Check if there is a timeout set by the egress Envoy. |
212 | | // If present, use that value as the route timeout and don't override the |
213 | | // *x-envoy-expected-rq-timeout-ms* header. At this point *x-envoy-upstream-rq-timeout-ms* |
214 | | // header should have been sanitized by egress Envoy. |
215 | 0 | const Http::HeaderEntry* header_expected_timeout_entry = |
216 | 0 | request_headers.EnvoyExpectedRequestTimeoutMs(); |
217 | 0 | if (header_expected_timeout_entry) { |
218 | 0 | trySetGlobalTimeout(*header_expected_timeout_entry, timeout); |
219 | 0 | } else { |
220 | 0 | const Http::HeaderEntry* header_timeout_entry = |
221 | 0 | request_headers.EnvoyUpstreamRequestTimeoutMs(); |
222 | |
223 | 0 | if (header_timeout_entry) { |
224 | 0 | trySetGlobalTimeout(*header_timeout_entry, timeout); |
225 | 0 | request_headers.removeEnvoyUpstreamRequestTimeoutMs(); |
226 | 0 | } |
227 | 0 | } |
228 | 1.81k | } else { |
229 | 1.81k | const Http::HeaderEntry* header_timeout_entry = request_headers.EnvoyUpstreamRequestTimeoutMs(); |
230 | | |
231 | 1.81k | if (header_timeout_entry) { |
232 | 0 | trySetGlobalTimeout(*header_timeout_entry, timeout); |
233 | 0 | request_headers.removeEnvoyUpstreamRequestTimeoutMs(); |
234 | 0 | } |
235 | 1.81k | } |
236 | | |
237 | | // See if there is a per try/retry timeout. If it's >= global we just ignore it. |
238 | 1.81k | const absl::string_view per_try_timeout_entry = |
239 | 1.81k | request_headers.getEnvoyUpstreamRequestPerTryTimeoutMsValue(); |
240 | 1.81k | if (!per_try_timeout_entry.empty()) { |
241 | 0 | if (absl::SimpleAtoi(per_try_timeout_entry, &header_timeout)) { |
242 | 0 | timeout.per_try_timeout_ = std::chrono::milliseconds(header_timeout); |
243 | 0 | } |
244 | 0 | request_headers.removeEnvoyUpstreamRequestPerTryTimeoutMs(); |
245 | 0 | } |
246 | | |
247 | 1.81k | if (timeout.per_try_timeout_ >= timeout.global_timeout_ && timeout.global_timeout_.count() != 0) { |
248 | 0 | timeout.per_try_timeout_ = std::chrono::milliseconds(0); |
249 | 0 | } |
250 | | |
251 | 1.81k | setTimeoutHeaders(0, timeout, route, request_headers, insert_envoy_expected_request_timeout_ms, |
252 | 1.81k | grpc_request, per_try_timeout_hedging_enabled); |
253 | | |
254 | 1.81k | return timeout; |
255 | 1.81k | } |
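
The gRPC branch above is the subtle part: the header timeout (0 meaning infinite) is reduced by an optional offset only when the result stays positive, and is then capped by the configured maximum. A self-contained sketch of that resolution with hypothetical inputs (not the Envoy types):

#include <chrono>
#include <iostream>
#include <optional>

using std::chrono::milliseconds;

// Illustrative sketch of the gRPC timeout resolution above. 0 means "infinite"; a
// non-zero max always bounds an infinite or larger header timeout.
milliseconds resolveGrpcTimeout(std::optional<milliseconds> header_timeout,
                                std::optional<milliseconds> offset,
                                milliseconds max_grpc_timeout) {
  milliseconds grpc_timeout = header_timeout.value_or(milliseconds(0));
  if (offset.has_value() && *offset < grpc_timeout) {
    grpc_timeout -= *offset; // Only applied when the result stays positive.
  }
  if (max_grpc_timeout != milliseconds(0) &&
      (grpc_timeout == milliseconds(0) || grpc_timeout > max_grpc_timeout)) {
    grpc_timeout = max_grpc_timeout;
  }
  return grpc_timeout;
}

int main() {
  // grpc-timeout of 5s, offset 1s, max 10s -> 4000 ms.
  std::cout << resolveGrpcTimeout(milliseconds(5000), milliseconds(1000), milliseconds(10000)).count() << "\n";
  // No grpc-timeout header (infinite), max 10s -> capped at 10000 ms.
  std::cout << resolveGrpcTimeout(std::nullopt, std::nullopt, milliseconds(10000)).count() << "\n";
}
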
256 | | |
257 | | void FilterUtility::setTimeoutHeaders(uint64_t elapsed_time, const TimeoutData& timeout, |
258 | | const RouteEntry& route, |
259 | | Http::RequestHeaderMap& request_headers, |
260 | | bool insert_envoy_expected_request_timeout_ms, |
261 | 1.81k | bool grpc_request, bool per_try_timeout_hedging_enabled) { |
262 | | |
263 | 1.81k | const uint64_t global_timeout = timeout.global_timeout_.count(); |
264 | | |
265 | | // See if there is any timeout to write in the expected timeout header. |
266 | 1.81k | uint64_t expected_timeout = timeout.per_try_timeout_.count(); |
267 | | |
268 | | // Use the global timeout if no per try timeout was specified or if we're |
269 | | // doing hedging when there are per try timeouts. Either of these scenarios |
270 | | // mean that the upstream server can use the full global timeout. |
271 | 1.81k | if (per_try_timeout_hedging_enabled || expected_timeout == 0) { |
272 | 1.81k | expected_timeout = global_timeout; |
273 | 1.81k | } |
274 | | |
275 | | // If the expected timeout is 0 set no timeout, as Envoy treats 0 as infinite timeout. |
276 | 1.81k | if (expected_timeout > 0) { |
277 | | |
278 | 987 | if (global_timeout > 0) { |
279 | 987 | if (elapsed_time >= global_timeout) { |
280 | | // We are out of time, but 0 would be an infinite timeout. So instead we send a 1ms timeout |
281 | | // and assume the timers armed by onRequestComplete() will fire very soon. |
282 | 0 | expected_timeout = 1; |
283 | 987 | } else { |
284 | 987 | expected_timeout = std::min(expected_timeout, global_timeout - elapsed_time); |
285 | 987 | } |
286 | 987 | } |
287 | | |
288 | 987 | if (insert_envoy_expected_request_timeout_ms) { |
289 | 982 | request_headers.setEnvoyExpectedRequestTimeoutMs(expected_timeout); |
290 | 982 | } |
291 | | |
292 | | // If we've configured max_grpc_timeout, override the grpc-timeout header with |
293 | | // the expected timeout. This ensures that the optional per try timeout is reflected |
294 | | // in grpc-timeout, ensuring that the upstream gRPC server is aware of the actual timeout. |
295 | 987 | if (grpc_request && !route.usingNewTimeouts() && route.maxGrpcTimeout()) { |
296 | 0 | Grpc::Common::toGrpcTimeout(std::chrono::milliseconds(expected_timeout), request_headers); |
297 | 0 | } |
298 | 987 | } |
299 | 1.81k | } |
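
As a worked example of the expected-timeout value computed above (a standalone sketch of the same arithmetic, not the Envoy header API): the per-try timeout is used unless hedging is enabled or no per-try timeout exists, and the result is clamped to whatever remains of the global timeout, with 1 ms as the "already out of time" sentinel.

#include <algorithm>
#include <cstdint>
#include <iostream>

// Illustrative sketch of the value written to x-envoy-expected-rq-timeout-ms.
uint64_t expectedTimeoutMs(uint64_t elapsed_ms, uint64_t global_ms, uint64_t per_try_ms,
                           bool hedging_enabled) {
  uint64_t expected = per_try_ms;
  if (hedging_enabled || expected == 0) {
    expected = global_ms; // The upstream may use the full global timeout.
  }
  if (expected > 0 && global_ms > 0) {
    // Clamp to the remaining global budget; 1 ms stands in for "out of time" because
    // 0 would be interpreted as an infinite timeout.
    expected = elapsed_ms >= global_ms ? 1 : std::min(expected, global_ms - elapsed_ms);
  }
  return expected; // 0 means "do not set a timeout header at all".
}

int main() {
  std::cout << expectedTimeoutMs(0, 1000, 0, false) << "\n";    // 1000: no per-try timeout
  std::cout << expectedTimeoutMs(0, 1000, 300, false) << "\n";  // 300: per-try timeout wins
  std::cout << expectedTimeoutMs(1200, 1000, 0, false) << "\n"; // 1: out-of-time sentinel
}
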
300 | | |
301 | | absl::optional<std::chrono::milliseconds> |
302 | 0 | FilterUtility::tryParseHeaderTimeout(const Http::HeaderEntry& header_timeout_entry) { |
303 | 0 | uint64_t header_timeout; |
304 | 0 | if (absl::SimpleAtoi(header_timeout_entry.value().getStringView(), &header_timeout)) { |
305 | 0 | return std::chrono::milliseconds(header_timeout); |
306 | 0 | } |
307 | 0 | return absl::nullopt; |
308 | 0 | } |
309 | | |
310 | | void FilterUtility::trySetGlobalTimeout(const Http::HeaderEntry& header_timeout_entry, |
311 | 0 | TimeoutData& timeout) { |
312 | 0 | const auto timeout_ms = tryParseHeaderTimeout(header_timeout_entry); |
313 | 0 | if (timeout_ms.has_value()) { |
314 | 0 | timeout.global_timeout_ = timeout_ms.value(); |
315 | 0 | } |
316 | 0 | } |
317 | | |
318 | | FilterUtility::HedgingParams |
319 | | FilterUtility::finalHedgingParams(const RouteEntry& route, |
320 | 1.81k | Http::RequestHeaderMap& request_headers) { |
321 | 1.81k | HedgingParams hedging_params; |
322 | 1.81k | hedging_params.hedge_on_per_try_timeout_ = route.hedgePolicy().hedgeOnPerTryTimeout(); |
323 | | |
324 | 1.81k | const Http::HeaderEntry* hedge_on_per_try_timeout_entry = |
325 | 1.81k | request_headers.EnvoyHedgeOnPerTryTimeout(); |
326 | 1.81k | if (hedge_on_per_try_timeout_entry) { |
327 | 0 | if (hedge_on_per_try_timeout_entry->value() == "true") { |
328 | 0 | hedging_params.hedge_on_per_try_timeout_ = true; |
329 | 0 | } |
330 | 0 | if (hedge_on_per_try_timeout_entry->value() == "false") { |
331 | 0 | hedging_params.hedge_on_per_try_timeout_ = false; |
332 | 0 | } |
333 | |
334 | 0 | request_headers.removeEnvoyHedgeOnPerTryTimeout(); |
335 | 0 | } |
336 | | |
337 | 1.81k | return hedging_params; |
338 | 1.81k | } |
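
The override logic above is small but easy to misread: only the literal values "true" and "false" change the route's hedge policy, and anything else leaves the default in place. A tiny illustrative sketch (a plain std::optional standing in for the header entry):

#include <iostream>
#include <optional>
#include <string>

// Illustrative resolution of the hedge-on-per-try-timeout flag.
bool hedgeOnPerTryTimeout(bool route_default, const std::optional<std::string>& header_value) {
  bool hedge = route_default;
  if (header_value.has_value()) {
    if (*header_value == "true") {
      hedge = true;
    }
    if (*header_value == "false") {
      hedge = false;
    }
    // Any other value leaves the route's policy default untouched.
  }
  return hedge;
}

int main() {
  std::cout << hedgeOnPerTryTimeout(false, std::string("true")) << "\n"; // 1
  std::cout << hedgeOnPerTryTimeout(true, std::string("bogus")) << "\n"; // 1 (default kept)
}
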
339 | | |
340 | 2.25k | Filter::~Filter() { |
341 | | // Upstream resources should already have been cleaned. |
342 | 2.25k | ASSERT(upstream_requests_.empty()); |
343 | 2.25k | ASSERT(!retry_state_); |
344 | 2.25k | } |
345 | | |
346 | | const FilterUtility::StrictHeaderChecker::HeaderCheckResult |
347 | | FilterUtility::StrictHeaderChecker::checkHeader(Http::RequestHeaderMap& headers, |
348 | 0 | const Http::LowerCaseString& target_header) { |
349 | 0 | if (target_header == Http::Headers::get().EnvoyUpstreamRequestTimeoutMs) { |
350 | 0 | return isInteger(headers.EnvoyUpstreamRequestTimeoutMs()); |
351 | 0 | } else if (target_header == Http::Headers::get().EnvoyUpstreamRequestPerTryTimeoutMs) { |
352 | 0 | return isInteger(headers.EnvoyUpstreamRequestPerTryTimeoutMs()); |
353 | 0 | } else if (target_header == Http::Headers::get().EnvoyMaxRetries) { |
354 | 0 | return isInteger(headers.EnvoyMaxRetries()); |
355 | 0 | } else if (target_header == Http::Headers::get().EnvoyRetryOn) { |
356 | 0 | return hasValidRetryFields(headers.EnvoyRetryOn(), &Router::RetryStateImpl::parseRetryOn); |
357 | 0 | } else if (target_header == Http::Headers::get().EnvoyRetryGrpcOn) { |
358 | 0 | return hasValidRetryFields(headers.EnvoyRetryGrpcOn(), |
359 | 0 | &Router::RetryStateImpl::parseRetryGrpcOn); |
360 | 0 | } |
361 | | // Should only validate headers for which we have implemented a validator. |
362 | 0 | PANIC("unexpectedly reached"); |
363 | 0 | } |
364 | | |
365 | 2.54k | Stats::StatName Filter::upstreamZone(Upstream::HostDescriptionConstSharedPtr upstream_host) { |
366 | 2.54k | return upstream_host ? upstream_host->localityZoneStatName() : config_->empty_stat_name_; |
367 | 2.54k | } |
368 | | |
369 | | void Filter::chargeUpstreamCode(uint64_t response_status_code, |
370 | | const Http::ResponseHeaderMap& response_headers, |
371 | | Upstream::HostDescriptionConstSharedPtr upstream_host, |
372 | 1.74k | bool dropped) { |
373 | | // Passing the response_status_code explicitly is an optimization to avoid |
374 | | // multiple calls to slow Http::Utility::getResponseStatus. |
375 | 1.74k | ASSERT(response_status_code == Http::Utility::getResponseStatus(response_headers)); |
376 | 1.74k | if (config_->emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck()) { |
377 | 1.74k | const Http::HeaderEntry* upstream_canary_header = response_headers.EnvoyUpstreamCanary(); |
378 | 1.74k | const bool is_canary = (upstream_canary_header && upstream_canary_header->value() == "true") || |
379 | 1.74k | (upstream_host ? upstream_host->canary() : false); |
380 | 1.74k | const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_); |
381 | | |
382 | 1.74k | Stats::StatName upstream_zone = upstreamZone(upstream_host); |
383 | 1.74k | Http::CodeStats::ResponseStatInfo info{ |
384 | 1.74k | config_->scope_, |
385 | 1.74k | cluster_->statsScope(), |
386 | 1.74k | config_->empty_stat_name_, |
387 | 1.74k | response_status_code, |
388 | 1.74k | internal_request, |
389 | 1.74k | route_->virtualHost().statName(), |
390 | 1.74k | request_vcluster_ ? request_vcluster_->statName() : config_->empty_stat_name_, |
391 | 1.74k | route_stats_context_.has_value() ? route_stats_context_->statName() |
392 | 1.74k | : config_->empty_stat_name_, |
393 | 1.74k | config_->zone_name_, |
394 | 1.74k | upstream_zone, |
395 | 1.74k | is_canary}; |
396 | | |
397 | 1.74k | Http::CodeStats& code_stats = httpContext().codeStats(); |
398 | 1.74k | code_stats.chargeResponseStat(info, exclude_http_code_stats_); |
399 | | |
400 | 1.74k | if (alt_stat_prefix_ != nullptr) { |
401 | 0 | Http::CodeStats::ResponseStatInfo alt_info{config_->scope_, |
402 | 0 | cluster_->statsScope(), |
403 | 0 | alt_stat_prefix_->statName(), |
404 | 0 | response_status_code, |
405 | 0 | internal_request, |
406 | 0 | config_->empty_stat_name_, |
407 | 0 | config_->empty_stat_name_, |
408 | 0 | config_->empty_stat_name_, |
409 | 0 | config_->zone_name_, |
410 | 0 | upstream_zone, |
411 | 0 | is_canary}; |
412 | 0 | code_stats.chargeResponseStat(alt_info, exclude_http_code_stats_); |
413 | 0 | } |
414 | | |
415 | 1.74k | if (dropped) { |
416 | 0 | cluster_->loadReportStats().upstream_rq_dropped_.inc(); |
417 | 0 | } |
418 | 1.74k | if (upstream_host && Http::CodeUtility::is5xx(response_status_code)) { |
419 | 107 | upstream_host->stats().rq_error_.inc(); |
420 | 107 | } |
421 | 1.74k | } |
422 | 1.74k | } |
423 | | |
424 | | void Filter::chargeUpstreamCode(Http::Code code, |
425 | | Upstream::HostDescriptionConstSharedPtr upstream_host, |
426 | 107 | bool dropped) { |
427 | 107 | const uint64_t response_status_code = enumToInt(code); |
428 | 107 | const auto fake_response_headers = Http::createHeaderMap<Http::ResponseHeaderMapImpl>( |
429 | 107 | {{Http::Headers::get().Status, std::to_string(response_status_code)}}); |
430 | 107 | chargeUpstreamCode(response_status_code, *fake_response_headers, upstream_host, dropped); |
431 | 107 | } |
432 | | |
433 | 1.90k | Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) { |
434 | 1.90k | downstream_headers_ = &headers; |
435 | | |
436 | | // Extract debug configuration from filter state. This is used further along to determine whether |
437 | | // we should append cluster and host headers to the response, and whether to forward the request |
438 | | // upstream. |
439 | 1.90k | const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState(); |
440 | 1.90k | const DebugConfig* debug_config = filter_state->getDataReadOnly<DebugConfig>(DebugConfig::key()); |
441 | | |
442 | | // TODO: Maybe add a filter API for this. |
443 | 1.90k | grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers); |
444 | 1.90k | exclude_http_code_stats_ = grpc_request_ && config_->suppress_grpc_request_failure_code_stats_; |
445 | | |
446 | | // Only increment rq total stat if we actually decode headers here. This does not count requests |
447 | | // that get handled by earlier filters. |
448 | 1.90k | stats_.rq_total_.inc(); |
449 | | |
450 | | // Initialize the `modify_headers` function as a no-op (so we don't have to remember to check it |
451 | | // against nullptr before calling it), and feed it behavior later if/when we have cluster info |
452 | | // headers to append. |
453 | 1.90k | std::function<void(Http::ResponseHeaderMap&)> modify_headers = [](Http::ResponseHeaderMap&) {}; |
454 | | |
455 | | // Determine if there is a route entry or a direct response for the request. |
456 | 1.90k | route_ = callbacks_->route(); |
457 | 1.90k | if (!route_) { |
458 | 0 | stats_.no_route_.inc(); |
459 | 0 | ENVOY_STREAM_LOG(debug, "no route match for URL '{}'", *callbacks_, headers.getPathValue()); |
460 | |
461 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoRouteFound); |
462 | 0 | callbacks_->sendLocalReply(Http::Code::NotFound, "", modify_headers, absl::nullopt, |
463 | 0 | StreamInfo::ResponseCodeDetails::get().RouteNotFound); |
464 | 0 | return Http::FilterHeadersStatus::StopIteration; |
465 | 0 | } |
466 | | |
467 | | // Determine if there is a direct response for the request. |
468 | 1.90k | const auto* direct_response = route_->directResponseEntry(); |
469 | 1.90k | if (direct_response != nullptr) { |
470 | 89 | stats_.rq_direct_response_.inc(); |
471 | 89 | direct_response->rewritePathHeader(headers, !config_->suppress_envoy_headers_); |
472 | 89 | callbacks_->sendLocalReply( |
473 | 89 | direct_response->responseCode(), direct_response->responseBody(), |
474 | 89 | [this, direct_response, |
475 | 89 | &request_headers = headers](Http::ResponseHeaderMap& response_headers) -> void { |
476 | 89 | std::string new_uri; |
477 | 89 | if (request_headers.Path()) { |
478 | 89 | new_uri = direct_response->newUri(request_headers); |
479 | 89 | } |
480 | | // See https://tools.ietf.org/html/rfc7231#section-7.1.2. |
481 | 89 | const auto add_location = |
482 | 89 | direct_response->responseCode() == Http::Code::Created || |
483 | 89 | Http::CodeUtility::is3xx(enumToInt(direct_response->responseCode())); |
484 | 89 | if (!new_uri.empty() && add_location) { |
485 | 1 | response_headers.addReferenceKey(Http::Headers::get().Location, new_uri); |
486 | 1 | } |
487 | 89 | direct_response->finalizeResponseHeaders(response_headers, callbacks_->streamInfo()); |
488 | 89 | }, |
489 | 89 | absl::nullopt, StreamInfo::ResponseCodeDetails::get().DirectResponse); |
490 | 89 | return Http::FilterHeadersStatus::StopIteration; |
491 | 89 | } |
492 | | |
493 | | // A route entry matches for the request. |
494 | 1.82k | route_entry_ = route_->routeEntry(); |
495 | | // If there's a route specific limit and it's smaller than general downstream |
496 | | // limits, apply the new cap. |
497 | 1.82k | retry_shadow_buffer_limit_ = |
498 | 1.82k | std::min(retry_shadow_buffer_limit_, route_entry_->retryShadowBufferLimit()); |
499 | 1.82k | if (debug_config && debug_config->append_cluster_) { |
500 | | // The cluster name will be appended to any local or upstream responses from this point. |
501 | 0 | modify_headers = [this, debug_config](Http::ResponseHeaderMap& headers) { |
502 | 0 | headers.addCopy(debug_config->cluster_header_.value_or(Http::Headers::get().EnvoyCluster), |
503 | 0 | route_entry_->clusterName()); |
504 | 0 | }; |
505 | 0 | } |
506 | 1.82k | Upstream::ThreadLocalCluster* cluster = |
507 | 1.82k | config_->cm_.getThreadLocalCluster(route_entry_->clusterName()); |
508 | 1.82k | if (!cluster) { |
509 | 4 | stats_.no_cluster_.inc(); |
510 | 4 | ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, route_entry_->clusterName()); |
511 | | |
512 | 4 | callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoClusterFound); |
513 | 4 | callbacks_->sendLocalReply(route_entry_->clusterNotFoundResponseCode(), "", modify_headers, |
514 | 4 | absl::nullopt, |
515 | 4 | StreamInfo::ResponseCodeDetails::get().ClusterNotFound); |
516 | 4 | return Http::FilterHeadersStatus::StopIteration; |
517 | 4 | } |
518 | 1.81k | cluster_ = cluster->info(); |
519 | | |
520 | | // Set up stat prefixes, etc. |
521 | 1.81k | request_vcluster_ = route_->virtualHost().virtualCluster(headers); |
522 | 1.81k | if (request_vcluster_ != nullptr) { |
523 | 5 | callbacks_->streamInfo().setVirtualClusterName(request_vcluster_->name()); |
524 | 5 | } |
525 | 1.81k | route_stats_context_ = route_entry_->routeStatsContext(); |
526 | 1.81k | ENVOY_STREAM_LOG(debug, "cluster '{}' match for URL '{}'", *callbacks_, |
527 | 1.81k | route_entry_->clusterName(), headers.getPathValue()); |
528 | | |
529 | 1.81k | if (config_->strict_check_headers_ != nullptr) { |
530 | 0 | for (const auto& header : *config_->strict_check_headers_) { |
531 | 0 | const auto res = FilterUtility::StrictHeaderChecker::checkHeader(headers, header); |
532 | 0 | if (!res.valid_) { |
533 | 0 | callbacks_->streamInfo().setResponseFlag( |
534 | 0 | StreamInfo::CoreResponseFlag::InvalidEnvoyRequestHeaders); |
535 | 0 | const std::string body = fmt::format("invalid header '{}' with value '{}'", |
536 | 0 | std::string(res.entry_->key().getStringView()), |
537 | 0 | std::string(res.entry_->value().getStringView())); |
538 | 0 | const std::string details = |
539 | 0 | absl::StrCat(StreamInfo::ResponseCodeDetails::get().InvalidEnvoyRequestHeaders, "{", |
540 | 0 | StringUtil::replaceAllEmptySpace(res.entry_->key().getStringView()), "}"); |
541 | 0 | callbacks_->sendLocalReply(Http::Code::BadRequest, body, nullptr, absl::nullopt, details); |
542 | 0 | return Http::FilterHeadersStatus::StopIteration; |
543 | 0 | } |
544 | 0 | } |
545 | 0 | } |
546 | | |
547 | 1.81k | const Http::HeaderEntry* request_alt_name = headers.EnvoyUpstreamAltStatName(); |
548 | 1.81k | if (request_alt_name) { |
549 | 0 | alt_stat_prefix_ = std::make_unique<Stats::StatNameDynamicStorage>( |
550 | 0 | request_alt_name->value().getStringView(), config_->scope_.symbolTable()); |
551 | 0 | headers.removeEnvoyUpstreamAltStatName(); |
552 | 0 | } |
553 | | |
554 | | // See if we are supposed to immediately kill some percentage of this cluster's traffic. |
555 | 1.81k | if (cluster_->maintenanceMode()) { |
556 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow); |
557 | 0 | chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true); |
558 | 0 | callbacks_->sendLocalReply( |
559 | 0 | Http::Code::ServiceUnavailable, "maintenance mode", |
560 | 0 | [modify_headers, this](Http::ResponseHeaderMap& headers) { |
561 | 0 | if (!config_->suppress_envoy_headers_) { |
562 | 0 | headers.addReference(Http::Headers::get().EnvoyOverloaded, |
563 | 0 | Http::Headers::get().EnvoyOverloadedValues.True); |
564 | 0 | } |
565 | | // Note: append_cluster_info does not respect suppress_envoy_headers. |
566 | 0 | modify_headers(headers); |
567 | 0 | }, |
568 | 0 | absl::nullopt, StreamInfo::ResponseCodeDetails::get().MaintenanceMode); |
569 | 0 | cluster_->trafficStats()->upstream_rq_maintenance_mode_.inc(); |
570 | 0 | return Http::FilterHeadersStatus::StopIteration; |
571 | 0 | } |
572 | | |
573 | | // Support DROP_OVERLOAD config from control plane to drop certain percentage of traffic. |
574 | 1.81k | if (checkDropOverload(*cluster, modify_headers)) { |
575 | 0 | return Http::FilterHeadersStatus::StopIteration; |
576 | 0 | } |
577 | | |
578 | | // Fetch a connection pool for the upstream cluster. |
579 | 1.81k | const auto& upstream_http_protocol_options = cluster_->upstreamHttpProtocolOptions(); |
580 | | |
581 | 1.81k | if (upstream_http_protocol_options.has_value() && |
582 | 1.81k | (upstream_http_protocol_options.value().auto_sni() || |
583 | 0 | upstream_http_protocol_options.value().auto_san_validation())) { |
584 | | // Default the header to Host/Authority header. |
585 | 0 | std::string header_value = route_entry_->getRequestHostValue(headers); |
586 | 0 | if (!Runtime::runtimeFeatureEnabled( |
587 | 0 | "envoy.reloadable_features.use_route_host_mutation_for_auto_sni_san")) { |
588 | 0 | header_value = std::string(headers.getHostValue()); |
589 | 0 | } |
590 | | |
591 | | // Check whether `override_auto_sni_header` is specified. |
592 | 0 | const auto override_auto_sni_header = |
593 | 0 | upstream_http_protocol_options.value().override_auto_sni_header(); |
594 | 0 | if (!override_auto_sni_header.empty()) { |
595 | | // Use the header value from `override_auto_sni_header` to set the SNI value. |
596 | 0 | const auto overridden_header_value = Http::HeaderUtility::getAllOfHeaderAsString( |
597 | 0 | headers, Http::LowerCaseString(override_auto_sni_header)); |
598 | 0 | if (overridden_header_value.result().has_value() && |
599 | 0 | !overridden_header_value.result().value().empty()) { |
600 | 0 | header_value = overridden_header_value.result().value(); |
601 | 0 | } |
602 | 0 | } |
603 | 0 | const auto parsed_authority = Http::Utility::parseAuthority(header_value); |
604 | 0 | bool should_set_sni = !parsed_authority.is_ip_address_; |
605 | | // `host_` returns a string_view so doing this should be safe. |
606 | 0 | absl::string_view sni_value = parsed_authority.host_; |
607 | |
608 | 0 | if (should_set_sni && upstream_http_protocol_options.value().auto_sni() && |
609 | 0 | !callbacks_->streamInfo().filterState()->hasDataWithName( |
610 | 0 | Network::UpstreamServerName::key())) { |
611 | 0 | callbacks_->streamInfo().filterState()->setData( |
612 | 0 | Network::UpstreamServerName::key(), |
613 | 0 | std::make_unique<Network::UpstreamServerName>(sni_value), |
614 | 0 | StreamInfo::FilterState::StateType::Mutable); |
615 | 0 | } |
616 | |
617 | 0 | if (upstream_http_protocol_options.value().auto_san_validation() && |
618 | 0 | !callbacks_->streamInfo().filterState()->hasDataWithName( |
619 | 0 | Network::UpstreamSubjectAltNames::key())) { |
620 | 0 | callbacks_->streamInfo().filterState()->setData( |
621 | 0 | Network::UpstreamSubjectAltNames::key(), |
622 | 0 | std::make_unique<Network::UpstreamSubjectAltNames>( |
623 | 0 | std::vector<std::string>{std::string(sni_value)}), |
624 | 0 | StreamInfo::FilterState::StateType::Mutable); |
625 | 0 | } |
626 | 0 | } |
627 | | |
628 | 1.81k | transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState( |
629 | 1.81k | *callbacks_->streamInfo().filterState()); |
630 | | |
631 | 1.81k | if (auto downstream_connection = downstreamConnection(); downstream_connection != nullptr) { |
632 | 987 | if (auto typed_state = downstream_connection->streamInfo() |
633 | 987 | .filterState() |
634 | 987 | .getDataReadOnly<Network::UpstreamSocketOptionsFilterState>( |
635 | 987 | Network::UpstreamSocketOptionsFilterState::key()); |
636 | 987 | typed_state != nullptr) { |
637 | 0 | auto downstream_options = typed_state->value(); |
638 | 0 | if (!upstream_options_) { |
639 | 0 | upstream_options_ = std::make_shared<Network::Socket::Options>(); |
640 | 0 | } |
641 | 0 | Network::Socket::appendOptions(upstream_options_, downstream_options); |
642 | 0 | } |
643 | 987 | } |
644 | | |
645 | 1.81k | if (upstream_options_ && callbacks_->getUpstreamSocketOptions()) { |
646 | 0 | Network::Socket::appendOptions(upstream_options_, callbacks_->getUpstreamSocketOptions()); |
647 | 0 | } |
648 | | |
649 | 1.81k | std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster); |
650 | | |
651 | 1.81k | if (!generic_conn_pool) { |
652 | 0 | sendNoHealthyUpstreamResponse(); |
653 | 0 | return Http::FilterHeadersStatus::StopIteration; |
654 | 0 | } |
655 | 1.81k | Upstream::HostDescriptionConstSharedPtr host = generic_conn_pool->host(); |
656 | | |
657 | 1.81k | if (debug_config && debug_config->append_upstream_host_) { |
658 | | // The hostname and address will be appended to any local or upstream responses from this point, |
659 | | // possibly in addition to the cluster name. |
660 | 0 | modify_headers = [modify_headers, debug_config, host](Http::ResponseHeaderMap& headers) { |
661 | 0 | modify_headers(headers); |
662 | 0 | headers.addCopy( |
663 | 0 | debug_config->hostname_header_.value_or(Http::Headers::get().EnvoyUpstreamHostname), |
664 | 0 | host->hostname()); |
665 | 0 | headers.addCopy(debug_config->host_address_header_.value_or( |
666 | 0 | Http::Headers::get().EnvoyUpstreamHostAddress), |
667 | 0 | host->address()->asString()); |
668 | 0 | }; |
669 | 0 | } |
670 | | |
671 | | // If we've been instructed not to forward the request upstream, send an empty local response. |
672 | 1.81k | if (debug_config && debug_config->do_not_forward_) { |
673 | 0 | modify_headers = [modify_headers, debug_config](Http::ResponseHeaderMap& headers) { |
674 | 0 | modify_headers(headers); |
675 | 0 | headers.addCopy( |
676 | 0 | debug_config->not_forwarded_header_.value_or(Http::Headers::get().EnvoyNotForwarded), |
677 | 0 | "true"); |
678 | 0 | }; |
679 | 0 | callbacks_->sendLocalReply(Http::Code::NoContent, "", modify_headers, absl::nullopt, ""); |
680 | 0 | return Http::FilterHeadersStatus::StopIteration; |
681 | 0 | } |
682 | | |
683 | 1.81k | if (callbacks_->shouldLoadShed()) { |
684 | 0 | callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "envoy overloaded", nullptr, |
685 | 0 | absl::nullopt, StreamInfo::ResponseCodeDetails::get().Overload); |
686 | 0 | stats_.rq_overload_local_reply_.inc(); |
687 | 0 | return Http::FilterHeadersStatus::StopIteration; |
688 | 0 | } |
689 | | |
690 | 1.81k | hedging_params_ = FilterUtility::finalHedgingParams(*route_entry_, headers); |
691 | | |
692 | 1.81k | timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_->suppress_envoy_headers_, |
693 | 1.81k | grpc_request_, hedging_params_.hedge_on_per_try_timeout_, |
694 | 1.81k | config_->respect_expected_rq_timeout_); |
695 | | |
696 | 1.81k | const Http::HeaderEntry* header_max_stream_duration_entry = |
697 | 1.81k | headers.EnvoyUpstreamStreamDurationMs(); |
698 | 1.81k | if (header_max_stream_duration_entry) { |
699 | 0 | dynamic_max_stream_duration_ = |
700 | 0 | FilterUtility::tryParseHeaderTimeout(*header_max_stream_duration_entry); |
701 | 0 | headers.removeEnvoyUpstreamStreamDurationMs(); |
702 | 0 | } |
703 | | |
704 | | // If this header is set with any value, use an alternate response code on timeout |
705 | 1.81k | if (headers.EnvoyUpstreamRequestTimeoutAltResponse()) { |
706 | 0 | timeout_response_code_ = Http::Code::NoContent; |
707 | 0 | headers.removeEnvoyUpstreamRequestTimeoutAltResponse(); |
708 | 0 | } |
709 | | |
710 | 1.81k | include_attempt_count_in_request_ = route_entry_->includeAttemptCountInRequest(); |
711 | 1.81k | if (include_attempt_count_in_request_) { |
712 | 0 | headers.setEnvoyAttemptCount(attempt_count_); |
713 | 0 | } |
714 | | |
715 | | // The router has reached a point where it is going to try to send a request upstream, |
716 | | // so now modify_headers should attach x-envoy-attempt-count to the downstream response if the |
717 | | // config flag is true. |
718 | 1.81k | if (route_entry_->includeAttemptCountInResponse()) { |
719 | 0 | modify_headers = [modify_headers, this](Http::ResponseHeaderMap& headers) { |
720 | 0 | modify_headers(headers); |
721 | | |
722 | | // This header is added without checking for config_->suppress_envoy_headers_ to mirror what |
723 | | // is done for upstream requests. |
724 | 0 | headers.setEnvoyAttemptCount(attempt_count_); |
725 | 0 | }; |
726 | 0 | } |
727 | 1.81k | callbacks_->streamInfo().setAttemptCount(attempt_count_); |
728 | | |
729 | 1.81k | route_entry_->finalizeRequestHeaders(headers, callbacks_->streamInfo(), |
730 | 1.81k | !config_->suppress_envoy_headers_); |
731 | 1.81k | FilterUtility::setUpstreamScheme( |
732 | 1.81k | headers, callbacks_->streamInfo().downstreamAddressProvider().sslConnection() != nullptr, |
733 | 1.81k | host->transportSocketFactory().sslCtx() != nullptr, |
734 | 1.81k | callbacks_->streamInfo().shouldSchemeMatchUpstream()); |
735 | | |
736 | | // Ensure an http transport scheme is selected before continuing with decoding. |
737 | 1.81k | ASSERT(headers.Scheme()); |
738 | | |
739 | 1.81k | retry_state_ = createRetryState( |
740 | 1.81k | route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_, route_stats_context_, |
741 | 1.81k | config_->factory_context_, callbacks_->dispatcher(), route_entry_->priority()); |
742 | | |
743 | | // Determine which shadow policies to use. It's possible that we don't do any shadowing due to |
744 | | // runtime keys. Also the method CONNECT doesn't support shadowing. |
745 | 1.81k | auto method = headers.getMethodValue(); |
746 | 1.81k | if (method != Http::Headers::get().MethodValues.Connect) { |
747 | 1.81k | for (const auto& shadow_policy : route_entry_->shadowPolicies()) { |
748 | 0 | const auto& policy_ref = *shadow_policy; |
749 | 0 | if (FilterUtility::shouldShadow(policy_ref, config_->runtime_, callbacks_->streamId())) { |
750 | 0 | active_shadow_policies_.push_back(std::cref(policy_ref)); |
751 | 0 | shadow_headers_ = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*downstream_headers_); |
752 | 0 | } |
753 | 0 | } |
754 | 1.81k | } |
755 | | |
756 | 1.81k | ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers); |
757 | | |
758 | | // Hang onto the modify_headers function for later use in handling upstream responses. |
759 | 1.81k | modify_headers_ = modify_headers; |
760 | | |
761 | 1.81k | const bool can_send_early_data = |
762 | 1.81k | route_entry_->earlyDataPolicy().allowsEarlyDataForRequest(*downstream_headers_); |
763 | | |
764 | 1.81k | include_timeout_retry_header_in_request_ = route_->virtualHost().includeIsTimeoutRetryHeader(); |
765 | | |
766 | | // Set initial HTTP/3 use based on the presence of HTTP/1.1 proxy config. |
767 | | // For retries etc, HTTP/3 usability may transition from true to false, but |
768 | | // will never transition from false to true. |
769 | 1.81k | bool can_use_http3 = |
770 | 1.81k | !transport_socket_options_ || !transport_socket_options_->http11ProxyInfo().has_value(); |
771 | 1.81k | UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>( |
772 | 1.81k | *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3, |
773 | 1.81k | allow_multiplexed_upstream_half_close_ /*enable_half_close*/); |
774 | 1.81k | LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_); |
775 | 1.81k | upstream_requests_.front()->acceptHeadersFromRouter(end_stream); |
776 | 1.81k | if (streaming_shadows_) { |
777 | | // start the shadow streams. |
778 | 0 | for (const auto& shadow_policy_wrapper : active_shadow_policies_) { |
779 | 0 | const auto& shadow_policy = shadow_policy_wrapper.get(); |
780 | 0 | const absl::optional<absl::string_view> shadow_cluster_name = |
781 | 0 | getShadowCluster(shadow_policy, *downstream_headers_); |
782 | 0 | if (!shadow_cluster_name.has_value()) { |
783 | 0 | continue; |
784 | 0 | } |
785 | 0 | auto shadow_headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_); |
786 | 0 | auto options = |
787 | 0 | Http::AsyncClient::RequestOptions() |
788 | 0 | .setTimeout(timeout_.global_timeout_) |
789 | 0 | .setParentSpan(callbacks_->activeSpan()) |
790 | 0 | .setChildSpanName("mirror") |
791 | 0 | .setSampled(shadow_policy.traceSampled()) |
792 | 0 | .setIsShadow(true) |
793 | 0 | .setIsShadowSuffixDisabled(shadow_policy.disableShadowHostSuffixAppend()) |
794 | 0 | .setBufferAccount(callbacks_->account()) |
795 | | // A buffer limit of 1 is set in the case that retry_shadow_buffer_limit_ == 0, |
796 | | // because a buffer limit of zero on async clients is interpreted as no buffer limit. |
797 | 0 | .setBufferLimit(1 > retry_shadow_buffer_limit_ ? 1 : retry_shadow_buffer_limit_) |
798 | 0 | .setDiscardResponseBody(true); |
799 | 0 | options.setFilterConfig(config_); |
800 | 0 | if (end_stream) { |
801 | | // This is a header-only request, and can be dispatched immediately to the shadow |
802 | | // without waiting. |
803 | 0 | Http::RequestMessagePtr request(new Http::RequestMessageImpl( |
804 | 0 | Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_))); |
805 | 0 | config_->shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request), |
806 | 0 | options); |
807 | 0 | } else { |
808 | 0 | Http::AsyncClient::OngoingRequest* shadow_stream = config_->shadowWriter().streamingShadow( |
809 | 0 | std::string(shadow_cluster_name.value()), std::move(shadow_headers), options); |
810 | 0 | if (shadow_stream != nullptr) { |
811 | 0 | shadow_streams_.insert(shadow_stream); |
812 | 0 | shadow_stream->setDestructorCallback( |
813 | 0 | [this, shadow_stream]() { shadow_streams_.erase(shadow_stream); }); |
814 | 0 | shadow_stream->setWatermarkCallbacks(watermark_callbacks_); |
815 | 0 | } |
816 | 0 | } |
817 | 0 | } |
818 | 0 | } |
819 | 1.81k | if (end_stream) { |
820 | 585 | onRequestComplete(); |
821 | 585 | } |
822 | | |
823 | 1.81k | return Http::FilterHeadersStatus::StopIteration; |
824 | 1.81k | } |
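
One pattern in decodeHeaders() worth calling out is how modify_headers is built: it starts as a no-op, and each condition that wants to stamp extra response headers (debug cluster/host info, attempt count) installs a new closure that first invokes the previous one. A standalone sketch of that chaining with hypothetical header names (std::map standing in for the Envoy header map):

#include <functional>
#include <iostream>
#include <map>
#include <string>

// Illustrative closure chaining; each layer wraps the previous mutator.
using HeaderMap = std::map<std::string, std::string>;
using HeaderMutator = std::function<void(HeaderMap&)>;

int main() {
  // Start as a no-op so callers never have to null-check before invoking it.
  HeaderMutator modify_headers = [](HeaderMap&) {};

  const bool append_cluster = true;
  if (append_cluster) {
    modify_headers = [prev = modify_headers](HeaderMap& headers) {
      prev(headers);
      headers["x-example-cluster"] = "example_cluster";
    };
  }

  const bool include_attempt_count = true;
  if (include_attempt_count) {
    modify_headers = [prev = modify_headers](HeaderMap& headers) {
      prev(headers);
      headers["x-example-attempt-count"] = "1";
    };
  }

  HeaderMap response_headers;
  modify_headers(response_headers); // Every installed layer runs.
  for (const auto& [name, value] : response_headers) {
    std::cout << name << ": " << value << "\n";
  }
}
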
825 | | |
826 | | std::unique_ptr<GenericConnPool> |
827 | 1.81k | Filter::createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster) { |
828 | 1.81k | GenericConnPoolFactory* factory = nullptr; |
829 | 1.81k | if (cluster_->upstreamConfig().has_value()) { |
830 | 0 | factory = Envoy::Config::Utility::getFactory<GenericConnPoolFactory>( |
831 | 0 | cluster_->upstreamConfig().ref()); |
832 | 0 | ENVOY_BUG(factory != nullptr, |
833 | 0 | fmt::format("invalid factory type '{}', failing over to default upstream", |
834 | 0 | cluster_->upstreamConfig().ref().DebugString())); |
835 | 0 | } |
836 | 1.81k | if (!factory) { |
837 | 1.81k | factory = &config_->router_context_.genericConnPoolFactory(); |
838 | 1.81k | } |
839 | | |
840 | 1.81k | using UpstreamProtocol = Envoy::Router::GenericConnPoolFactory::UpstreamProtocol; |
841 | 1.81k | UpstreamProtocol upstream_protocol = UpstreamProtocol::HTTP; |
842 | 1.81k | if (route_entry_->connectConfig().has_value()) { |
843 | 0 | auto method = downstream_headers_->getMethodValue(); |
844 | 0 | if (Http::HeaderUtility::isConnectUdpRequest(*downstream_headers_)) { |
845 | 0 | upstream_protocol = UpstreamProtocol::UDP; |
846 | 0 | } else if (method == Http::Headers::get().MethodValues.Connect || |
847 | 0 | (route_entry_->connectConfig()->allow_post() && |
848 | 0 | method == Http::Headers::get().MethodValues.Post)) { |
849 | | // Allow POST for proxying raw TCP if it is configured. |
850 | 0 | upstream_protocol = UpstreamProtocol::TCP; |
851 | 0 | } |
852 | 0 | } |
853 | 1.81k | return factory->createGenericConnPool(thread_local_cluster, upstream_protocol, |
854 | 1.81k | route_entry_->priority(), |
855 | 1.81k | callbacks_->streamInfo().protocol(), this); |
856 | 1.81k | } |
857 | | |
858 | 0 | void Filter::sendNoHealthyUpstreamResponse() { |
859 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoHealthyUpstream); |
860 | 0 | chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, false); |
861 | 0 | callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "no healthy upstream", modify_headers_, |
862 | 0 | absl::nullopt, |
863 | 0 | StreamInfo::ResponseCodeDetails::get().NoHealthyUpstream); |
864 | 0 | } |
865 | | |
866 | 3.05k | Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) { |
867 | | // upstream_requests_.size() cannot be > 1 because that only happens when a per |
868 | | // try timeout occurs with hedge_on_per_try_timeout enabled but the per |
869 | | // try timeout timer is not started until onRequestComplete(). It could be zero |
870 | | // if the first request attempt has already failed and a retry is waiting for |
871 | | // a backoff timer. |
872 | 3.05k | ASSERT(upstream_requests_.size() <= 1); |
873 | | |
874 | 3.05k | bool buffering = (retry_state_ && retry_state_->enabled()) || |
875 | 3.05k | (!active_shadow_policies_.empty() && !streaming_shadows_) || |
876 | 3.05k | (route_entry_ && route_entry_->internalRedirectPolicy().enabled()); |
877 | 3.05k | if (buffering && |
878 | 3.05k | getLength(callbacks_->decodingBuffer()) + data.length() > retry_shadow_buffer_limit_) { |
879 | 0 | ENVOY_LOG(debug, |
880 | 0 | "The request payload has at least {} bytes data which exceeds buffer limit {}. Give " |
881 | 0 | "up on the retry/shadow.", |
882 | 0 | getLength(callbacks_->decodingBuffer()) + data.length(), retry_shadow_buffer_limit_); |
883 | 0 | cluster_->trafficStats()->retry_or_shadow_abandoned_.inc(); |
884 | 0 | retry_state_.reset(); |
885 | 0 | buffering = false; |
886 | 0 | active_shadow_policies_.clear(); |
887 | 0 | request_buffer_overflowed_ = true; |
888 | | |
889 | | // If we had to abandon buffering and there's no request in progress, abort the request and |
890 | | // clean up. This happens if the initial upstream request failed, and we are currently waiting |
891 | | // for a backoff timer before starting the next upstream attempt. |
892 | 0 | if (upstream_requests_.empty()) { |
893 | 0 | cleanup(); |
894 | 0 | callbacks_->sendLocalReply( |
895 | 0 | Http::Code::InsufficientStorage, "exceeded request buffer limit while retrying upstream", |
896 | 0 | modify_headers_, absl::nullopt, |
897 | 0 | StreamInfo::ResponseCodeDetails::get().RequestPayloadExceededRetryBufferLimit); |
898 | 0 | return Http::FilterDataStatus::StopIterationNoBuffer; |
899 | 0 | } |
900 | 0 | } |
901 | | |
902 | 3.05k | for (auto* shadow_stream : shadow_streams_) { |
903 | 0 | if (end_stream) { |
904 | 0 | shadow_stream->removeDestructorCallback(); |
905 | 0 | shadow_stream->removeWatermarkCallbacks(); |
906 | 0 | } |
907 | 0 | Buffer::OwnedImpl copy(data); |
908 | 0 | shadow_stream->sendData(copy, end_stream); |
909 | 0 | } |
910 | 3.05k | if (end_stream) { |
911 | 1.04k | shadow_streams_.clear(); |
912 | 1.04k | } |
913 | 3.05k | if (buffering) { |
914 | 0 | if (!upstream_requests_.empty()) { |
915 | 0 | Buffer::OwnedImpl copy(data); |
916 | 0 | upstream_requests_.front()->acceptDataFromRouter(copy, end_stream); |
917 | 0 | } |
918 | | |
919 | | // If we are potentially going to retry or buffer shadow this request we need to buffer. |
920 | | // This will not cause the connection manager to 413 because before we hit the |
921 | | // buffer limit we give up on retries and buffering. We must buffer using addDecodedData() |
922 | | // so that all buffered data is available by the time we do request complete processing and |
923 | | // potentially shadow. Additionally, we can't do a copy here because there's a check down |
924 | | // this stack for whether `data` is the same buffer as already buffered data. |
925 | 0 | callbacks_->addDecodedData(data, true); |
926 | 3.05k | } else { |
927 | 3.05k | if (!Runtime::runtimeFeatureEnabled( |
928 | 3.05k | "envoy.reloadable_features.send_local_reply_when_no_buffer_and_upstream_request")) { |
929 | 0 | upstream_requests_.front()->acceptDataFromRouter(data, end_stream); |
930 | 3.05k | } else { |
931 | 3.05k | if (!upstream_requests_.empty()) { |
932 | 3.05k | upstream_requests_.front()->acceptDataFromRouter(data, end_stream); |
933 | 3.05k | } else { |
934 | | // We are not buffering any data for retry, shadow, or internal redirect, and there will |
935 | | // be no more upstream requests, so abort the request and clean up. |
936 | 0 | cleanup(); |
937 | 0 | callbacks_->sendLocalReply( |
938 | 0 | Http::Code::ServiceUnavailable, |
939 | 0 | "upstream is closed prematurely during decoding data from downstream", modify_headers_, |
940 | 0 | absl::nullopt, StreamInfo::ResponseCodeDetails::get().EarlyUpstreamReset); |
941 | 0 | return Http::FilterDataStatus::StopIterationNoBuffer; |
942 | 0 | } |
943 | 3.05k | } |
944 | 3.05k | } |
945 | | |
946 | 3.05k | if (end_stream) { |
947 | 1.04k | onRequestComplete(); |
948 | 1.04k | } |
949 | | |
950 | 3.05k | return Http::FilterDataStatus::StopIterationNoBuffer; |
951 | 3.05k | } |
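
The buffering decision at the top of decodeData() can be read as: buffer only while a retry, a non-streaming shadow, or an internal redirect might still need the body, and give all of that up once buffered plus incoming bytes would exceed the retry/shadow buffer limit. A small standalone sketch of that bookkeeping (illustrative types, not the Envoy filter API):

#include <cstdint>
#include <iostream>

// Illustrative sketch of the buffering decision in decodeData().
struct BufferingState {
  bool retry_enabled = true;
  bool non_streaming_shadow = false;
  bool internal_redirect_enabled = false;
  uint64_t buffered_bytes = 0;
  uint64_t buffer_limit = 65536;
};

bool shouldKeepBuffering(BufferingState& state, uint64_t incoming_bytes) {
  bool buffering =
      state.retry_enabled || state.non_streaming_shadow || state.internal_redirect_enabled;
  if (buffering && state.buffered_bytes + incoming_bytes > state.buffer_limit) {
    // Over the limit: abandon retry/shadow/redirect instead of buffering without bound.
    state.retry_enabled = false;
    state.non_streaming_shadow = false;
    state.internal_redirect_enabled = false;
    buffering = false;
  }
  return buffering;
}

int main() {
  BufferingState state;
  state.buffered_bytes = 60000;
  std::cout << shouldKeepBuffering(state, 4000) << "\n"; // 1: 64000 <= 65536, keep buffering
  std::cout << shouldKeepBuffering(state, 8000) << "\n"; // 0: 68000 > 65536, give up
}
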
952 | | |
953 | 0 | Http::FilterTrailersStatus Filter::decodeTrailers(Http::RequestTrailerMap& trailers) { |
954 | 0 | ENVOY_STREAM_LOG(debug, "router decoding trailers:\n{}", *callbacks_, trailers); |
955 | |
956 | 0 | if (shadow_headers_) { |
957 | 0 | shadow_trailers_ = Http::createHeaderMap<Http::RequestTrailerMapImpl>(trailers); |
958 | 0 | } |
959 | | |
960 | | // upstream_requests_.size() cannot be > 1 because that only happens when a per |
961 | | // try timeout occurs with hedge_on_per_try_timeout enabled but the per |
962 | | // try timeout timer is not started until onRequestComplete(). It could be zero |
963 | | // if the first request attempt has already failed and a retry is waiting for |
964 | | // a backoff timer. |
965 | 0 | ASSERT(upstream_requests_.size() <= 1); |
966 | 0 | downstream_trailers_ = &trailers; |
967 | 0 | if (!upstream_requests_.empty()) { |
968 | 0 | upstream_requests_.front()->acceptTrailersFromRouter(trailers); |
969 | 0 | } |
970 | 0 | for (auto* shadow_stream : shadow_streams_) { |
971 | 0 | shadow_stream->removeDestructorCallback(); |
972 | 0 | shadow_stream->removeWatermarkCallbacks(); |
973 | 0 | shadow_stream->captureAndSendTrailers( |
974 | 0 | Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_)); |
975 | 0 | } |
976 | 0 | shadow_streams_.clear(); |
977 | |
978 | 0 | onRequestComplete(); |
979 | 0 | return Http::FilterTrailersStatus::StopIteration; |
980 | 0 | } |
981 | | |
982 | 26 | Http::FilterMetadataStatus Filter::decodeMetadata(Http::MetadataMap& metadata_map) { |
983 | 26 | Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map); |
984 | 26 | if (!upstream_requests_.empty()) { |
985 | | // TODO(soya3129): Save metadata for retry, redirect and shadowing case. |
986 | 26 | upstream_requests_.front()->acceptMetadataFromRouter(std::move(metadata_map_ptr)); |
987 | 26 | } |
988 | 26 | return Http::FilterMetadataStatus::Continue; |
989 | 26 | } |
990 | | |
991 | 2.25k | void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) { |
992 | 2.25k | callbacks_ = &callbacks; |
993 | | // As the decoder filter only pushes back via watermarks once data has reached |
994 | | // it, it can latch the current buffer limit and does not need to update the |
995 | | // limit if another filter increases it. |
996 | | // |
997 | | // The default is "do not limit". If there are configured (non-zero) buffer |
998 | | // limits, apply them here. |
999 | 2.25k | if (callbacks_->decoderBufferLimit() != 0) { |
1000 | 1.41k | retry_shadow_buffer_limit_ = callbacks_->decoderBufferLimit(); |
1001 | 1.41k | } |
1002 | | |
1003 | 2.25k | watermark_callbacks_.setDecoderFilterCallbacks(callbacks_); |
1004 | 2.25k | } |
1005 | | |
1006 | 4.08k | void Filter::cleanup() { |
1007 | | // All callers of cleanup() should have cleaned out the upstream_requests_ |
1008 | | // list as appropriate. |
1009 | 4.08k | ASSERT(upstream_requests_.empty()); |
1010 | | |
1011 | 4.08k | retry_state_.reset(); |
1012 | 4.08k | if (response_timeout_) { |
1013 | 916 | response_timeout_->disableTimer(); |
1014 | 916 | response_timeout_.reset(); |
1015 | 916 | } |
1016 | 4.08k | } |
1017 | | |
1018 | | absl::optional<absl::string_view> Filter::getShadowCluster(const ShadowPolicy& policy, |
1019 | 0 | const Http::HeaderMap& headers) const { |
1020 | 0 | if (!policy.cluster().empty()) { |
1021 | 0 | return policy.cluster(); |
1022 | 0 | } else { |
1023 | 0 | ASSERT(!policy.clusterHeader().get().empty()); |
1024 | 0 | const auto entry = headers.get(policy.clusterHeader()); |
1025 | 0 | if (!entry.empty() && !entry[0]->value().empty()) { |
1026 | 0 | return entry[0]->value().getStringView(); |
1027 | 0 | } |
1028 | 0 | ENVOY_STREAM_LOG(debug, "There is no cluster name in header: {}", *callbacks_, |
1029 | 0 | policy.clusterHeader()); |
1030 | 0 | return absl::nullopt; |
1031 | 0 | } |
1032 | 0 | } |
1033 | | |
1034 | 1.62k | void Filter::maybeDoShadowing() { |
1035 | 1.62k | for (const auto& shadow_policy_wrapper : active_shadow_policies_) { |
1036 | 0 | const auto& shadow_policy = shadow_policy_wrapper.get(); |
1037 | |
1038 | 0 | const absl::optional<absl::string_view> shadow_cluster_name = |
1039 | 0 | getShadowCluster(shadow_policy, *downstream_headers_); |
1040 | | |
1041 | | // The cluster name obtained from the headers is empty. |
1042 | 0 | if (!shadow_cluster_name.has_value()) { |
1043 | 0 | continue; |
1044 | 0 | } |
1045 | | |
1046 | 0 | Http::RequestMessagePtr request(new Http::RequestMessageImpl( |
1047 | 0 | Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_))); |
1048 | 0 | if (callbacks_->decodingBuffer()) { |
1049 | 0 | request->body().add(*callbacks_->decodingBuffer()); |
1050 | 0 | } |
1051 | 0 | if (shadow_trailers_) { |
1052 | 0 | request->trailers(Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_)); |
1053 | 0 | } |
1054 | |
1055 | 0 | auto options = Http::AsyncClient::RequestOptions() |
1056 | 0 | .setTimeout(timeout_.global_timeout_) |
1057 | 0 | .setParentSpan(callbacks_->activeSpan()) |
1058 | 0 | .setChildSpanName("mirror") |
1059 | 0 | .setSampled(shadow_policy.traceSampled()) |
1060 | 0 | .setIsShadow(true) |
1061 | 0 | .setIsShadowSuffixDisabled(shadow_policy.disableShadowHostSuffixAppend()); |
1062 | 0 | options.setFilterConfig(config_); |
1063 | 0 | config_->shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request), |
1064 | 0 | options); |
1065 | 0 | } |
1066 | 1.62k | } |
1067 | | |
1068 | 1.62k | void Filter::onRequestComplete() { |
1069 | | // This should be called exactly once, when the downstream request has been received in full. |
1070 | 1.62k | ASSERT(!downstream_end_stream_); |
1071 | 1.62k | downstream_end_stream_ = true; |
1072 | 1.62k | Event::Dispatcher& dispatcher = callbacks_->dispatcher(); |
1073 | 1.62k | downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime(); |
1074 | | |
1075 | | // Possible that we got an immediate reset. |
1076 | 1.62k | if (!upstream_requests_.empty()) { |
1077 | | // Even if we got an immediate reset, we could still shadow, but that is a riskier change and |
1078 | | // seems unnecessary right now. |
1079 | 1.62k | if (!streaming_shadows_) { |
1080 | 1.62k | maybeDoShadowing(); |
1081 | 1.62k | } |
1082 | | |
1083 | 1.62k | if (timeout_.global_timeout_.count() > 0) { |
1084 | 916 | response_timeout_ = dispatcher.createTimer([this]() -> void { onResponseTimeout(); }); |
1085 | 916 | response_timeout_->enableTimer(timeout_.global_timeout_); |
1086 | 916 | } |
1087 | | |
1088 | 1.62k | for (auto& upstream_request : upstream_requests_) { |
1089 | 1.62k | if (upstream_request->createPerTryTimeoutOnRequestComplete()) { |
1090 | 1.15k | upstream_request->setupPerTryTimeout(); |
1091 | 1.15k | } |
1092 | 1.62k | } |
1093 | 1.62k | } |
1094 | 1.62k | } |
1095 | | |
1096 | 3.06k | void Filter::onDestroy() { |
1097 | | // Reset any in-flight upstream requests. |
1098 | 3.06k | resetAll(); |
1099 | | |
1100 | | // Unregister from shadow stream notifications and cancel active streams. |
1101 | 3.06k | for (auto* shadow_stream : shadow_streams_) { |
1102 | 0 | shadow_stream->removeDestructorCallback(); |
1103 | 0 | shadow_stream->removeWatermarkCallbacks(); |
1104 | 0 | shadow_stream->cancel(); |
1105 | 0 | } |
1106 | | |
1107 | 3.06k | cleanup(); |
1108 | 3.06k | } |
1109 | | |
1110 | 0 | void Filter::onResponseTimeout() { |
1111 | 0 | ENVOY_STREAM_LOG(debug, "upstream timeout", *callbacks_); |
1112 | | |
1113 | | // Reset any upstream requests that are still in flight. |
1114 | 0 | while (!upstream_requests_.empty()) { |
1115 | 0 | UpstreamRequestPtr upstream_request = |
1116 | 0 | upstream_requests_.back()->removeFromList(upstream_requests_); |
1117 | | |
1118 | | // We want to record the upstream timeouts and increase the stats counters in all cases.
1119 | | // For example, we also want to record the stats for BiDi streaming APIs where we
1120 | | // might have already seen the headers.
1121 | 0 | cluster_->trafficStats()->upstream_rq_timeout_.inc(); |
1122 | 0 | if (request_vcluster_) { |
1123 | 0 | request_vcluster_->stats().upstream_rq_timeout_.inc(); |
1124 | 0 | } |
1125 | 0 | if (route_stats_context_.has_value()) { |
1126 | 0 | route_stats_context_->stats().upstream_rq_timeout_.inc(); |
1127 | 0 | } |
1128 | |
1129 | 0 | if (upstream_request->upstreamHost()) { |
1130 | 0 | upstream_request->upstreamHost()->stats().rq_timeout_.inc(); |
1131 | 0 | } |
1132 | |
1133 | 0 | if (upstream_request->awaitingHeaders()) { |
1134 | 0 | if (cluster_->timeoutBudgetStats().has_value()) { |
1135 | | // Cancel recording per-try timeout budget information, because the per-try timeout did not
1136 | | // come into play when the global timeout was hit.
1137 | 0 | upstream_request->recordTimeoutBudget(false); |
1138 | 0 | } |
1139 | | |
1140 | | // If this upstream request already hit a "soft" timeout, then it |
1141 | | // already recorded a timeout into outlier detection. Don't do it again. |
1142 | 0 | if (!upstream_request->outlierDetectionTimeoutRecorded()) { |
1143 | 0 | updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, *upstream_request, |
1144 | 0 | absl::optional<uint64_t>(enumToInt(timeout_response_code_))); |
1145 | 0 | } |
1146 | |
1147 | 0 | chargeUpstreamAbort(timeout_response_code_, false, *upstream_request); |
1148 | 0 | } |
1149 | 0 | upstream_request->resetStream(); |
1150 | 0 | } |
1151 | |
1152 | 0 | onUpstreamTimeoutAbort(StreamInfo::CoreResponseFlag::UpstreamRequestTimeout, |
1153 | 0 | StreamInfo::ResponseCodeDetails::get().ResponseTimeout); |
1154 | 0 | } |
1155 | | |
1156 | | // Called when the per try timeout is hit but we didn't reset the request |
1157 | | // (hedge_on_per_try_timeout enabled). |
1158 | 0 | void Filter::onSoftPerTryTimeout(UpstreamRequest& upstream_request) { |
1159 | 0 | ASSERT(!upstream_request.retried()); |
1160 | | // Track this as a timeout for outlier detection purposes even though we didn't |
1161 | | // cancel the request yet and might get a 2xx later. |
1162 | 0 | updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request, |
1163 | 0 | absl::optional<uint64_t>(enumToInt(timeout_response_code_))); |
1164 | 0 | upstream_request.outlierDetectionTimeoutRecorded(true); |
1165 | |
1166 | 0 | if (!downstream_response_started_ && retry_state_) { |
1167 | 0 | RetryStatus retry_status = retry_state_->shouldHedgeRetryPerTryTimeout( |
1168 | 0 | [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_]() -> void { |
1169 | | // Without any knowledge of what's going on in the connection pool, retry the request
1170 | | // with the safest settings: no early data, but keep the same alt-svc usage as
1171 | | // before. This way, QUIC won't be falsely marked as broken.
1172 | 0 | doRetry(/*can_send_early_data*/ false, can_use_http3, TimeoutRetry::Yes); |
1173 | 0 | }); |
1174 | |
1175 | 0 | if (retry_status == RetryStatus::Yes) { |
1176 | 0 | runRetryOptionsPredicates(upstream_request); |
1177 | 0 | pending_retries_++; |
1178 | | |
1179 | | // Don't increment upstream_host->stats().rq_error_ here; we'll do that
1180 | | // later if 1) we hit the global timeout or 2) we get bad response headers
1181 | | // back.
1182 | 0 | upstream_request.retried(true); |
1183 | | |
1184 | | // TODO: cluster stat for hedge attempted. |
1185 | 0 | } else if (retry_status == RetryStatus::NoOverflow) { |
1186 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow); |
1187 | 0 | } else if (retry_status == RetryStatus::NoRetryLimitExceeded) { |
1188 | 0 | callbacks_->streamInfo().setResponseFlag( |
1189 | 0 | StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded); |
1190 | 0 | } |
1191 | 0 | } |
1192 | 0 | } |
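As a concrete illustration of the hedging path above (hypothetical numbers): with hedge_on_per_try_timeout enabled, a 1s per-try timeout, and a 5s global timeout, an attempt that has produced no headers after 1s is left in flight, the timeout is only recorded for outlier detection, and shouldHedgeRetryPerTryTimeout() may start a second attempt via doRetry(). Whichever attempt returns acceptable headers first becomes final_upstream_request_ and the other attempts are reset in resetOtherUpstreams(); if the 5s global timeout fires first, onResponseTimeout() resets everything still outstanding.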
1193 | | |
1194 | 0 | void Filter::onPerTryIdleTimeout(UpstreamRequest& upstream_request) { |
1195 | 0 | onPerTryTimeoutCommon(upstream_request, |
1196 | 0 | cluster_->trafficStats()->upstream_rq_per_try_idle_timeout_, |
1197 | 0 | StreamInfo::ResponseCodeDetails::get().UpstreamPerTryIdleTimeout); |
1198 | 0 | } |
1199 | | |
1200 | 0 | void Filter::onPerTryTimeout(UpstreamRequest& upstream_request) { |
1201 | 0 | onPerTryTimeoutCommon(upstream_request, cluster_->trafficStats()->upstream_rq_per_try_timeout_, |
1202 | 0 | StreamInfo::ResponseCodeDetails::get().UpstreamPerTryTimeout); |
1203 | 0 | } |
1204 | | |
1205 | | void Filter::onPerTryTimeoutCommon(UpstreamRequest& upstream_request, Stats::Counter& error_counter, |
1206 | 0 | const std::string& response_code_details) { |
1207 | 0 | if (hedging_params_.hedge_on_per_try_timeout_) { |
1208 | 0 | onSoftPerTryTimeout(upstream_request); |
1209 | 0 | return; |
1210 | 0 | } |
1211 | | |
1212 | 0 | error_counter.inc(); |
1213 | 0 | if (upstream_request.upstreamHost()) { |
1214 | 0 | upstream_request.upstreamHost()->stats().rq_timeout_.inc(); |
1215 | 0 | } |
1216 | |
1217 | 0 | upstream_request.resetStream(); |
1218 | |
1219 | 0 | updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request, |
1220 | 0 | absl::optional<uint64_t>(enumToInt(timeout_response_code_))); |
1221 | |
1222 | 0 | if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::Yes)) { |
1223 | 0 | return; |
1224 | 0 | } |
1225 | | |
1226 | 0 | chargeUpstreamAbort(timeout_response_code_, false, upstream_request); |
1227 | | |
1228 | | // Remove this upstream request from the list now that we're done with it. |
1229 | 0 | upstream_request.removeFromList(upstream_requests_); |
1230 | 0 | onUpstreamTimeoutAbort(StreamInfo::CoreResponseFlag::UpstreamRequestTimeout, |
1231 | 0 | response_code_details); |
1232 | 0 | } |
1233 | | |
1234 | 0 | void Filter::onStreamMaxDurationReached(UpstreamRequest& upstream_request) { |
1235 | 0 | upstream_request.resetStream(); |
1236 | |
1237 | 0 | if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::No)) { |
1238 | 0 | return; |
1239 | 0 | } |
1240 | | |
1241 | 0 | upstream_request.removeFromList(upstream_requests_); |
1242 | 0 | cleanup(); |
1243 | |
1244 | 0 | callbacks_->streamInfo().setResponseFlag( |
1245 | 0 | StreamInfo::CoreResponseFlag::UpstreamMaxStreamDurationReached); |
1246 | | // Grab the const ref to call the const method of StreamInfo. |
1247 | 0 | const auto& stream_info = callbacks_->streamInfo(); |
1248 | 0 | const bool downstream_decode_complete = |
1249 | 0 | stream_info.downstreamTiming().has_value() && |
1250 | 0 | stream_info.downstreamTiming().value().get().lastDownstreamRxByteReceived().has_value(); |
1251 | | |
1252 | | // sendLocalReply may instead reset the stream if downstream_response_started_ is true. |
1253 | 0 | callbacks_->sendLocalReply( |
1254 | 0 | Http::Utility::maybeRequestTimeoutCode(downstream_decode_complete), |
1255 | 0 | "upstream max stream duration reached", modify_headers_, absl::nullopt, |
1256 | 0 | StreamInfo::ResponseCodeDetails::get().UpstreamMaxStreamDurationReached); |
1257 | 0 | } |
1258 | | |
1259 | | void Filter::updateOutlierDetection(Upstream::Outlier::Result result, |
1260 | | UpstreamRequest& upstream_request, |
1261 | 121 | absl::optional<uint64_t> code) { |
1262 | 121 | if (upstream_request.upstreamHost()) { |
1263 | 121 | upstream_request.upstreamHost()->outlierDetector().putResult(result, code); |
1264 | 121 | } |
1265 | 121 | } |
1266 | | |
1267 | 121 | void Filter::chargeUpstreamAbort(Http::Code code, bool dropped, UpstreamRequest& upstream_request) { |
1268 | 121 | if (downstream_response_started_) { |
1269 | 14 | if (upstream_request.grpcRqSuccessDeferred()) { |
1270 | 14 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1271 | 14 | stats_.rq_reset_after_downstream_response_started_.inc(); |
1272 | 14 | } |
1273 | 107 | } else { |
1274 | 107 | Upstream::HostDescriptionConstSharedPtr upstream_host = upstream_request.upstreamHost(); |
1275 | | |
1276 | 107 | chargeUpstreamCode(code, upstream_host, dropped); |
1277 | | // If the code is non-5xx but the request was still reset by the backend or timed out before
1278 | | // the response started, we treat this as an error. We only get a non-5xx code when
1279 | | // timeout_response_code_ is used for code above, since this member can
1280 | | // assume values such as 204 (NoContent).
1281 | 107 | if (upstream_host != nullptr && !Http::CodeUtility::is5xx(enumToInt(code))) { |
1282 | 0 | upstream_host->stats().rq_error_.inc(); |
1283 | 0 | } |
1284 | 107 | } |
1285 | 121 | } |
1286 | | |
1287 | | void Filter::onUpstreamTimeoutAbort(StreamInfo::CoreResponseFlag response_flags, |
1288 | 0 | absl::string_view details) { |
1289 | 0 | Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = cluster()->timeoutBudgetStats(); |
1290 | 0 | if (tb_stats.has_value()) { |
1291 | 0 | Event::Dispatcher& dispatcher = callbacks_->dispatcher(); |
1292 | 0 | std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>( |
1293 | 0 | dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_); |
1294 | |
1295 | 0 | tb_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue( |
1296 | 0 | FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_)); |
1297 | 0 | } |
1298 | |
1299 | 0 | const absl::string_view body = |
1300 | 0 | timeout_response_code_ == Http::Code::GatewayTimeout ? "upstream request timeout" : ""; |
1301 | 0 | onUpstreamAbort(timeout_response_code_, response_flags, body, false, details); |
1302 | 0 | } |
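The timeout-budget histogram recorded above captures how much of the configured global timeout a request had consumed when it was aborted. A minimal sketch of that arithmetic, assuming FilterUtility::percentageOfTimeout reduces to a plain elapsed-over-timeout ratio expressed as a percentage (the helper's exact scaling and rounding may differ; timeoutBudgetPercentUsed is a hypothetical name used only for this illustration):

#include <chrono>
#include <cstdint>
#include <iostream>

// Illustrative only: percentage of the global timeout consumed before an abort.
// A request aborted by the global timeout itself records roughly 100.
uint64_t timeoutBudgetPercentUsed(std::chrono::milliseconds elapsed,
                                  std::chrono::milliseconds global_timeout) {
  if (global_timeout.count() == 0) {
    return 0; // No global timeout configured; there is no budget to measure.
  }
  return static_cast<uint64_t>(elapsed.count() * 100 / global_timeout.count());
}

int main() {
  using namespace std::chrono_literals;
  std::cout << timeoutBudgetPercentUsed(150ms, 1000ms) << "\n";  // 15
  std::cout << timeoutBudgetPercentUsed(1000ms, 1000ms) << "\n"; // 100
  return 0;
}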
1303 | | |
1304 | | void Filter::onUpstreamAbort(Http::Code code, StreamInfo::CoreResponseFlag response_flags, |
1305 | 121 | absl::string_view body, bool dropped, absl::string_view details) { |
1306 | | // If we have not yet sent anything downstream, send a response with an appropriate status code. |
1307 | | // Otherwise just reset the ongoing response. |
1308 | 121 | callbacks_->streamInfo().setResponseFlag(response_flags); |
1309 | | // This will destroy any created retry timers. |
1310 | 121 | cleanup(); |
1311 | | // sendLocalReply may instead reset the stream if downstream_response_started_ is true. |
1312 | 121 | callbacks_->sendLocalReply( |
1313 | 121 | code, body, |
1314 | 121 | [dropped, this](Http::ResponseHeaderMap& headers) { |
1315 | 107 | if (dropped && !config_->suppress_envoy_headers_) { |
1316 | 0 | headers.addReference(Http::Headers::get().EnvoyOverloaded, |
1317 | 0 | Http::Headers::get().EnvoyOverloadedValues.True); |
1318 | 0 | } |
1319 | 107 | modify_headers_(headers); |
1320 | 107 | }, |
1321 | 121 | absl::nullopt, details); |
1322 | 121 | } |
1323 | | |
1324 | | bool Filter::maybeRetryReset(Http::StreamResetReason reset_reason, |
1325 | 121 | UpstreamRequest& upstream_request, TimeoutRetry is_timeout_retry) { |
1326 | | // We don't retry if we already started the response, don't have a retry policy defined, |
1327 | | // or if we've already retried this upstream request (currently only possible if a per |
1328 | | // try timeout occurred and hedge_on_per_try_timeout is enabled). |
1329 | 121 | if (downstream_response_started_ || !retry_state_ || upstream_request.retried()) { |
1330 | 121 | return false; |
1331 | 121 | } |
1332 | 0 | RetryState::Http3Used was_using_http3 = RetryState::Http3Used::Unknown; |
1333 | 0 | if (upstream_request.hadUpstream()) { |
1334 | 0 | was_using_http3 = (upstream_request.streamInfo().protocol().has_value() && |
1335 | 0 | upstream_request.streamInfo().protocol().value() == Http::Protocol::Http3) |
1336 | 0 | ? RetryState::Http3Used::Yes |
1337 | 0 | : RetryState::Http3Used::No; |
1338 | 0 | } |
1339 | | |
1340 | | // If the current request in this router has sent data to the upstream, we consider the request |
1341 | | // started. |
1342 | 0 | upstream_request_started_ |= upstream_request.streamInfo() |
1343 | 0 | .upstreamInfo() |
1344 | 0 | ->upstreamTiming() |
1345 | 0 | .first_upstream_tx_byte_sent_.has_value(); |
1346 | |
1347 | 0 | const RetryStatus retry_status = retry_state_->shouldRetryReset( |
1348 | 0 | reset_reason, was_using_http3, |
1349 | 0 | [this, can_send_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_, |
1350 | 0 | can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_, |
1351 | 0 | is_timeout_retry](bool disable_http3) -> void { |
1352 | | // This retry might be due to a connection failure during the 0-RTT handshake. In this case, though
1353 | | // the original request is retried with the same can_send_early_data setting, it will not be
1354 | | // sent as early data by the underlying connection pool grid.
1355 | 0 | doRetry(can_send_early_data, disable_http3 ? false : can_use_http3, is_timeout_retry); |
1356 | 0 | }, |
1357 | 0 | upstream_request_started_); |
1358 | 0 | if (retry_status == RetryStatus::Yes) { |
1359 | 0 | runRetryOptionsPredicates(upstream_request); |
1360 | 0 | pending_retries_++; |
1361 | |
1362 | 0 | if (upstream_request.upstreamHost()) { |
1363 | 0 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1364 | 0 | } |
1365 | |
1366 | 0 | auto request_ptr = upstream_request.removeFromList(upstream_requests_); |
1367 | 0 | callbacks_->dispatcher().deferredDelete(std::move(request_ptr)); |
1368 | 0 | return true; |
1369 | 0 | } else if (retry_status == RetryStatus::NoOverflow) { |
1370 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow); |
1371 | 0 | } else if (retry_status == RetryStatus::NoRetryLimitExceeded) { |
1372 | 0 | callbacks_->streamInfo().setResponseFlag( |
1373 | 0 | StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded); |
1374 | 0 | } |
1375 | | |
1376 | 0 | return false; |
1377 | 0 | } |
1378 | | |
1379 | | void Filter::onUpstreamReset(Http::StreamResetReason reset_reason, |
1380 | | absl::string_view transport_failure_reason, |
1381 | 121 | UpstreamRequest& upstream_request) { |
1382 | 121 | ENVOY_STREAM_LOG(debug, "upstream reset: reset reason: {}, transport failure reason: {}", |
1383 | 121 | *callbacks_, Http::Utility::resetReasonToString(reset_reason), |
1384 | 121 | transport_failure_reason); |
1385 | | |
1386 | 121 | const bool dropped = reset_reason == Http::StreamResetReason::Overflow; |
1387 | | |
1388 | | // Ignore upstream reset caused by a resource overflow. |
1389 | | // Currently, circuit breakers can only produce this reset reason. |
1390 | | // It means that this reason is cluster-wide, not specific to the upstream host.
1391 | | // Therefore penalizing an upstream host when the cluster is overloaded
1392 | | // would make the situation even worse.
1393 | | // https://github.com/envoyproxy/envoy/issues/25487 |
1394 | 121 | if (!dropped) { |
1395 | | // TODO: The reset may also come from upstream over the wire. In this case it should be |
1396 | | // treated as an external origin error and distinguished from a local origin error.
1397 | | // This matters only when running OutlierDetection with split_external_local_origin_errors |
1398 | | // config param set to true. |
1399 | 121 | updateOutlierDetection(Upstream::Outlier::Result::LocalOriginConnectFailed, upstream_request, |
1400 | 121 | absl::nullopt); |
1401 | 121 | } |
1402 | | |
1403 | 121 | if (maybeRetryReset(reset_reason, upstream_request, TimeoutRetry::No)) { |
1404 | 0 | return; |
1405 | 0 | } |
1406 | | |
1407 | 121 | const Http::Code error_code = (reset_reason == Http::StreamResetReason::ProtocolError) |
1408 | 121 | ? Http::Code::BadGateway |
1409 | 121 | : Http::Code::ServiceUnavailable; |
1410 | 121 | chargeUpstreamAbort(error_code, dropped, upstream_request); |
1411 | 121 | auto request_ptr = upstream_request.removeFromList(upstream_requests_); |
1412 | 121 | callbacks_->dispatcher().deferredDelete(std::move(request_ptr)); |
1413 | | |
1414 | | // If there are other in-flight requests that might see an upstream response, |
1415 | | // don't return anything downstream. |
1416 | 121 | if (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0) { |
1417 | 0 | return; |
1418 | 0 | } |
1419 | | |
1420 | 121 | const StreamInfo::CoreResponseFlag response_flags = streamResetReasonToResponseFlag(reset_reason); |
1421 | | |
1422 | 121 | const std::string body = |
1423 | 121 | absl::StrCat("upstream connect error or disconnect/reset before headers. ", |
1424 | 121 | (is_retry_ ? "retried and the latest " : ""), |
1425 | 121 | "reset reason: ", Http::Utility::resetReasonToString(reset_reason), |
1426 | 121 | !transport_failure_reason.empty() ? ", transport failure reason: " : "", |
1427 | 121 | transport_failure_reason); |
1428 | 121 | const std::string& basic_details = |
1429 | 121 | downstream_response_started_ ? StreamInfo::ResponseCodeDetails::get().LateUpstreamReset |
1430 | 121 | : StreamInfo::ResponseCodeDetails::get().EarlyUpstreamReset; |
1431 | 121 | const std::string details = StringUtil::replaceAllEmptySpace(absl::StrCat( |
1432 | 121 | basic_details, "{", Http::Utility::resetReasonToString(reset_reason), |
1433 | 121 | transport_failure_reason.empty() ? "" : absl::StrCat("|", transport_failure_reason), "}")); |
1434 | 121 | onUpstreamAbort(error_code, response_flags, body, dropped, details); |
1435 | 121 | } |
1436 | | |
1437 | | void Filter::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host, |
1438 | 1.77k | bool pool_success) { |
1439 | 1.77k | if (retry_state_ && host) { |
1440 | 5 | retry_state_->onHostAttempted(host); |
1441 | 5 | } |
1442 | | |
1443 | 1.77k | if (!pool_success) { |
1444 | 0 | return; |
1445 | 0 | } |
1446 | | |
1447 | 1.77k | if (request_vcluster_) { |
1448 | | // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady |
1449 | | // callback. Hence, the upstream request increases the virtual cluster's upstream_rq_total_ stat |
1450 | | // here. |
1451 | 5 | request_vcluster_->stats().upstream_rq_total_.inc(); |
1452 | 5 | } |
1453 | 1.77k | if (route_stats_context_.has_value()) { |
1454 | | // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady |
1455 | | // callback. Hence, the upstream request increases the route level upstream_rq_total_ stat |
1456 | | // here. |
1457 | 0 | route_stats_context_->stats().upstream_rq_total_.inc(); |
1458 | 0 | } |
1459 | 1.77k | } |
1460 | | |
1461 | | StreamInfo::CoreResponseFlag |
1462 | 242 | Filter::streamResetReasonToResponseFlag(Http::StreamResetReason reset_reason) { |
1463 | 242 | switch (reset_reason) { |
1464 | 0 | case Http::StreamResetReason::LocalConnectionFailure: |
1465 | 0 | case Http::StreamResetReason::RemoteConnectionFailure: |
1466 | 0 | case Http::StreamResetReason::ConnectionTimeout: |
1467 | 0 | return StreamInfo::CoreResponseFlag::UpstreamConnectionFailure; |
1468 | 40 | case Http::StreamResetReason::ConnectionTermination: |
1469 | 40 | return StreamInfo::CoreResponseFlag::UpstreamConnectionTermination; |
1470 | 0 | case Http::StreamResetReason::LocalReset: |
1471 | 0 | case Http::StreamResetReason::LocalRefusedStreamReset: |
1472 | 0 | case Http::StreamResetReason::Http1PrematureUpstreamHalfClose: |
1473 | 0 | return StreamInfo::CoreResponseFlag::LocalReset; |
1474 | 0 | case Http::StreamResetReason::Overflow: |
1475 | 0 | return StreamInfo::CoreResponseFlag::UpstreamOverflow; |
1476 | 0 | case Http::StreamResetReason::RemoteReset: |
1477 | 54 | case Http::StreamResetReason::RemoteRefusedStreamReset: |
1478 | 54 | case Http::StreamResetReason::ConnectError: |
1479 | 54 | return StreamInfo::CoreResponseFlag::UpstreamRemoteReset; |
1480 | 148 | case Http::StreamResetReason::ProtocolError: |
1481 | 148 | return StreamInfo::CoreResponseFlag::UpstreamProtocolError; |
1482 | 0 | case Http::StreamResetReason::OverloadManager: |
1483 | 0 | return StreamInfo::CoreResponseFlag::OverloadManager; |
1484 | 242 | } |
1485 | | |
1486 | 0 | PANIC_DUE_TO_CORRUPT_ENUM; |
1487 | 0 | } |
1488 | | |
1489 | | void Filter::handleNon5xxResponseHeaders(absl::optional<Grpc::Status::GrpcStatus> grpc_status, |
1490 | | UpstreamRequest& upstream_request, bool end_stream, |
1491 | 1.63k | uint64_t grpc_to_http_status) { |
1492 | | // We need to defer gRPC success until after we have processed grpc-status in |
1493 | | // the trailers. |
1494 | 1.63k | if (grpc_request_) { |
1495 | 829 | if (end_stream) { |
1496 | 100 | if (grpc_status && !Http::CodeUtility::is5xx(grpc_to_http_status)) { |
1497 | 100 | upstream_request.upstreamHost()->stats().rq_success_.inc(); |
1498 | 100 | } else { |
1499 | 0 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1500 | 0 | } |
1501 | 729 | } else { |
1502 | 729 | upstream_request.grpcRqSuccessDeferred(true); |
1503 | 729 | } |
1504 | 829 | } else { |
1505 | 805 | upstream_request.upstreamHost()->stats().rq_success_.inc(); |
1506 | 805 | } |
1507 | 1.63k | } |
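For example, a gRPC response whose headers arrive with end_stream=false is not charged as a success or an error here; grpcRqSuccessDeferred(true) is set and the per-host rq_success_/rq_error_ counter is only charged once grpc-status is read from the trailers in onUpstreamTrailers() (or charged as an error in onUpstreamData() if the stream ends without trailers). A trailers-only response, where grpc-status is already present in the headers and end_stream is true, is charged immediately.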
1508 | | |
1509 | | void Filter::onUpstream1xxHeaders(Http::ResponseHeaderMapPtr&& headers, |
1510 | 1 | UpstreamRequest& upstream_request) { |
1511 | 1 | const uint64_t response_code = Http::Utility::getResponseStatus(*headers); |
1512 | 1 | chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false); |
1513 | 1 | ENVOY_STREAM_LOG(debug, "upstream 1xx ({}).", *callbacks_, response_code); |
1514 | | |
1515 | 1 | downstream_response_started_ = true; |
1516 | 1 | final_upstream_request_ = &upstream_request; |
1517 | 1 | resetOtherUpstreams(upstream_request); |
1518 | | |
1519 | | // Don't send retries after a 100-Continue has been forwarded on. Arguably we could attempt to do a
1520 | | // retry, assume the next upstream would also send a 100-Continue and swallow the second one,
1521 | | // but it's sketchy (as the subsequent upstream might not send a 100-Continue) and not worth
1522 | | // the complexity until someone asks for it.
1523 | 1 | retry_state_.reset(); |
1524 | | |
1525 | 1 | callbacks_->encode1xxHeaders(std::move(headers)); |
1526 | 1 | } |
1527 | | |
1528 | 3.06k | void Filter::resetAll() { |
1529 | 3.85k | while (!upstream_requests_.empty()) { |
1530 | 793 | auto request_ptr = upstream_requests_.back()->removeFromList(upstream_requests_); |
1531 | 793 | request_ptr->resetStream(); |
1532 | 793 | callbacks_->dispatcher().deferredDelete(std::move(request_ptr)); |
1533 | 793 | } |
1534 | 3.06k | } |
1535 | | |
1536 | 1.63k | void Filter::resetOtherUpstreams(UpstreamRequest& upstream_request) { |
1537 | | // Pop each upstream request on the list and reset it if it's not the one |
1538 | | // provided. At the end we'll move it back into the list. |
1539 | 1.63k | UpstreamRequestPtr final_upstream_request; |
1540 | 3.27k | while (!upstream_requests_.empty()) { |
1541 | 1.63k | UpstreamRequestPtr upstream_request_tmp = |
1542 | 1.63k | upstream_requests_.back()->removeFromList(upstream_requests_); |
1543 | 1.63k | if (upstream_request_tmp.get() != &upstream_request) { |
1544 | 0 | upstream_request_tmp->resetStream(); |
1545 | | // TODO: per-host stat for hedge abandoned. |
1546 | | // TODO: cluster stat for hedge abandoned. |
1547 | 1.63k | } else { |
1548 | 1.63k | final_upstream_request = std::move(upstream_request_tmp); |
1549 | 1.63k | } |
1550 | 1.63k | } |
1551 | | |
1552 | 1.63k | ASSERT(final_upstream_request); |
1553 | | // Now put the final request back on this list. |
1554 | 1.63k | LinkedList::moveIntoList(std::move(final_upstream_request), upstream_requests_); |
1555 | 1.63k | } |
1556 | | |
1557 | | void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers, |
1558 | 1.63k | UpstreamRequest& upstream_request, bool end_stream) { |
1559 | 1.63k | ENVOY_STREAM_LOG(debug, "upstream headers complete: end_stream={}", *callbacks_, end_stream); |
1560 | | |
1561 | 1.63k | modify_headers_(*headers); |
1562 | | // When grpc-status appears in response headers, convert grpc-status to HTTP status code |
1563 | | // for outlier detection. This does not currently change any stats or logging and does not |
1564 | | // handle the case when an error grpc-status is sent as a trailer. |
1565 | 1.63k | absl::optional<Grpc::Status::GrpcStatus> grpc_status; |
1566 | 1.63k | uint64_t grpc_to_http_status = 0; |
1567 | 1.63k | if (grpc_request_) { |
1568 | 829 | grpc_status = Grpc::Common::getGrpcStatus(*headers); |
1569 | 829 | if (grpc_status.has_value()) { |
1570 | 100 | grpc_to_http_status = Grpc::Utility::grpcToHttpStatus(grpc_status.value()); |
1571 | 100 | } |
1572 | 829 | } |
1573 | | |
1574 | 1.63k | maybeProcessOrcaLoadReport(*headers, upstream_request); |
1575 | | |
1576 | 1.63k | if (grpc_status.has_value()) { |
1577 | 100 | upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(grpc_to_http_status); |
1578 | 1.53k | } else { |
1579 | 1.53k | upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(response_code); |
1580 | 1.53k | } |
1581 | | |
1582 | 1.63k | if (headers->EnvoyImmediateHealthCheckFail() != nullptr) { |
1583 | 0 | upstream_request.upstreamHost()->healthChecker().setUnhealthy( |
1584 | 0 | Upstream::HealthCheckHostMonitor::UnhealthyType::ImmediateHealthCheckFail); |
1585 | 0 | } |
1586 | | |
1587 | 1.63k | bool could_not_retry = false; |
1588 | | |
1589 | | // Check if this upstream request was already retried, for instance after |
1590 | | // hitting a per try timeout. Don't retry it if we already have. |
1591 | 1.63k | if (retry_state_) { |
1592 | 0 | if (upstream_request.retried()) { |
1593 | | // We already retried this request (presumably for a per try timeout) so |
1594 | | // we definitely won't retry it again. Check if we would have retried it |
1595 | | // if we could. |
1596 | 0 | bool retry_as_early_data; // Not going to be used as we are not retrying. |
1597 | 0 | could_not_retry = retry_state_->wouldRetryFromHeaders(*headers, *downstream_headers_, |
1598 | 0 | retry_as_early_data) != |
1599 | 0 | RetryState::RetryDecision::NoRetry; |
1600 | 0 | } else { |
1601 | 0 | const RetryStatus retry_status = retry_state_->shouldRetryHeaders( |
1602 | 0 | *headers, *downstream_headers_, |
1603 | 0 | [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_, |
1604 | 0 | had_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_]( |
1605 | 0 | bool disable_early_data) -> void { |
1606 | 0 | doRetry((disable_early_data ? false : had_early_data), can_use_http3, TimeoutRetry::No); |
1607 | 0 | }); |
1608 | 0 | if (retry_status == RetryStatus::Yes) { |
1609 | 0 | runRetryOptionsPredicates(upstream_request); |
1610 | 0 | pending_retries_++; |
1611 | 0 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1612 | 0 | Http::CodeStats& code_stats = httpContext().codeStats(); |
1613 | 0 | code_stats.chargeBasicResponseStat(cluster_->statsScope(), stats_.stat_names_.retry_, |
1614 | 0 | static_cast<Http::Code>(response_code), |
1615 | 0 | exclude_http_code_stats_); |
1616 | |
1617 | 0 | if (!end_stream || !upstream_request.encodeComplete()) { |
1618 | 0 | upstream_request.resetStream(); |
1619 | 0 | } |
1620 | 0 | auto request_ptr = upstream_request.removeFromList(upstream_requests_); |
1621 | 0 | callbacks_->dispatcher().deferredDelete(std::move(request_ptr)); |
1622 | 0 | return; |
1623 | 0 | } else if (retry_status == RetryStatus::NoOverflow) { |
1624 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow); |
1625 | 0 | could_not_retry = true; |
1626 | 0 | } else if (retry_status == RetryStatus::NoRetryLimitExceeded) { |
1627 | 0 | callbacks_->streamInfo().setResponseFlag( |
1628 | 0 | StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded); |
1629 | 0 | could_not_retry = true; |
1630 | 0 | } |
1631 | 0 | } |
1632 | 0 | } |
1633 | | |
1634 | 1.63k | if (route_entry_->internalRedirectPolicy().enabled() && |
1635 | 1.63k | route_entry_->internalRedirectPolicy().shouldRedirectForResponseCode( |
1636 | 0 | static_cast<Http::Code>(response_code)) && |
1637 | 1.63k | setupRedirect(*headers)) { |
1638 | 0 | return; |
1639 | | // If the redirect could not be handled, fail open and let the response pass
1640 | | // through to the downstream.
1641 | 0 | } |
1642 | | |
1643 | | // Check if we got a "bad" response, but there are still upstream requests in |
1644 | | // flight awaiting headers or scheduled retries. If so, exit to give them a |
1645 | | // chance to return before returning a response downstream. |
1646 | 1.63k | if (could_not_retry && (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0)) { |
1647 | 0 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1648 | | |
1649 | | // Reset the stream because there are other in-flight requests that we'll |
1650 | | // wait around for and we're not interested in consuming any body/trailers. |
1651 | 0 | auto request_ptr = upstream_request.removeFromList(upstream_requests_); |
1652 | 0 | request_ptr->resetStream(); |
1653 | 0 | callbacks_->dispatcher().deferredDelete(std::move(request_ptr)); |
1654 | 0 | return; |
1655 | 0 | } |
1656 | | |
1657 | | // Make sure any retry timers are destroyed since we may not call cleanup() if end_stream is |
1658 | | // false. |
1659 | 1.63k | if (retry_state_) { |
1660 | 0 | retry_state_.reset(); |
1661 | 0 | } |
1662 | | |
1663 | | // Only send upstream service time if we received the complete request and this is not a |
1664 | | // premature response. |
1665 | 1.63k | if (DateUtil::timePointValid(downstream_request_complete_time_)) { |
1666 | 803 | Event::Dispatcher& dispatcher = callbacks_->dispatcher(); |
1667 | 803 | MonotonicTime response_received_time = dispatcher.timeSource().monotonicTime(); |
1668 | 803 | std::chrono::milliseconds ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
1669 | 803 | response_received_time - downstream_request_complete_time_); |
1670 | 803 | if (!config_->suppress_envoy_headers_) { |
1671 | 803 | headers->setEnvoyUpstreamServiceTime(ms.count()); |
1672 | 803 | } |
1673 | 803 | } |
1674 | | |
1675 | 1.63k | upstream_request.upstreamCanary( |
1676 | 1.63k | (headers->EnvoyUpstreamCanary() && headers->EnvoyUpstreamCanary()->value() == "true") || |
1677 | 1.63k | upstream_request.upstreamHost()->canary()); |
1678 | 1.63k | chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false); |
1679 | 1.63k | if (!Http::CodeUtility::is5xx(response_code)) { |
1680 | 1.63k | handleNon5xxResponseHeaders(grpc_status, upstream_request, end_stream, grpc_to_http_status); |
1681 | 1.63k | } |
1682 | | |
1683 | | // Append routing cookies |
1684 | 1.63k | for (const auto& header_value : downstream_set_cookies_) { |
1685 | 0 | headers->addReferenceKey(Http::Headers::get().SetCookie, header_value); |
1686 | 0 | } |
1687 | | |
1688 | 1.63k | callbacks_->streamInfo().setResponseCodeDetails( |
1689 | 1.63k | StreamInfo::ResponseCodeDetails::get().ViaUpstream); |
1690 | | |
1691 | 1.63k | callbacks_->streamInfo().setResponseCode(response_code); |
1692 | | |
1693 | | // TODO(zuercher): If access to response_headers_to_add (at any level) is ever needed outside |
1694 | | // Router::Filter we'll need to find a better location for this work. One possibility is to |
1695 | | // provide finalizeResponseHeaders functions on the Router::Config and VirtualHost interfaces. |
1696 | 1.63k | route_entry_->finalizeResponseHeaders(*headers, callbacks_->streamInfo()); |
1697 | | |
1698 | 1.63k | downstream_response_started_ = true; |
1699 | 1.63k | final_upstream_request_ = &upstream_request; |
1700 | | // Make sure that for request hedging, we end up with the correct final upstream info. |
1701 | 1.63k | callbacks_->streamInfo().setUpstreamInfo(final_upstream_request_->streamInfo().upstreamInfo()); |
1702 | 1.63k | resetOtherUpstreams(upstream_request); |
1703 | 1.63k | if (end_stream) { |
1704 | 117 | onUpstreamComplete(upstream_request); |
1705 | 117 | } |
1706 | | |
1707 | 1.63k | callbacks_->encodeHeaders(std::move(headers), end_stream, |
1708 | 1.63k | StreamInfo::ResponseCodeDetails::get().ViaUpstream); |
1709 | 1.63k | } |
1710 | | |
1711 | | void Filter::onUpstreamData(Buffer::Instance& data, UpstreamRequest& upstream_request, |
1712 | 3.11k | bool end_stream) { |
1713 | | // This should be true because when we saw headers we either reset this stream
1714 | | // (hence wouldn't have made it to onUpstreamData) or reset all other in-flight
1715 | | // streams.
1716 | 3.11k | ASSERT(upstream_requests_.size() == 1); |
1717 | 3.11k | if (end_stream) { |
1718 | | // gRPC request termination without trailers is an error. |
1719 | 784 | if (upstream_request.grpcRqSuccessDeferred()) { |
1720 | 0 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1721 | 0 | } |
1722 | 784 | onUpstreamComplete(upstream_request); |
1723 | 784 | } |
1724 | | |
1725 | 3.11k | callbacks_->encodeData(data, end_stream); |
1726 | 3.11k | } |
1727 | | |
1728 | | void Filter::onUpstreamTrailers(Http::ResponseTrailerMapPtr&& trailers, |
1729 | 1 | UpstreamRequest& upstream_request) { |
1730 | | // This should be true because when we saw headers we either reset this stream
1731 | | // (hence wouldn't have made it to onUpstreamTrailers) or reset all other in-flight
1732 | | // streams.
1733 | 1 | ASSERT(upstream_requests_.size() == 1); |
1734 | | |
1735 | 1 | if (upstream_request.grpcRqSuccessDeferred()) { |
1736 | 1 | absl::optional<Grpc::Status::GrpcStatus> grpc_status = Grpc::Common::getGrpcStatus(*trailers); |
1737 | 1 | if (grpc_status && |
1738 | 1 | !Http::CodeUtility::is5xx(Grpc::Utility::grpcToHttpStatus(grpc_status.value()))) { |
1739 | 1 | upstream_request.upstreamHost()->stats().rq_success_.inc(); |
1740 | 1 | } else { |
1741 | 0 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1742 | 0 | } |
1743 | 1 | } |
1744 | | |
1745 | 1 | maybeProcessOrcaLoadReport(*trailers, upstream_request); |
1746 | | |
1747 | 1 | onUpstreamComplete(upstream_request); |
1748 | | |
1749 | 1 | callbacks_->encodeTrailers(std::move(trailers)); |
1750 | 1 | } |
1751 | | |
1752 | 293 | void Filter::onUpstreamMetadata(Http::MetadataMapPtr&& metadata_map) { |
1753 | 293 | callbacks_->encodeMetadata(std::move(metadata_map)); |
1754 | 293 | } |
1755 | | |
1756 | 902 | void Filter::onUpstreamComplete(UpstreamRequest& upstream_request) { |
1757 | 902 | if (!downstream_end_stream_) { |
1758 | 103 | if (allow_multiplexed_upstream_half_close_) { |
1759 | | // Continue request if downstream is not done yet. |
1760 | 0 | return; |
1761 | 0 | } |
1762 | 103 | upstream_request.resetStream(); |
1763 | 103 | } |
1764 | 902 | Event::Dispatcher& dispatcher = callbacks_->dispatcher(); |
1765 | 902 | std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>( |
1766 | 902 | dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_); |
1767 | | |
1768 | 902 | Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = cluster()->timeoutBudgetStats(); |
1769 | 902 | if (tb_stats.has_value()) { |
1770 | 0 | tb_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue( |
1771 | 0 | FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_)); |
1772 | 0 | } |
1773 | | |
1774 | 902 | if (config_->emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck() && |
1775 | 902 | DateUtil::timePointValid(downstream_request_complete_time_)) { |
1776 | 799 | upstream_request.upstreamHost()->outlierDetector().putResponseTime(response_time); |
1777 | 799 | const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_); |
1778 | | |
1779 | 799 | Http::CodeStats& code_stats = httpContext().codeStats(); |
1780 | 799 | Http::CodeStats::ResponseTimingInfo info{ |
1781 | 799 | config_->scope_, |
1782 | 799 | cluster_->statsScope(), |
1783 | 799 | config_->empty_stat_name_, |
1784 | 799 | response_time, |
1785 | 799 | upstream_request.upstreamCanary(), |
1786 | 799 | internal_request, |
1787 | 799 | route_->virtualHost().statName(), |
1788 | 799 | request_vcluster_ ? request_vcluster_->statName() : config_->empty_stat_name_, |
1789 | 799 | route_stats_context_.has_value() ? route_stats_context_->statName() |
1790 | 799 | : config_->empty_stat_name_, |
1791 | 799 | config_->zone_name_, |
1792 | 799 | upstreamZone(upstream_request.upstreamHost())}; |
1793 | | |
1794 | 799 | code_stats.chargeResponseTiming(info); |
1795 | | |
1796 | 799 | if (alt_stat_prefix_ != nullptr) { |
1797 | 0 | Http::CodeStats::ResponseTimingInfo info{config_->scope_, |
1798 | 0 | cluster_->statsScope(), |
1799 | 0 | alt_stat_prefix_->statName(), |
1800 | 0 | response_time, |
1801 | 0 | upstream_request.upstreamCanary(), |
1802 | 0 | internal_request, |
1803 | 0 | config_->empty_stat_name_, |
1804 | 0 | config_->empty_stat_name_, |
1805 | 0 | config_->empty_stat_name_, |
1806 | 0 | config_->zone_name_, |
1807 | 0 | upstreamZone(upstream_request.upstreamHost())}; |
1808 | |
1809 | 0 | code_stats.chargeResponseTiming(info); |
1810 | 0 | } |
1811 | 799 | } |
1812 | | |
1813 | | // Defer deletion as this is generally called under the stack of the upstream |
1814 | | // request, and immediate deletion is dangerous. |
1815 | 902 | callbacks_->dispatcher().deferredDelete(upstream_request.removeFromList(upstream_requests_)); |
1816 | 902 | cleanup(); |
1817 | 902 | } |
1818 | | |
1819 | 0 | bool Filter::setupRedirect(const Http::ResponseHeaderMap& headers) { |
1820 | 0 | ENVOY_STREAM_LOG(debug, "attempting internal redirect", *callbacks_); |
1821 | 0 | const Http::HeaderEntry* location = headers.Location(); |
1822 | |
1823 | 0 | const uint64_t status_code = Http::Utility::getResponseStatus(headers); |
1824 | | |
1825 | | // Redirects are not supported for streaming requests yet. |
1826 | 0 | if (downstream_end_stream_ && (!request_buffer_overflowed_ || !callbacks_->decodingBuffer()) && |
1827 | 0 | location != nullptr && |
1828 | 0 | convertRequestHeadersForInternalRedirect(*downstream_headers_, headers, *location, |
1829 | 0 | status_code) && |
1830 | 0 | callbacks_->recreateStream(&headers)) { |
1831 | 0 | ENVOY_STREAM_LOG(debug, "Internal redirect succeeded", *callbacks_); |
1832 | 0 | cluster_->trafficStats()->upstream_internal_redirect_succeeded_total_.inc(); |
1833 | 0 | return true; |
1834 | 0 | } |
1835 | | // convertRequestHeadersForInternalRedirect logs its own failure reasons; log
1836 | | // details for the other failure modes here.
1837 | 0 | if (!downstream_end_stream_) { |
1838 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: request incomplete", *callbacks_); |
1839 | 0 | } else if (request_buffer_overflowed_) { |
1840 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: request body overflow", *callbacks_); |
1841 | 0 | } else if (location == nullptr) { |
1842 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: missing location header", *callbacks_); |
1843 | 0 | } |
1844 | |
1845 | 0 | cluster_->trafficStats()->upstream_internal_redirect_failed_total_.inc(); |
1846 | 0 | return false; |
1847 | 0 | } |
1848 | | |
1849 | | bool Filter::convertRequestHeadersForInternalRedirect( |
1850 | | Http::RequestHeaderMap& downstream_headers, const Http::ResponseHeaderMap& upstream_headers, |
1851 | 0 | const Http::HeaderEntry& internal_redirect, uint64_t status_code) { |
1852 | 0 | if (!downstream_headers.Path()) { |
1853 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: no path in downstream_headers", *callbacks_); |
1854 | 0 | return false; |
1855 | 0 | } |
1856 | | |
1857 | 0 | absl::string_view redirect_url = internal_redirect.value().getStringView(); |
1858 | | // Make sure the redirect response contains a URL to redirect to. |
1859 | 0 | if (redirect_url.empty()) { |
1860 | 0 | stats_.passthrough_internal_redirect_bad_location_.inc(); |
1861 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: empty location", *callbacks_); |
1862 | 0 | return false; |
1863 | 0 | } |
1864 | 0 | Http::Utility::Url absolute_url; |
1865 | 0 | if (!absolute_url.initialize(redirect_url, false)) { |
1866 | 0 | stats_.passthrough_internal_redirect_bad_location_.inc(); |
1867 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: invalid location {}", *callbacks_, |
1868 | 0 | redirect_url); |
1869 | 0 | return false; |
1870 | 0 | } |
1871 | | |
1872 | 0 | const auto& policy = route_entry_->internalRedirectPolicy(); |
1873 | | // Don't change the scheme from the original request |
1874 | 0 | const bool scheme_is_http = schemeIsHttp(downstream_headers, callbacks_->connection()); |
1875 | 0 | const bool target_is_http = Http::Utility::schemeIsHttp(absolute_url.scheme()); |
1876 | 0 | if (!policy.isCrossSchemeRedirectAllowed() && scheme_is_http != target_is_http) { |
1877 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: incorrect scheme for {}", *callbacks_, |
1878 | 0 | redirect_url); |
1879 | 0 | stats_.passthrough_internal_redirect_unsafe_scheme_.inc(); |
1880 | 0 | return false; |
1881 | 0 | } |
1882 | | |
1883 | 0 | const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState(); |
1884 | | // Make sure that performing the redirect won't result in exceeding the configured number of |
1885 | | // redirects allowed for this route. |
1886 | 0 | StreamInfo::UInt32Accessor* num_internal_redirect{}; |
1887 | |
1888 | 0 | if (num_internal_redirect = filter_state->getDataMutable<StreamInfo::UInt32Accessor>( |
1889 | 0 | NumInternalRedirectsFilterStateName); |
1890 | 0 | num_internal_redirect == nullptr) { |
1891 | 0 | auto state = std::make_shared<StreamInfo::UInt32AccessorImpl>(0); |
1892 | 0 | num_internal_redirect = state.get(); |
1893 | |
1894 | 0 | filter_state->setData(NumInternalRedirectsFilterStateName, std::move(state), |
1895 | 0 | StreamInfo::FilterState::StateType::Mutable, |
1896 | 0 | StreamInfo::FilterState::LifeSpan::Request); |
1897 | 0 | } |
1898 | |
1899 | 0 | if (num_internal_redirect->value() >= policy.maxInternalRedirects()) { |
1900 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: redirect limits exceeded.", *callbacks_); |
1901 | 0 | stats_.passthrough_internal_redirect_too_many_redirects_.inc(); |
1902 | 0 | return false; |
1903 | 0 | } |
1904 | | // Copy the old values, so they can be restored if the redirect fails. |
1905 | 0 | const bool scheme_is_set = (downstream_headers.Scheme() != nullptr); |
1906 | |
1907 | 0 | std::unique_ptr<Http::RequestHeaderMapImpl> saved_headers = Http::RequestHeaderMapImpl::create(); |
1908 | 0 | Http::RequestHeaderMapImpl::copyFrom(*saved_headers, downstream_headers); |
1909 | |
1910 | 0 | for (const Http::LowerCaseString& header : |
1911 | 0 | route_entry_->internalRedirectPolicy().responseHeadersToCopy()) { |
1912 | 0 | Http::HeaderMap::GetResult result = upstream_headers.get(header); |
1913 | 0 | Http::HeaderMap::GetResult downstream_result = downstream_headers.get(header); |
1914 | 0 | if (result.empty()) { |
1915 | | // Clear headers if present, else do nothing: |
1916 | 0 | if (downstream_result.empty()) { |
1917 | 0 | continue; |
1918 | 0 | } |
1919 | 0 | downstream_headers.remove(header); |
1920 | 0 | } else { |
1921 | | // The header exists in the response; copy it into the downstream headers.
1922 | 0 | if (!downstream_result.empty()) { |
1923 | 0 | downstream_headers.remove(header); |
1924 | 0 | } |
1925 | 0 | for (size_t idx = 0; idx < result.size(); idx++) { |
1926 | 0 | downstream_headers.addCopy(header, result[idx]->value().getStringView()); |
1927 | 0 | } |
1928 | 0 | } |
1929 | 0 | } |
1930 | |
1931 | 0 | Cleanup restore_original_headers( |
1932 | 0 | [&downstream_headers, scheme_is_set, scheme_is_http, &saved_headers]() { |
1933 | 0 | downstream_headers.clear(); |
1934 | 0 | if (scheme_is_set) { |
1935 | 0 | downstream_headers.setScheme(scheme_is_http ? Http::Headers::get().SchemeValues.Http |
1936 | 0 | : Http::Headers::get().SchemeValues.Https); |
1937 | 0 | } |
1938 | |
1939 | 0 | Http::RequestHeaderMapImpl::copyFrom(downstream_headers, *saved_headers); |
1940 | 0 | }); |
1941 | | |
1942 | | // Replace the original host, scheme and path. |
1943 | 0 | downstream_headers.setScheme(absolute_url.scheme()); |
1944 | 0 | downstream_headers.setHost(absolute_url.hostAndPort()); |
1945 | |
1946 | 0 | auto path_and_query = absolute_url.pathAndQueryParams(); |
1947 | 0 | if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http_reject_path_with_fragment")) { |
1948 | | // Envoy treats an internal redirect as a new request and will reject it if the URI path
1949 | | // contains a #fragment. However, the Location header is allowed to have a #fragment in its URI path. To
1950 | | // prevent Envoy from rejecting the internal redirect, strip the #fragment from the Location URI if it
1951 | | // is present.
1952 | 0 | auto fragment_pos = path_and_query.find('#'); |
1953 | 0 | path_and_query = path_and_query.substr(0, fragment_pos); |
1954 | 0 | } |
1955 | 0 | downstream_headers.setPath(path_and_query); |
1956 | | |
1957 | | // Only clear the route cache if there are downstream callbacks. There aren't, for example, |
1958 | | // for async connections. |
1959 | 0 | if (callbacks_->downstreamCallbacks()) { |
1960 | 0 | callbacks_->downstreamCallbacks()->clearRouteCache(); |
1961 | 0 | } |
1962 | 0 | const auto route = callbacks_->route(); |
1963 | | // Don't allow a redirect to a nonexistent route.
1964 | 0 | if (!route) { |
1965 | 0 | stats_.passthrough_internal_redirect_no_route_.inc(); |
1966 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: no route found", *callbacks_); |
1967 | 0 | return false; |
1968 | 0 | } |
1969 | | |
1970 | 0 | const auto& route_name = route->routeName(); |
1971 | 0 | for (const auto& predicate : policy.predicates()) { |
1972 | 0 | if (!predicate->acceptTargetRoute(*filter_state, route_name, !scheme_is_http, |
1973 | 0 | !target_is_http)) { |
1974 | 0 | stats_.passthrough_internal_redirect_predicate_.inc(); |
1975 | 0 | ENVOY_STREAM_LOG(trace, |
1976 | 0 | "Internal redirect failed: rejecting redirect targeting {}, by {} predicate", |
1977 | 0 | *callbacks_, route_name, predicate->name()); |
1978 | 0 | return false; |
1979 | 0 | } |
1980 | 0 | } |
1981 | | |
1982 | | // See https://tools.ietf.org/html/rfc7231#section-6.4.4. |
1983 | 0 | if (status_code == enumToInt(Http::Code::SeeOther) && |
1984 | 0 | downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Get && |
1985 | 0 | downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Head) { |
1986 | 0 | downstream_headers.setMethod(Http::Headers::get().MethodValues.Get); |
1987 | 0 | downstream_headers.remove(Http::Headers::get().ContentLength); |
1988 | 0 | callbacks_->modifyDecodingBuffer([](Buffer::Instance& data) { data.drain(data.length()); }); |
1989 | 0 | } |
1990 | |
1991 | 0 | num_internal_redirect->increment(); |
1992 | 0 | restore_original_headers.cancel(); |
1993 | | // Preserve the original request URL for the second pass. |
1994 | 0 | downstream_headers.setEnvoyOriginalUrl( |
1995 | 0 | absl::StrCat(scheme_is_http ? Http::Headers::get().SchemeValues.Http |
1996 | 0 | : Http::Headers::get().SchemeValues.Https, |
1997 | 0 | "://", saved_headers->getHostValue(), saved_headers->getPathValue())); |
1998 | 0 | return true; |
1999 | 0 | } |
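A hypothetical example of the rewrite performed above: if the original request was GET http://example.com/a and the upstream answered 303 with "Location: https://example.com/b#section", the downstream headers end up with scheme https, host example.com, and path /b (the fragment is stripped when the reject-path-with-fragment runtime feature is enabled), the method stays GET, and the original URL http://example.com/a is preserved via setEnvoyOriginalUrl() for the second pass. Had the original method been POST, the 303 handling would also have switched it to GET, dropped Content-Length, and drained the buffered body. This assumes the route's internal redirect policy allows the cross-scheme redirect and that the redirect budget and predicates pass.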
2000 | | |
2001 | 0 | void Filter::runRetryOptionsPredicates(UpstreamRequest& retriable_request) { |
2002 | 0 | for (const auto& options_predicate : route_entry_->retryPolicy().retryOptionsPredicates()) { |
2003 | 0 | const Upstream::RetryOptionsPredicate::UpdateOptionsParameters parameters{ |
2004 | 0 | retriable_request.streamInfo(), upstreamSocketOptions()}; |
2005 | 0 | auto ret = options_predicate->updateOptions(parameters); |
2006 | 0 | if (ret.new_upstream_socket_options_.has_value()) { |
2007 | 0 | upstream_options_ = ret.new_upstream_socket_options_.value(); |
2008 | 0 | } |
2009 | 0 | } |
2010 | 0 | } |
2011 | | |
2012 | 0 | void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry) { |
2013 | 0 | ENVOY_STREAM_LOG(debug, "performing retry", *callbacks_); |
2014 | |
2015 | 0 | is_retry_ = true; |
2016 | 0 | attempt_count_++; |
2017 | 0 | callbacks_->streamInfo().setAttemptCount(attempt_count_); |
2018 | 0 | ASSERT(pending_retries_ > 0); |
2019 | 0 | pending_retries_--; |
2020 | | |
2021 | | // Clusters can technically get removed by CDS during a retry. Make sure the cluster still exists.
2022 | 0 | const auto cluster = config_->cm_.getThreadLocalCluster(route_entry_->clusterName()); |
2023 | 0 | std::unique_ptr<GenericConnPool> generic_conn_pool; |
2024 | 0 | if (cluster != nullptr) { |
2025 | 0 | cluster_ = cluster->info(); |
2026 | 0 | generic_conn_pool = createConnPool(*cluster); |
2027 | 0 | } |
2028 | |
2029 | 0 | if (!generic_conn_pool) { |
2030 | 0 | sendNoHealthyUpstreamResponse(); |
2031 | 0 | cleanup(); |
2032 | 0 | return; |
2033 | 0 | } |
2034 | 0 | UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>( |
2035 | 0 | *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3, |
2036 | 0 | allow_multiplexed_upstream_half_close_ /*enable_half_close*/); |
2037 | |
2038 | 0 | if (include_attempt_count_in_request_) { |
2039 | 0 | downstream_headers_->setEnvoyAttemptCount(attempt_count_); |
2040 | 0 | } |
2041 | |
2042 | 0 | if (include_timeout_retry_header_in_request_) { |
2043 | 0 | downstream_headers_->setEnvoyIsTimeoutRetry(is_timeout_retry == TimeoutRetry::Yes ? "true" |
2044 | 0 | : "false"); |
2045 | |
2047 | | // The request timeouts only account for time elapsed since the downstream request completed,
2048 | | // which might not have happened yet (in which case zero time has elapsed).
2049 | 0 | std::chrono::milliseconds elapsed_time = std::chrono::milliseconds::zero(); |
2050 | |
2051 | 0 | if (DateUtil::timePointValid(downstream_request_complete_time_)) { |
2052 | 0 | Event::Dispatcher& dispatcher = callbacks_->dispatcher(); |
2053 | 0 | elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>( |
2054 | 0 | dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_); |
2055 | 0 | } |
2056 | |
2057 | 0 | FilterUtility::setTimeoutHeaders(elapsed_time.count(), timeout_, *route_entry_, |
2058 | 0 | *downstream_headers_, !config_->suppress_envoy_headers_, |
2059 | 0 | grpc_request_, hedging_params_.hedge_on_per_try_timeout_); |
2060 | |
2061 | 0 | UpstreamRequest* upstream_request_tmp = upstream_request.get(); |
2062 | 0 | LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_); |
2063 | 0 | upstream_requests_.front()->acceptHeadersFromRouter( |
2064 | 0 | !callbacks_->decodingBuffer() && !downstream_trailers_ && downstream_end_stream_); |
2065 | | // It's possible we got immediately reset which means the upstream request we just |
2066 | | // added to the front of the list might have been removed, so we need to check to make |
2067 | | // sure we don't send data on the wrong request. |
2068 | 0 | if (!upstream_requests_.empty() && (upstream_requests_.front().get() == upstream_request_tmp)) { |
2069 | 0 | if (callbacks_->decodingBuffer()) { |
2070 | | // If we are doing a retry we need to make a copy. |
2071 | 0 | Buffer::OwnedImpl copy(*callbacks_->decodingBuffer()); |
2072 | 0 | upstream_requests_.front()->acceptDataFromRouter(copy, !downstream_trailers_ && |
2073 | 0 | downstream_end_stream_); |
2074 | 0 | } |
2075 | |
2076 | 0 | if (downstream_trailers_) { |
2077 | 0 | upstream_requests_.front()->acceptTrailersFromRouter(*downstream_trailers_); |
2078 | 0 | } |
2079 | 0 | } |
2080 | 0 | } |
2081 | | |
2082 | 121 | uint32_t Filter::numRequestsAwaitingHeaders() { |
2083 | 121 | return std::count_if(upstream_requests_.begin(), upstream_requests_.end(), |
2084 | 121 | [](const auto& req) -> bool { return req->awaitingHeaders(); }); |
2085 | 121 | } |
2086 | | |
2087 | | bool Filter::checkDropOverload(Upstream::ThreadLocalCluster& cluster, |
2088 | 1.81k | std::function<void(Http::ResponseHeaderMap&)>& modify_headers) { |
2089 | 1.81k | if (cluster.dropOverload().value()) { |
2090 | 0 | ENVOY_STREAM_LOG(debug, "Router filter: cluster DROP_OVERLOAD configuration: {}", *callbacks_, |
2091 | 0 | cluster.dropOverload().value()); |
2092 | 0 | if (config_->random_.bernoulli(cluster.dropOverload())) { |
2093 | 0 | ENVOY_STREAM_LOG(debug, "The request is dropped by DROP_OVERLOAD", *callbacks_); |
2094 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::DropOverLoad); |
2095 | 0 | chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true); |
2096 | 0 | callbacks_->sendLocalReply( |
2097 | 0 | Http::Code::ServiceUnavailable, "drop overload", |
2098 | 0 | [modify_headers, this](Http::ResponseHeaderMap& headers) { |
2099 | 0 | if (!config_->suppress_envoy_headers_) { |
2100 | 0 | headers.addReference(Http::Headers::get().EnvoyDropOverload, |
2101 | 0 | Http::Headers::get().EnvoyDropOverloadValues.True); |
2102 | 0 | } |
2103 | 0 | modify_headers(headers); |
2104 | 0 | }, |
2105 | 0 | absl::nullopt, StreamInfo::ResponseCodeDetails::get().DropOverload); |
2106 | |
2107 | 0 | cluster.info()->loadReportStats().upstream_rq_drop_overload_.inc(); |
2108 | 0 | return true; |
2109 | 0 | } |
2110 | 0 | } |
2111 | 1.81k | return false; |
2112 | 1.81k | } |
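As an illustration (hypothetical figure): with a cluster DROP_OVERLOAD value of 10%, the bernoulli() draw above rejects roughly one request in ten locally with a 503 and the body "drop overload", sets the DropOverLoad response flag, adds the EnvoyDropOverload marker header unless Envoy headers are suppressed, and bumps the cluster's upstream_rq_drop_overload_ load-report counter; the other nine in ten proceed to upstream selection as usual.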
2113 | | |
2114 | | void Filter::maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or_trailers, |
2115 | 1.63k | UpstreamRequest& upstream_request) { |
2116 | | // Process the load report only once: if the response had a report in its headers,
2117 | | // don't process it again in the trailers.
2118 | 1.63k | if (orca_load_report_received_) { |
2119 | 0 | return; |
2120 | 0 | } |
2121 | | // Check whether we need to send the load report to the LRS or invoke the ORCA |
2122 | | // callbacks. |
2123 | 1.63k | auto host = upstream_request.upstreamHost(); |
2124 | 1.63k | const bool need_to_send_load_report = |
2125 | 1.63k | (host != nullptr) && cluster_->lrsReportMetricNames().has_value(); |
2126 | 1.63k | if (!need_to_send_load_report && !orca_load_report_callbacks_.has_value()) { |
2127 | 1.63k | return; |
2128 | 1.63k | } |
2129 | | |
2130 | 0 | absl::StatusOr<xds::data::orca::v3::OrcaLoadReport> orca_load_report = |
2131 | 0 | Envoy::Orca::parseOrcaLoadReportHeaders(headers_or_trailers); |
2132 | 0 | if (!orca_load_report.ok()) { |
2133 | 0 | ENVOY_STREAM_LOG(trace, "Headers don't have orca load report: {}", *callbacks_, |
2134 | 0 | orca_load_report.status().message()); |
2135 | 0 | return; |
2136 | 0 | } |
2137 | | |
2138 | 0 | orca_load_report_received_ = true; |
2139 | |
2140 | 0 | if (need_to_send_load_report) { |
2141 | 0 | ENVOY_STREAM_LOG(trace, "Adding ORCA load report {} to load metrics", *callbacks_, |
2142 | 0 | orca_load_report->DebugString()); |
2143 | 0 | Envoy::Orca::addOrcaLoadReportToLoadMetricStats(cluster_->lrsReportMetricNames().value(), |
2144 | 0 | orca_load_report.value(), |
2145 | 0 | host->loadMetricStats()); |
2146 | 0 | } |
2147 | 0 | if (orca_load_report_callbacks_.has_value()) { |
2148 | 0 | const absl::Status status = orca_load_report_callbacks_->onOrcaLoadReport(*orca_load_report); |
2149 | 0 | if (!status.ok()) { |
2150 | 0 | ENVOY_STREAM_LOG(error, "Failed to invoke OrcaLoadReportCallbacks: {}", *callbacks_, |
2151 | 0 | status.message()); |
2152 | 0 | } |
2153 | 0 | } |
2154 | 0 | } |
2155 | | |
2156 | | RetryStatePtr |
2157 | | ProdFilter::createRetryState(const RetryPolicy& policy, Http::RequestHeaderMap& request_headers, |
2158 | | const Upstream::ClusterInfo& cluster, const VirtualCluster* vcluster, |
2159 | | RouteStatsContextOptRef route_stats_context, |
2160 | | Server::Configuration::CommonFactoryContext& context, |
2161 | 1.81k | Event::Dispatcher& dispatcher, Upstream::ResourcePriority priority) { |
2162 | 1.81k | std::unique_ptr<RetryStateImpl> retry_state = |
2163 | 1.81k | RetryStateImpl::create(policy, request_headers, cluster, vcluster, route_stats_context, |
2164 | 1.81k | context, dispatcher, priority); |
2165 | 1.81k | if (retry_state != nullptr && retry_state->isAutomaticallyConfiguredForHttp3()) { |
2166 | | // Since retrying requires Envoy to buffer the request body, if using HTTP/3 upstream is the
2167 | | // only reason for retrying, set the retry shadow buffer limit to 0 so that we don't retry or
2168 | | // buffer safe requests with a body (which is uncommon).
2169 | 0 | setRetryShadowBufferLimit(0); |
2170 | 0 | } |
2171 | 1.81k | return retry_state; |
2172 | 1.81k | } |
2173 | | |
2174 | | } // namespace Router |
2175 | | } // namespace Envoy |