/proc/self/cwd/source/common/router/router.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/common/router/router.h" |
2 | | |
3 | | #include <algorithm> |
4 | | #include <chrono> |
5 | | #include <cstdint> |
6 | | #include <functional> |
7 | | #include <memory> |
8 | | #include <string> |
9 | | |
10 | | #include "envoy/event/dispatcher.h" |
11 | | #include "envoy/event/timer.h" |
12 | | #include "envoy/grpc/status.h" |
13 | | #include "envoy/http/conn_pool.h" |
14 | | #include "envoy/runtime/runtime.h" |
15 | | #include "envoy/upstream/cluster_manager.h" |
16 | | #include "envoy/upstream/health_check_host_monitor.h" |
17 | | #include "envoy/upstream/upstream.h" |
18 | | |
19 | | #include "source/common/common/assert.h" |
20 | | #include "source/common/common/cleanup.h" |
21 | | #include "source/common/common/empty_string.h" |
22 | | #include "source/common/common/enum_to_int.h" |
23 | | #include "source/common/common/scope_tracker.h" |
24 | | #include "source/common/common/utility.h" |
25 | | #include "source/common/config/utility.h" |
26 | | #include "source/common/grpc/common.h" |
27 | | #include "source/common/http/codes.h" |
28 | | #include "source/common/http/header_map_impl.h" |
29 | | #include "source/common/http/headers.h" |
30 | | #include "source/common/http/message_impl.h" |
31 | | #include "source/common/http/utility.h" |
32 | | #include "source/common/network/application_protocol.h" |
33 | | #include "source/common/network/socket_option_factory.h" |
34 | | #include "source/common/network/transport_socket_options_impl.h" |
35 | | #include "source/common/network/upstream_server_name.h" |
36 | | #include "source/common/network/upstream_socket_options_filter_state.h" |
37 | | #include "source/common/network/upstream_subject_alt_names.h" |
38 | | #include "source/common/router/config_impl.h" |
39 | | #include "source/common/router/debug_config.h" |
40 | | #include "source/common/router/retry_state_impl.h" |
41 | | #include "source/common/runtime/runtime_features.h" |
42 | | #include "source/common/stream_info/uint32_accessor_impl.h" |
43 | | #include "source/common/tracing/http_tracer_impl.h" |
44 | | |
45 | | namespace Envoy { |
46 | | namespace Router { |
47 | | namespace { |
48 | | constexpr char NumInternalRedirectsFilterStateName[] = "num_internal_redirects"; |
49 | | |
50 | 0 | uint32_t getLength(const Buffer::Instance* instance) { return instance ? instance->length() : 0; } |
51 | | |
52 | | bool schemeIsHttp(const Http::RequestHeaderMap& downstream_headers, |
53 | 0 | OptRef<const Network::Connection> connection) { |
54 | 0 | if (Http::Utility::schemeIsHttp(downstream_headers.getSchemeValue())) { |
55 | 0 | return true; |
56 | 0 | } |
57 | 0 | if (connection.has_value() && !connection->ssl()) { |
58 | 0 | return true; |
59 | 0 | } |
60 | 0 | return false; |
61 | 0 | } |
62 | | |
63 | | constexpr uint64_t TimeoutPrecisionFactor = 100; |
64 | | |
65 | | } // namespace |
66 | | |
67 | | FilterConfig::FilterConfig(Stats::StatName stat_prefix, |
68 | | Server::Configuration::FactoryContext& context, |
69 | | ShadowWriterPtr&& shadow_writer, |
70 | | const envoy::extensions::filters::http::router::v3::Router& config) |
71 | | : FilterConfig(stat_prefix, context.localInfo(), context.scope(), context.clusterManager(), |
72 | | context.runtime(), context.api().randomGenerator(), std::move(shadow_writer), |
73 | | PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, dynamic_stats, true), |
74 | | config.start_child_span(), config.suppress_envoy_headers(), |
75 | | config.respect_expected_rq_timeout(), |
76 | | config.suppress_grpc_request_failure_code_stats(), |
77 | | config.has_upstream_log_options() |
78 | | ? config.upstream_log_options().flush_upstream_log_on_upstream_stream() |
79 | | : false, |
80 | | config.strict_check_headers(), context.api().timeSource(), context.httpContext(), |
81 | 2.91k | context.routerContext()) { |
82 | 2.91k | for (const auto& upstream_log : config.upstream_log()) { |
83 | 125 | upstream_logs_.push_back(AccessLog::AccessLogFactory::fromProto(upstream_log, context)); |
84 | 125 | } |
85 | | |
86 | 2.91k | if (config.has_upstream_log_options() && |
87 | 2.91k | config.upstream_log_options().has_upstream_log_flush_interval()) { |
88 | 0 | upstream_log_flush_interval_ = std::chrono::milliseconds(DurationUtil::durationToMilliseconds( |
89 | 0 | config.upstream_log_options().upstream_log_flush_interval())); |
90 | 0 | } |
91 | | |
92 | 2.91k | if (config.upstream_http_filters_size() > 0) { |
93 | 63 | auto& server_factory_ctx = context.getServerFactoryContext(); |
94 | 63 | const Http::FilterChainUtility::FiltersList& upstream_http_filters = |
95 | 63 | config.upstream_http_filters(); |
96 | 63 | std::shared_ptr<Http::UpstreamFilterConfigProviderManager> filter_config_provider_manager = |
97 | 63 | Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager( |
98 | 63 | server_factory_ctx); |
99 | 63 | std::string prefix = context.scope().symbolTable().toString(context.scope().prefix()); |
100 | 63 | upstream_ctx_ = std::make_unique<Upstream::UpstreamFactoryContextImpl>( |
101 | 63 | server_factory_ctx, context.initManager(), context.scope()); |
102 | 63 | Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext, |
103 | 63 | Server::Configuration::UpstreamHttpFilterConfigFactory> |
104 | 63 | helper(*filter_config_provider_manager, server_factory_ctx, context.clusterManager(), |
105 | 63 | *upstream_ctx_, prefix); |
106 | 63 | THROW_IF_NOT_OK(helper.processFilters(upstream_http_filters, "router upstream http", |
107 | 63 | "router upstream http", upstream_http_filter_factories_)); |
108 | 59 | } |
109 | 2.91k | } |
110 | | |
111 | | // Express percentage as [0, TimeoutPrecisionFactor] because stats do not accept floating point |
112 | | // values, and getting multiple significant figures on the histogram would be nice. |
113 | | uint64_t FilterUtility::percentageOfTimeout(const std::chrono::milliseconds response_time, |
114 | 6 | const std::chrono::milliseconds timeout) { |
115 | | // Timeouts of 0 are considered infinite. Any portion of an infinite timeout used is still |
116 | | // none of it. |
117 | 6 | if (timeout.count() == 0) { |
118 | 6 | return 0; |
119 | 6 | } |
120 | | |
121 | 0 | return static_cast<uint64_t>(response_time.count() * TimeoutPrecisionFactor / timeout.count()); |
122 | 6 | } |
123 | | |
124 | 24.7k | void FilterUtility::setUpstreamScheme(Http::RequestHeaderMap& headers, bool downstream_secure) { |
125 | 24.7k | if (Http::Utility::schemeIsValid(headers.getSchemeValue())) { |
126 | 1.28k | return; |
127 | 1.28k | } |
128 | | // After all the changes in https://github.com/envoyproxy/envoy/issues/14587 |
129 | | // this path should only occur if a buggy filter has removed the :scheme |
130 | | // header. In that case best-effort set from X-Forwarded-Proto. |
131 | 23.5k | absl::string_view xfp = headers.getForwardedProtoValue(); |
132 | 23.5k | if (Http::Utility::schemeIsValid(xfp)) { |
133 | 0 | headers.setScheme(xfp); |
134 | 0 | return; |
135 | 0 | } |
136 | | |
137 | 23.5k | if (downstream_secure) { |
138 | 0 | headers.setReferenceScheme(Http::Headers::get().SchemeValues.Https); |
139 | 23.5k | } else { |
140 | 23.5k | headers.setReferenceScheme(Http::Headers::get().SchemeValues.Http); |
141 | 23.5k | } |
142 | 23.5k | } |
143 | | |
144 | | bool FilterUtility::shouldShadow(const ShadowPolicy& policy, Runtime::Loader& runtime, |
145 | 0 | uint64_t stable_random) { |
146 | | |
147 | | // The policy's default value is set correctly regardless of whether there is a runtime key |
148 | | // or not, thus this call is sufficient for all cases (100% if no runtime set, otherwise |
149 | | // using the default value within the runtime fractional percent setting). |
150 | 0 | return runtime.snapshot().featureEnabled(policy.runtimeKey(), policy.defaultValue(), |
151 | 0 | stable_random); |
152 | 0 | } |
153 | | |
154 | | TimeoutData FilterUtility::finalTimeout(const RouteEntry& route, |
155 | | Http::RequestHeaderMap& request_headers, |
156 | | bool insert_envoy_expected_request_timeout_ms, |
157 | | bool grpc_request, bool per_try_timeout_hedging_enabled, |
158 | 2.40k | bool respect_expected_rq_timeout) { |
159 | | // See if there is a user supplied timeout in a request header. If there is we take that. |
160 | | // Otherwise if the request is gRPC and a maximum gRPC timeout is configured we use the timeout |
161 | | // in the gRPC headers (or infinity when gRPC headers have no timeout), but cap that timeout to |
162 | | // the configured maximum gRPC timeout (which may also be infinity, represented by a 0 value), |
163 | | // or the default from the route config otherwise. |
164 | 2.40k | TimeoutData timeout; |
165 | 2.40k | if (!route.usingNewTimeouts()) { |
166 | 2.40k | if (grpc_request && route.maxGrpcTimeout()) { |
167 | 0 | const std::chrono::milliseconds max_grpc_timeout = route.maxGrpcTimeout().value(); |
168 | 0 | auto header_timeout = Grpc::Common::getGrpcTimeout(request_headers); |
169 | 0 | std::chrono::milliseconds grpc_timeout = |
170 | 0 | header_timeout ? header_timeout.value() : std::chrono::milliseconds(0); |
171 | 0 | if (route.grpcTimeoutOffset()) { |
172 | | // We only apply the offset if it won't result in grpc_timeout hitting 0 or below, as |
173 | | // setting it to 0 means infinity and a negative timeout makes no sense. |
174 | 0 | const auto offset = *route.grpcTimeoutOffset(); |
175 | 0 | if (offset < grpc_timeout) { |
176 | 0 | grpc_timeout -= offset; |
177 | 0 | } |
178 | 0 | } |
179 | | |
180 | | // Cap gRPC timeout to the configured maximum considering that 0 means infinity. |
181 | 0 | if (max_grpc_timeout != std::chrono::milliseconds(0) && |
182 | 0 | (grpc_timeout == std::chrono::milliseconds(0) || grpc_timeout > max_grpc_timeout)) { |
183 | 0 | grpc_timeout = max_grpc_timeout; |
184 | 0 | } |
185 | 0 | timeout.global_timeout_ = grpc_timeout; |
186 | 2.40k | } else { |
187 | 2.40k | timeout.global_timeout_ = route.timeout(); |
188 | 2.40k | } |
189 | 2.40k | } |
190 | 2.40k | timeout.per_try_timeout_ = route.retryPolicy().perTryTimeout(); |
191 | 2.40k | timeout.per_try_idle_timeout_ = route.retryPolicy().perTryIdleTimeout(); |
192 | | |
193 | 2.40k | uint64_t header_timeout; |
194 | | |
195 | 2.40k | if (respect_expected_rq_timeout) { |
196 | | // Check if there is timeout set by egress Envoy. |
197 | | // If present, use that value as route timeout and don't override |
198 | | // *x-envoy-expected-rq-timeout-ms* header. At this point *x-envoy-upstream-rq-timeout-ms* |
199 | | // header should have been sanitized by egress Envoy. |
200 | 0 | const Http::HeaderEntry* header_expected_timeout_entry = |
201 | 0 | request_headers.EnvoyExpectedRequestTimeoutMs(); |
202 | 0 | if (header_expected_timeout_entry) { |
203 | 0 | trySetGlobalTimeout(*header_expected_timeout_entry, timeout); |
204 | 0 | } else { |
205 | 0 | const Http::HeaderEntry* header_timeout_entry = |
206 | 0 | request_headers.EnvoyUpstreamRequestTimeoutMs(); |
207 | |
|
208 | 0 | if (header_timeout_entry) { |
209 | 0 | trySetGlobalTimeout(*header_timeout_entry, timeout); |
210 | 0 | request_headers.removeEnvoyUpstreamRequestTimeoutMs(); |
211 | 0 | } |
212 | 0 | } |
213 | 2.40k | } else { |
214 | 2.40k | const Http::HeaderEntry* header_timeout_entry = request_headers.EnvoyUpstreamRequestTimeoutMs(); |
215 | | |
216 | 2.40k | if (header_timeout_entry) { |
217 | 0 | trySetGlobalTimeout(*header_timeout_entry, timeout); |
218 | 0 | request_headers.removeEnvoyUpstreamRequestTimeoutMs(); |
219 | 0 | } |
220 | 2.40k | } |
221 | | |
222 | | // See if there is a per try/retry timeout. If it's >= global we just ignore it. |
223 | 2.40k | const absl::string_view per_try_timeout_entry = |
224 | 2.40k | request_headers.getEnvoyUpstreamRequestPerTryTimeoutMsValue(); |
225 | 2.40k | if (!per_try_timeout_entry.empty()) { |
226 | 0 | if (absl::SimpleAtoi(per_try_timeout_entry, &header_timeout)) { |
227 | 0 | timeout.per_try_timeout_ = std::chrono::milliseconds(header_timeout); |
228 | 0 | } |
229 | 0 | request_headers.removeEnvoyUpstreamRequestPerTryTimeoutMs(); |
230 | 0 | } |
231 | | |
232 | 2.40k | if (timeout.per_try_timeout_ >= timeout.global_timeout_ && timeout.global_timeout_.count() != 0) { |
233 | 0 | timeout.per_try_timeout_ = std::chrono::milliseconds(0); |
234 | 0 | } |
235 | | |
236 | 2.40k | setTimeoutHeaders(0, timeout, route, request_headers, insert_envoy_expected_request_timeout_ms, |
237 | 2.40k | grpc_request, per_try_timeout_hedging_enabled); |
238 | | |
239 | 2.40k | return timeout; |
240 | 2.40k | } |
241 | | |
242 | | void FilterUtility::setTimeoutHeaders(uint64_t elapsed_time, const TimeoutData& timeout, |
243 | | const RouteEntry& route, |
244 | | Http::RequestHeaderMap& request_headers, |
245 | | bool insert_envoy_expected_request_timeout_ms, |
246 | 2.40k | bool grpc_request, bool per_try_timeout_hedging_enabled) { |
247 | | |
248 | 2.40k | const uint64_t global_timeout = timeout.global_timeout_.count(); |
249 | | |
250 | | // See if there is any timeout to write in the expected timeout header. |
251 | 2.40k | uint64_t expected_timeout = timeout.per_try_timeout_.count(); |
252 | | |
253 | | // Use the global timeout if no per try timeout was specified or if we're |
254 | | // doing hedging when there are per try timeouts. Either of these scenarios |
255 | | // mean that the upstream server can use the full global timeout. |
256 | 2.40k | if (per_try_timeout_hedging_enabled || expected_timeout == 0) { |
257 | 2.40k | expected_timeout = global_timeout; |
258 | 2.40k | } |
259 | | |
260 | | // If the expected timeout is 0 set no timeout, as Envoy treats 0 as infinite timeout. |
261 | 2.40k | if (expected_timeout > 0) { |
262 | | |
263 | 1.28k | if (global_timeout > 0) { |
264 | 1.28k | if (elapsed_time >= global_timeout) { |
265 | | // We are out of time, but 0 would be an infinite timeout. So instead we send a 1ms timeout |
266 | | // and assume the timers armed by onRequestComplete() will fire very soon. |
267 | 0 | expected_timeout = 1; |
268 | 1.28k | } else { |
269 | 1.28k | expected_timeout = std::min(expected_timeout, global_timeout - elapsed_time); |
270 | 1.28k | } |
271 | 1.28k | } |
272 | | |
273 | 1.28k | if (insert_envoy_expected_request_timeout_ms) { |
274 | 1.27k | request_headers.setEnvoyExpectedRequestTimeoutMs(expected_timeout); |
275 | 1.27k | } |
276 | | |
277 | | // If we've configured max_grpc_timeout, override the grpc-timeout header with |
278 | | // the expected timeout. This ensures that the optional per try timeout is reflected |
279 | | // in grpc-timeout, ensuring that the upstream gRPC server is aware of the actual timeout. |
280 | 1.28k | if (grpc_request && !route.usingNewTimeouts() && route.maxGrpcTimeout()) { |
281 | 0 | Grpc::Common::toGrpcTimeout(std::chrono::milliseconds(expected_timeout), request_headers); |
282 | 0 | } |
283 | 1.28k | } |
284 | 2.40k | } |
285 | | |
286 | | absl::optional<std::chrono::milliseconds> |
287 | 0 | FilterUtility::tryParseHeaderTimeout(const Http::HeaderEntry& header_timeout_entry) { |
288 | 0 | uint64_t header_timeout; |
289 | 0 | if (absl::SimpleAtoi(header_timeout_entry.value().getStringView(), &header_timeout)) { |
290 | 0 | return std::chrono::milliseconds(header_timeout); |
291 | 0 | } |
292 | 0 | return absl::nullopt; |
293 | 0 | } |
294 | | |
295 | | void FilterUtility::trySetGlobalTimeout(const Http::HeaderEntry& header_timeout_entry, |
296 | 0 | TimeoutData& timeout) { |
297 | 0 | const auto timeout_ms = tryParseHeaderTimeout(header_timeout_entry); |
298 | 0 | if (timeout_ms.has_value()) { |
299 | 0 | timeout.global_timeout_ = timeout_ms.value(); |
300 | 0 | } |
301 | 0 | } |
302 | | |
303 | | FilterUtility::HedgingParams |
304 | | FilterUtility::finalHedgingParams(const RouteEntry& route, |
305 | 2.40k | Http::RequestHeaderMap& request_headers) { |
306 | 2.40k | HedgingParams hedging_params; |
307 | 2.40k | hedging_params.hedge_on_per_try_timeout_ = route.hedgePolicy().hedgeOnPerTryTimeout(); |
308 | | |
309 | 2.40k | const Http::HeaderEntry* hedge_on_per_try_timeout_entry = |
310 | 2.40k | request_headers.EnvoyHedgeOnPerTryTimeout(); |
311 | 2.40k | if (hedge_on_per_try_timeout_entry) { |
312 | 0 | if (hedge_on_per_try_timeout_entry->value() == "true") { |
313 | 0 | hedging_params.hedge_on_per_try_timeout_ = true; |
314 | 0 | } |
315 | 0 | if (hedge_on_per_try_timeout_entry->value() == "false") { |
316 | 0 | hedging_params.hedge_on_per_try_timeout_ = false; |
317 | 0 | } |
318 | |
|
319 | 0 | request_headers.removeEnvoyHedgeOnPerTryTimeout(); |
320 | 0 | } |
321 | | |
322 | 2.40k | return hedging_params; |
323 | 2.40k | } |
324 | | |
325 | 2.88k | Filter::~Filter() { |
326 | | // Upstream resources should already have been cleaned. |
327 | 2.88k | ASSERT(upstream_requests_.empty()); |
328 | 2.88k | ASSERT(!retry_state_); |
329 | | |
330 | | // Unregister from shadow stream notifications and cancel active streams. |
331 | 2.88k | for (auto* shadow_stream : shadow_streams_) { |
332 | 0 | shadow_stream->removeDestructorCallback(); |
333 | 0 | shadow_stream->removeWatermarkCallbacks(); |
334 | 0 | shadow_stream->cancel(); |
335 | 0 | } |
336 | 2.88k | } |
337 | | |
338 | | const FilterUtility::StrictHeaderChecker::HeaderCheckResult |
339 | | FilterUtility::StrictHeaderChecker::checkHeader(Http::RequestHeaderMap& headers, |
340 | 0 | const Http::LowerCaseString& target_header) { |
341 | 0 | if (target_header == Http::Headers::get().EnvoyUpstreamRequestTimeoutMs) { |
342 | 0 | return isInteger(headers.EnvoyUpstreamRequestTimeoutMs()); |
343 | 0 | } else if (target_header == Http::Headers::get().EnvoyUpstreamRequestPerTryTimeoutMs) { |
344 | 0 | return isInteger(headers.EnvoyUpstreamRequestPerTryTimeoutMs()); |
345 | 0 | } else if (target_header == Http::Headers::get().EnvoyMaxRetries) { |
346 | 0 | return isInteger(headers.EnvoyMaxRetries()); |
347 | 0 | } else if (target_header == Http::Headers::get().EnvoyRetryOn) { |
348 | 0 | return hasValidRetryFields(headers.EnvoyRetryOn(), &Router::RetryStateImpl::parseRetryOn); |
349 | 0 | } else if (target_header == Http::Headers::get().EnvoyRetryGrpcOn) { |
350 | 0 | return hasValidRetryFields(headers.EnvoyRetryGrpcOn(), |
351 | 0 | &Router::RetryStateImpl::parseRetryGrpcOn); |
352 | 0 | } |
353 | | // Should only validate headers for which we have implemented a validator. |
354 | 0 | PANIC("unexpectedly reached"); |
355 | 0 | } |
356 | | |
357 | 3.36k | Stats::StatName Filter::upstreamZone(Upstream::HostDescriptionConstSharedPtr upstream_host) { |
358 | 3.36k | return upstream_host ? upstream_host->localityZoneStatName() : config_.empty_stat_name_; |
359 | 3.36k | } |
360 | | |
361 | | void Filter::chargeUpstreamCode(uint64_t response_status_code, |
362 | | const Http::ResponseHeaderMap& response_headers, |
363 | | Upstream::HostDescriptionConstSharedPtr upstream_host, |
364 | 2.27k | bool dropped) { |
365 | | // Passing the response_status_code explicitly is an optimization to avoid |
366 | | // multiple calls to slow Http::Utility::getResponseStatus. |
367 | 2.27k | ASSERT(response_status_code == Http::Utility::getResponseStatus(response_headers)); |
368 | 2.27k | if (config_.emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck()) { |
369 | 2.27k | const Http::HeaderEntry* upstream_canary_header = response_headers.EnvoyUpstreamCanary(); |
370 | 2.27k | const bool is_canary = (upstream_canary_header && upstream_canary_header->value() == "true") || |
371 | 2.27k | (upstream_host ? upstream_host->canary() : false); |
372 | 2.27k | const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_); |
373 | | |
374 | 2.27k | Stats::StatName upstream_zone = upstreamZone(upstream_host); |
375 | 2.27k | Http::CodeStats::ResponseStatInfo info{ |
376 | 2.27k | config_.scope_, |
377 | 2.27k | cluster_->statsScope(), |
378 | 2.27k | config_.empty_stat_name_, |
379 | 2.27k | response_status_code, |
380 | 2.27k | internal_request, |
381 | 2.27k | route_entry_->virtualHost().statName(), |
382 | 2.27k | request_vcluster_ ? request_vcluster_->statName() : config_.empty_stat_name_, |
383 | 2.27k | route_stats_context_.has_value() ? route_stats_context_->statName() |
384 | 2.27k | : config_.empty_stat_name_, |
385 | 2.27k | config_.zone_name_, |
386 | 2.27k | upstream_zone, |
387 | 2.27k | is_canary}; |
388 | | |
389 | 2.27k | Http::CodeStats& code_stats = httpContext().codeStats(); |
390 | 2.27k | code_stats.chargeResponseStat(info, exclude_http_code_stats_); |
391 | | |
392 | 2.27k | if (alt_stat_prefix_ != nullptr) { |
393 | 0 | Http::CodeStats::ResponseStatInfo alt_info{config_.scope_, |
394 | 0 | cluster_->statsScope(), |
395 | 0 | alt_stat_prefix_->statName(), |
396 | 0 | response_status_code, |
397 | 0 | internal_request, |
398 | 0 | config_.empty_stat_name_, |
399 | 0 | config_.empty_stat_name_, |
400 | 0 | config_.empty_stat_name_, |
401 | 0 | config_.zone_name_, |
402 | 0 | upstream_zone, |
403 | 0 | is_canary}; |
404 | 0 | code_stats.chargeResponseStat(alt_info, exclude_http_code_stats_); |
405 | 0 | } |
406 | | |
407 | 2.27k | if (dropped) { |
408 | 0 | cluster_->loadReportStats().upstream_rq_dropped_.inc(); |
409 | 0 | } |
410 | 2.27k | if (upstream_host && Http::CodeUtility::is5xx(response_status_code)) { |
411 | 58 | upstream_host->stats().rq_error_.inc(); |
412 | 58 | } |
413 | 2.27k | } |
414 | 2.27k | } |
415 | | |
416 | | void Filter::chargeUpstreamCode(Http::Code code, |
417 | | Upstream::HostDescriptionConstSharedPtr upstream_host, |
418 | 58 | bool dropped) { |
419 | 58 | const uint64_t response_status_code = enumToInt(code); |
420 | 58 | const auto fake_response_headers = Http::createHeaderMap<Http::ResponseHeaderMapImpl>( |
421 | 58 | {{Http::Headers::get().Status, std::to_string(response_status_code)}}); |
422 | 58 | chargeUpstreamCode(response_status_code, *fake_response_headers, upstream_host, dropped); |
423 | 58 | } |
424 | | |
425 | 2.58k | Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) { |
426 | 2.58k | downstream_headers_ = &headers; |
427 | | |
428 | | // Extract debug configuration from filter state. This is used further along to determine whether |
429 | | // we should append cluster and host headers to the response, and whether to forward the request |
430 | | // upstream. |
431 | 2.58k | const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState(); |
432 | 2.58k | const DebugConfig* debug_config = filter_state->getDataReadOnly<DebugConfig>(DebugConfig::key()); |
433 | | |
434 | | // TODO: Maybe add a filter API for this. |
435 | 2.58k | grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers); |
436 | 2.58k | exclude_http_code_stats_ = grpc_request_ && config_.suppress_grpc_request_failure_code_stats_; |
437 | | |
438 | | // Only increment rq total stat if we actually decode headers here. This does not count requests |
439 | | // that get handled by earlier filters. |
440 | 2.58k | stats_.rq_total_.inc(); |
441 | | |
442 | | // Initialize the `modify_headers` function as a no-op (so we don't have to remember to check it |
443 | | // against nullptr before calling it), and feed it behavior later if/when we have cluster info |
444 | | // headers to append. |
445 | 2.58k | std::function<void(Http::ResponseHeaderMap&)> modify_headers = [](Http::ResponseHeaderMap&) {}; |
446 | | |
447 | | // Determine if there is a route entry or a direct response for the request. |
448 | 2.58k | route_ = callbacks_->route(); |
449 | 2.58k | if (!route_) { |
450 | 0 | stats_.no_route_.inc(); |
451 | 0 | ENVOY_STREAM_LOG(debug, "no route match for URL '{}'", *callbacks_, headers.getPathValue()); |
452 | |
|
453 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound); |
454 | 0 | callbacks_->sendLocalReply(Http::Code::NotFound, "", modify_headers, absl::nullopt, |
455 | 0 | StreamInfo::ResponseCodeDetails::get().RouteNotFound); |
456 | 0 | return Http::FilterHeadersStatus::StopIteration; |
457 | 0 | } |
458 | | |
459 | | // Determine if there is a direct response for the request. |
460 | 2.58k | const auto* direct_response = route_->directResponseEntry(); |
461 | 2.58k | if (direct_response != nullptr) { |
462 | 171 | stats_.rq_direct_response_.inc(); |
463 | 171 | direct_response->rewritePathHeader(headers, !config_.suppress_envoy_headers_); |
464 | 171 | callbacks_->sendLocalReply( |
465 | 171 | direct_response->responseCode(), direct_response->responseBody(), |
466 | 171 | [this, direct_response, |
467 | 171 | &request_headers = headers](Http::ResponseHeaderMap& response_headers) -> void { |
468 | 171 | std::string new_uri; |
469 | 171 | if (request_headers.Path()) { |
470 | 171 | new_uri = direct_response->newUri(request_headers); |
471 | 171 | } |
472 | | // See https://tools.ietf.org/html/rfc7231#section-7.1.2. |
473 | 171 | const auto add_location = |
474 | 171 | direct_response->responseCode() == Http::Code::Created || |
475 | 171 | Http::CodeUtility::is3xx(enumToInt(direct_response->responseCode())); |
476 | 171 | if (!new_uri.empty() && add_location) { |
477 | 1 | response_headers.addReferenceKey(Http::Headers::get().Location, new_uri); |
478 | 1 | } |
479 | 171 | direct_response->finalizeResponseHeaders(response_headers, callbacks_->streamInfo()); |
480 | 171 | }, |
481 | 171 | absl::nullopt, StreamInfo::ResponseCodeDetails::get().DirectResponse); |
482 | 171 | return Http::FilterHeadersStatus::StopIteration; |
483 | 171 | } |
484 | | |
485 | | // A route entry matches for the request. |
486 | 2.41k | route_entry_ = route_->routeEntry(); |
487 | | // If there's a route specific limit and it's smaller than general downstream |
488 | | // limits, apply the new cap. |
489 | 2.41k | retry_shadow_buffer_limit_ = |
490 | 2.41k | std::min(retry_shadow_buffer_limit_, route_entry_->retryShadowBufferLimit()); |
491 | 2.41k | if (debug_config && debug_config->append_cluster_) { |
492 | | // The cluster name will be appended to any local or upstream responses from this point. |
493 | 0 | modify_headers = [this, debug_config](Http::ResponseHeaderMap& headers) { |
494 | 0 | headers.addCopy(debug_config->cluster_header_.value_or(Http::Headers::get().EnvoyCluster), |
495 | 0 | route_entry_->clusterName()); |
496 | 0 | }; |
497 | 0 | } |
498 | 2.41k | Upstream::ThreadLocalCluster* cluster = |
499 | 2.41k | config_.cm_.getThreadLocalCluster(route_entry_->clusterName()); |
500 | 2.41k | if (!cluster) { |
501 | 18 | stats_.no_cluster_.inc(); |
502 | 18 | ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, route_entry_->clusterName()); |
503 | | |
504 | 18 | callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoClusterFound); |
505 | 18 | callbacks_->sendLocalReply(route_entry_->clusterNotFoundResponseCode(), "", modify_headers, |
506 | 18 | absl::nullopt, |
507 | 18 | StreamInfo::ResponseCodeDetails::get().ClusterNotFound); |
508 | 18 | return Http::FilterHeadersStatus::StopIteration; |
509 | 18 | } |
510 | 2.40k | cluster_ = cluster->info(); |
511 | | |
512 | | // Set up stat prefixes, etc. |
513 | 2.40k | request_vcluster_ = route_entry_->virtualCluster(headers); |
514 | 2.40k | if (request_vcluster_ != nullptr) { |
515 | 6 | callbacks_->streamInfo().setVirtualClusterName(request_vcluster_->name()); |
516 | 6 | } |
517 | 2.40k | route_stats_context_ = route_entry_->routeStatsContext(); |
518 | 2.40k | ENVOY_STREAM_LOG(debug, "cluster '{}' match for URL '{}'", *callbacks_, |
519 | 2.40k | route_entry_->clusterName(), headers.getPathValue()); |
520 | | |
521 | 2.40k | if (config_.strict_check_headers_ != nullptr) { |
522 | 0 | for (const auto& header : *config_.strict_check_headers_) { |
523 | 0 | const auto res = FilterUtility::StrictHeaderChecker::checkHeader(headers, header); |
524 | 0 | if (!res.valid_) { |
525 | 0 | callbacks_->streamInfo().setResponseFlag( |
526 | 0 | StreamInfo::ResponseFlag::InvalidEnvoyRequestHeaders); |
527 | 0 | const std::string body = fmt::format("invalid header '{}' with value '{}'", |
528 | 0 | std::string(res.entry_->key().getStringView()), |
529 | 0 | std::string(res.entry_->value().getStringView())); |
530 | 0 | const std::string details = |
531 | 0 | absl::StrCat(StreamInfo::ResponseCodeDetails::get().InvalidEnvoyRequestHeaders, "{", |
532 | 0 | StringUtil::replaceAllEmptySpace(res.entry_->key().getStringView()), "}"); |
533 | 0 | callbacks_->sendLocalReply(Http::Code::BadRequest, body, nullptr, absl::nullopt, details); |
534 | 0 | return Http::FilterHeadersStatus::StopIteration; |
535 | 0 | } |
536 | 0 | } |
537 | 0 | } |
538 | | |
539 | 2.40k | const Http::HeaderEntry* request_alt_name = headers.EnvoyUpstreamAltStatName(); |
540 | 2.40k | if (request_alt_name) { |
541 | 0 | alt_stat_prefix_ = std::make_unique<Stats::StatNameDynamicStorage>( |
542 | 0 | request_alt_name->value().getStringView(), config_.scope_.symbolTable()); |
543 | 0 | headers.removeEnvoyUpstreamAltStatName(); |
544 | 0 | } |
545 | | |
546 | | // See if we are supposed to immediately kill some percentage of this cluster's traffic. |
547 | 2.40k | if (cluster_->maintenanceMode()) { |
548 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow); |
549 | 0 | chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true); |
550 | 0 | callbacks_->sendLocalReply( |
551 | 0 | Http::Code::ServiceUnavailable, "maintenance mode", |
552 | 0 | [modify_headers, this](Http::ResponseHeaderMap& headers) { |
553 | 0 | if (!config_.suppress_envoy_headers_) { |
554 | 0 | headers.addReference(Http::Headers::get().EnvoyOverloaded, |
555 | 0 | Http::Headers::get().EnvoyOverloadedValues.True); |
556 | 0 | } |
557 | | // Note: append_cluster_info does not respect suppress_envoy_headers. |
558 | 0 | modify_headers(headers); |
559 | 0 | }, |
560 | 0 | absl::nullopt, StreamInfo::ResponseCodeDetails::get().MaintenanceMode); |
561 | 0 | cluster_->trafficStats()->upstream_rq_maintenance_mode_.inc(); |
562 | 0 | return Http::FilterHeadersStatus::StopIteration; |
563 | 0 | } |
564 | | |
565 | | // Fetch a connection pool for the upstream cluster. |
566 | 2.40k | const auto& upstream_http_protocol_options = cluster_->upstreamHttpProtocolOptions(); |
567 | | |
568 | 2.40k | if (upstream_http_protocol_options.has_value() && |
569 | 2.40k | (upstream_http_protocol_options.value().auto_sni() || |
570 | 0 | upstream_http_protocol_options.value().auto_san_validation())) { |
571 | | // Default the header to Host/Authority header. |
572 | 0 | absl::string_view header_value = headers.getHostValue(); |
573 | | |
574 | | // Check whether `override_auto_sni_header` is specified. |
575 | 0 | const auto override_auto_sni_header = |
576 | 0 | upstream_http_protocol_options.value().override_auto_sni_header(); |
577 | 0 | if (!override_auto_sni_header.empty()) { |
578 | | // Use the header value from `override_auto_sni_header` to set the SNI value. |
579 | 0 | const auto overridden_header_value = Http::HeaderUtility::getAllOfHeaderAsString( |
580 | 0 | headers, Http::LowerCaseString(override_auto_sni_header)); |
581 | 0 | if (overridden_header_value.result().has_value() && |
582 | 0 | !overridden_header_value.result().value().empty()) { |
583 | 0 | header_value = overridden_header_value.result().value(); |
584 | 0 | } |
585 | 0 | } |
586 | 0 | const auto parsed_authority = Http::Utility::parseAuthority(header_value); |
587 | 0 | bool should_set_sni = !parsed_authority.is_ip_address_; |
588 | | // `host_` returns a string_view so doing this should be safe. |
589 | 0 | absl::string_view sni_value = parsed_authority.host_; |
590 | |
|
591 | 0 | if (should_set_sni && upstream_http_protocol_options.value().auto_sni() && |
592 | 0 | !callbacks_->streamInfo().filterState()->hasDataWithName( |
593 | 0 | Network::UpstreamServerName::key())) { |
594 | 0 | callbacks_->streamInfo().filterState()->setData( |
595 | 0 | Network::UpstreamServerName::key(), |
596 | 0 | std::make_unique<Network::UpstreamServerName>(sni_value), |
597 | 0 | StreamInfo::FilterState::StateType::Mutable); |
598 | 0 | } |
599 | |
|
600 | 0 | if (upstream_http_protocol_options.value().auto_san_validation() && |
601 | 0 | !callbacks_->streamInfo().filterState()->hasDataWithName( |
602 | 0 | Network::UpstreamSubjectAltNames::key())) { |
603 | 0 | callbacks_->streamInfo().filterState()->setData( |
604 | 0 | Network::UpstreamSubjectAltNames::key(), |
605 | 0 | std::make_unique<Network::UpstreamSubjectAltNames>( |
606 | 0 | std::vector<std::string>{std::string(sni_value)}), |
607 | 0 | StreamInfo::FilterState::StateType::Mutable); |
608 | 0 | } |
609 | 0 | } |
610 | | |
611 | 2.40k | transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState( |
612 | 2.40k | *callbacks_->streamInfo().filterState()); |
613 | | |
614 | 2.40k | if (auto downstream_connection = downstreamConnection(); downstream_connection != nullptr) { |
615 | 1.28k | if (auto typed_state = downstream_connection->streamInfo() |
616 | 1.28k | .filterState() |
617 | 1.28k | .getDataReadOnly<Network::UpstreamSocketOptionsFilterState>( |
618 | 1.28k | Network::UpstreamSocketOptionsFilterState::key()); |
619 | 1.28k | typed_state != nullptr) { |
620 | 0 | auto downstream_options = typed_state->value(); |
621 | 0 | if (!upstream_options_) { |
622 | 0 | upstream_options_ = std::make_shared<Network::Socket::Options>(); |
623 | 0 | } |
624 | 0 | Network::Socket::appendOptions(upstream_options_, downstream_options); |
625 | 0 | } |
626 | 1.28k | } |
627 | | |
628 | 2.40k | if (upstream_options_ && callbacks_->getUpstreamSocketOptions()) { |
629 | 0 | Network::Socket::appendOptions(upstream_options_, callbacks_->getUpstreamSocketOptions()); |
630 | 0 | } |
631 | | |
632 | 2.40k | std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster); |
633 | | |
634 | 2.40k | if (!generic_conn_pool) { |
635 | 0 | sendNoHealthyUpstreamResponse(); |
636 | 0 | return Http::FilterHeadersStatus::StopIteration; |
637 | 0 | } |
638 | 2.40k | Upstream::HostDescriptionConstSharedPtr host = generic_conn_pool->host(); |
639 | | |
640 | 2.40k | if (debug_config && debug_config->append_upstream_host_) { |
641 | | // The hostname and address will be appended to any local or upstream responses from this point, |
642 | | // possibly in addition to the cluster name. |
643 | 0 | modify_headers = [modify_headers, debug_config, host](Http::ResponseHeaderMap& headers) { |
644 | 0 | modify_headers(headers); |
645 | 0 | headers.addCopy( |
646 | 0 | debug_config->hostname_header_.value_or(Http::Headers::get().EnvoyUpstreamHostname), |
647 | 0 | host->hostname()); |
648 | 0 | headers.addCopy(debug_config->host_address_header_.value_or( |
649 | 0 | Http::Headers::get().EnvoyUpstreamHostAddress), |
650 | 0 | host->address()->asString()); |
651 | 0 | }; |
652 | 0 | } |
653 | | |
654 | | // If we've been instructed not to forward the request upstream, send an empty local response. |
655 | 2.40k | if (debug_config && debug_config->do_not_forward_) { |
656 | 0 | modify_headers = [modify_headers, debug_config](Http::ResponseHeaderMap& headers) { |
657 | 0 | modify_headers(headers); |
658 | 0 | headers.addCopy( |
659 | 0 | debug_config->not_forwarded_header_.value_or(Http::Headers::get().EnvoyNotForwarded), |
660 | 0 | "true"); |
661 | 0 | }; |
662 | 0 | callbacks_->sendLocalReply(Http::Code::NoContent, "", modify_headers, absl::nullopt, ""); |
663 | 0 | return Http::FilterHeadersStatus::StopIteration; |
664 | 0 | } |
665 | | |
666 | 2.40k | hedging_params_ = FilterUtility::finalHedgingParams(*route_entry_, headers); |
667 | | |
668 | 2.40k | timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_.suppress_envoy_headers_, |
669 | 2.40k | grpc_request_, hedging_params_.hedge_on_per_try_timeout_, |
670 | 2.40k | config_.respect_expected_rq_timeout_); |
671 | | |
672 | 2.40k | const Http::HeaderEntry* header_max_stream_duration_entry = |
673 | 2.40k | headers.EnvoyUpstreamStreamDurationMs(); |
674 | 2.40k | if (header_max_stream_duration_entry) { |
675 | 0 | dynamic_max_stream_duration_ = |
676 | 0 | FilterUtility::tryParseHeaderTimeout(*header_max_stream_duration_entry); |
677 | 0 | headers.removeEnvoyUpstreamStreamDurationMs(); |
678 | 0 | } |
679 | | |
680 | | // If this header is set with any value, use an alternate response code on timeout |
681 | 2.40k | if (headers.EnvoyUpstreamRequestTimeoutAltResponse()) { |
682 | 0 | timeout_response_code_ = Http::Code::NoContent; |
683 | 0 | headers.removeEnvoyUpstreamRequestTimeoutAltResponse(); |
684 | 0 | } |
685 | | |
686 | 2.40k | include_attempt_count_in_request_ = route_entry_->includeAttemptCountInRequest(); |
687 | 2.40k | if (include_attempt_count_in_request_) { |
688 | 0 | headers.setEnvoyAttemptCount(attempt_count_); |
689 | 0 | } |
690 | | |
691 | | // The router has reached a point where it is going to try to send a request upstream, |
692 | | // so now modify_headers should attach x-envoy-attempt-count to the downstream response if the |
693 | | // config flag is true. |
694 | 2.40k | if (route_entry_->includeAttemptCountInResponse()) { |
695 | 0 | modify_headers = [modify_headers, this](Http::ResponseHeaderMap& headers) { |
696 | 0 | modify_headers(headers); |
697 | | |
698 | | // This header is added without checking for config_.suppress_envoy_headers_ to mirror what is |
699 | | // done for upstream requests. |
700 | 0 | headers.setEnvoyAttemptCount(attempt_count_); |
701 | 0 | }; |
702 | 0 | } |
703 | 2.40k | callbacks_->streamInfo().setAttemptCount(attempt_count_); |
704 | | |
705 | 2.40k | route_entry_->finalizeRequestHeaders(headers, callbacks_->streamInfo(), |
706 | 2.40k | !config_.suppress_envoy_headers_); |
707 | 2.40k | FilterUtility::setUpstreamScheme( |
708 | 2.40k | headers, callbacks_->streamInfo().downstreamAddressProvider().sslConnection() != nullptr); |
709 | | |
710 | | // Ensure an http transport scheme is selected before continuing with decoding. |
711 | 2.40k | ASSERT(headers.Scheme()); |
712 | | |
713 | 2.40k | retry_state_ = |
714 | 2.40k | createRetryState(route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_, |
715 | 2.40k | route_stats_context_, config_.runtime_, config_.random_, |
716 | 2.40k | callbacks_->dispatcher(), config_.timeSource(), route_entry_->priority()); |
717 | | |
718 | | // Determine which shadow policies to use. It's possible that we don't do any shadowing due to |
719 | | // runtime keys. Also the method CONNECT doesn't support shadowing. |
720 | 2.40k | auto method = headers.getMethodValue(); |
721 | 2.40k | if (method != Http::Headers::get().MethodValues.Connect) { |
722 | 2.40k | for (const auto& shadow_policy : route_entry_->shadowPolicies()) { |
723 | 0 | const auto& policy_ref = *shadow_policy; |
724 | 0 | if (FilterUtility::shouldShadow(policy_ref, config_.runtime_, callbacks_->streamId())) { |
725 | 0 | active_shadow_policies_.push_back(std::cref(policy_ref)); |
726 | 0 | shadow_headers_ = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*downstream_headers_); |
727 | 0 | } |
728 | 0 | } |
729 | 2.40k | } |
730 | | |
731 | 2.40k | ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers); |
732 | | |
733 | | // Hang onto the modify_headers function for later use in handling upstream responses. |
734 | 2.40k | modify_headers_ = modify_headers; |
735 | | |
736 | 2.40k | const bool can_send_early_data = |
737 | 2.40k | route_entry_->earlyDataPolicy().allowsEarlyDataForRequest(*downstream_headers_); |
738 | | |
739 | 2.40k | include_timeout_retry_header_in_request_ = |
740 | 2.40k | route_entry_->virtualHost().includeIsTimeoutRetryHeader(); |
741 | | |
742 | | // Set initial HTTP/3 use based on the presence of HTTP/1.1 proxy config. |
743 | | // For retries etc, HTTP/3 usability may transition from true to false, but |
744 | | // will never transition from false to true. |
745 | 2.40k | bool can_use_http3 = |
746 | 2.40k | !transport_socket_options_ || !transport_socket_options_->http11ProxyInfo().has_value(); |
747 | 2.40k | UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>( |
748 | 2.40k | *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3); |
749 | 2.40k | LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_); |
750 | 2.40k | upstream_requests_.front()->acceptHeadersFromRouter(end_stream); |
751 | 2.40k | if (streaming_shadows_) { |
752 | | // start the shadow streams. |
753 | 0 | for (const auto& shadow_policy_wrapper : active_shadow_policies_) { |
754 | 0 | const auto& shadow_policy = shadow_policy_wrapper.get(); |
755 | 0 | const absl::optional<absl::string_view> shadow_cluster_name = |
756 | 0 | getShadowCluster(shadow_policy, *downstream_headers_); |
757 | 0 | if (!shadow_cluster_name.has_value()) { |
758 | 0 | continue; |
759 | 0 | } |
760 | 0 | auto shadow_headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_); |
761 | 0 | auto options = |
762 | 0 | Http::AsyncClient::RequestOptions() |
763 | 0 | .setTimeout(timeout_.global_timeout_) |
764 | 0 | .setParentSpan(callbacks_->activeSpan()) |
765 | 0 | .setChildSpanName("mirror") |
766 | 0 | .setSampled(shadow_policy.traceSampled()) |
767 | 0 | .setIsShadow(true) |
768 | 0 | .setBufferAccount(callbacks_->account()) |
769 | | // A buffer limit of 1 is set in the case that retry_shadow_buffer_limit_ == 0, |
770 | | // because a buffer limit of zero on async clients is interpreted as no buffer limit. |
771 | 0 | .setBufferLimit(1 > retry_shadow_buffer_limit_ ? 1 : retry_shadow_buffer_limit_); |
772 | 0 | options.setFilterConfig(config_); |
773 | 0 | if (end_stream) { |
774 | | // This is a header-only request, and can be dispatched immediately to the shadow |
775 | | // without waiting. |
776 | 0 | Http::RequestMessagePtr request(new Http::RequestMessageImpl( |
777 | 0 | Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_))); |
778 | 0 | config_.shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request), |
779 | 0 | options); |
780 | 0 | } else { |
781 | 0 | Http::AsyncClient::OngoingRequest* shadow_stream = config_.shadowWriter().streamingShadow( |
782 | 0 | std::string(shadow_cluster_name.value()), std::move(shadow_headers), options); |
783 | 0 | if (shadow_stream != nullptr) { |
784 | 0 | shadow_streams_.insert(shadow_stream); |
785 | 0 | shadow_stream->setDestructorCallback( |
786 | 0 | [this, shadow_stream]() { shadow_streams_.erase(shadow_stream); }); |
787 | 0 | shadow_stream->setWatermarkCallbacks(*callbacks_); |
788 | 0 | } |
789 | 0 | } |
790 | 0 | } |
791 | 0 | } |
792 | 2.40k | if (end_stream) { |
793 | 857 | onRequestComplete(); |
794 | 857 | } |
795 | | |
796 | 2.40k | return Http::FilterHeadersStatus::StopIteration; |
797 | 2.40k | } |
798 | | |
799 | | std::unique_ptr<GenericConnPool> |
800 | 2.40k | Filter::createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster) { |
801 | 2.40k | GenericConnPoolFactory* factory = nullptr; |
802 | 2.40k | if (cluster_->upstreamConfig().has_value()) { |
803 | 0 | factory = Envoy::Config::Utility::getFactory<GenericConnPoolFactory>( |
804 | 0 | cluster_->upstreamConfig().ref()); |
805 | 0 | ENVOY_BUG(factory != nullptr, |
806 | 0 | fmt::format("invalid factory type '{}', failing over to default upstream", |
807 | 0 | cluster_->upstreamConfig().ref().DebugString())); |
808 | 0 | } |
809 | 2.40k | if (!factory) { |
810 | 2.40k | factory = &config_.router_context_.genericConnPoolFactory(); |
811 | 2.40k | } |
812 | | |
813 | 2.40k | using UpstreamProtocol = Envoy::Router::GenericConnPoolFactory::UpstreamProtocol; |
814 | 2.40k | UpstreamProtocol upstream_protocol = UpstreamProtocol::HTTP; |
815 | 2.40k | if (route_entry_->connectConfig().has_value()) { |
816 | 0 | auto method = downstream_headers_->getMethodValue(); |
817 | 0 | if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.enable_connect_udp_support") && |
818 | 0 | Http::HeaderUtility::isConnectUdpRequest(*downstream_headers_)) { |
819 | 0 | upstream_protocol = UpstreamProtocol::UDP; |
820 | 0 | } else if (method == Http::Headers::get().MethodValues.Connect || |
821 | 0 | (route_entry_->connectConfig()->allow_post() && |
822 | 0 | method == Http::Headers::get().MethodValues.Post)) { |
823 | | // Allow POST for proxying raw TCP if it is configured. |
824 | 0 | upstream_protocol = UpstreamProtocol::TCP; |
825 | 0 | } |
826 | 0 | } |
827 | 2.40k | return factory->createGenericConnPool(thread_local_cluster, upstream_protocol, |
828 | 2.40k | route_entry_->priority(), |
829 | 2.40k | callbacks_->streamInfo().protocol(), this); |
830 | 2.40k | } |
831 | | |
832 | 0 | void Filter::sendNoHealthyUpstreamResponse() { |
833 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoHealthyUpstream); |
834 | 0 | chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, false); |
835 | 0 | callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "no healthy upstream", modify_headers_, |
836 | 0 | absl::nullopt, |
837 | 0 | StreamInfo::ResponseCodeDetails::get().NoHealthyUpstream); |
838 | 0 | } |
839 | | |
840 | 4.66k | Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) { |
841 | | // upstream_requests_.size() cannot be > 1 because that only happens when a per |
842 | | // try timeout occurs with hedge_on_per_try_timeout enabled but the per |
843 | | // try timeout timer is not started until onRequestComplete(). It could be zero |
844 | | // if the first request attempt has already failed and a retry is waiting for |
845 | | // a backoff timer. |
846 | 4.66k | ASSERT(upstream_requests_.size() <= 1); |
847 | | |
848 | 4.66k | bool buffering = (retry_state_ && retry_state_->enabled()) || |
849 | 4.66k | (!active_shadow_policies_.empty() && !streaming_shadows_) || |
850 | 4.66k | (route_entry_ && route_entry_->internalRedirectPolicy().enabled()); |
851 | 4.66k | if (buffering && |
852 | 4.66k | getLength(callbacks_->decodingBuffer()) + data.length() > retry_shadow_buffer_limit_) { |
853 | 0 | ENVOY_LOG(debug, |
854 | 0 | "The request payload has at least {} bytes data which exceeds buffer limit {}. Give " |
855 | 0 | "up on the retry/shadow.", |
856 | 0 | getLength(callbacks_->decodingBuffer()) + data.length(), retry_shadow_buffer_limit_); |
857 | 0 | cluster_->trafficStats()->retry_or_shadow_abandoned_.inc(); |
858 | 0 | retry_state_.reset(); |
859 | 0 | buffering = false; |
860 | 0 | active_shadow_policies_.clear(); |
861 | 0 | request_buffer_overflowed_ = true; |
862 | | |
863 | | // If we had to abandon buffering and there's no request in progress, abort the request and |
864 | | // clean up. This happens if the initial upstream request failed, and we are currently waiting |
865 | | // for a backoff timer before starting the next upstream attempt. |
866 | 0 | if (upstream_requests_.empty()) { |
867 | 0 | cleanup(); |
868 | 0 | callbacks_->sendLocalReply( |
869 | 0 | Http::Code::InsufficientStorage, "exceeded request buffer limit while retrying upstream", |
870 | 0 | modify_headers_, absl::nullopt, |
871 | 0 | StreamInfo::ResponseCodeDetails::get().RequestPayloadExceededRetryBufferLimit); |
872 | 0 | return Http::FilterDataStatus::StopIterationNoBuffer; |
873 | 0 | } |
874 | 0 | } |
875 | | |
876 | | // If we aren't buffering and there is no active request, an abort should have occurred |
877 | | // already. |
878 | 4.66k | ASSERT(buffering || !upstream_requests_.empty()); |
879 | | |
880 | 4.66k | for (auto* shadow_stream : shadow_streams_) { |
881 | 0 | if (end_stream) { |
882 | 0 | shadow_stream->removeDestructorCallback(); |
883 | 0 | shadow_stream->removeWatermarkCallbacks(); |
884 | 0 | } |
885 | 0 | Buffer::OwnedImpl copy(data); |
886 | 0 | shadow_stream->sendData(copy, end_stream); |
887 | 0 | } |
888 | 4.66k | if (end_stream) { |
889 | 1.35k | shadow_streams_.clear(); |
890 | 1.35k | } |
891 | 4.66k | if (buffering) { |
892 | 0 | if (!upstream_requests_.empty()) { |
893 | 0 | Buffer::OwnedImpl copy(data); |
894 | 0 | upstream_requests_.front()->acceptDataFromRouter(copy, end_stream); |
895 | 0 | } |
896 | | |
897 | | // If we are potentially going to retry or buffer shadow this request we need to buffer. |
898 | | // This will not cause the connection manager to 413 because before we hit the |
899 | | // buffer limit we give up on retries and buffering. We must buffer using addDecodedData() |
900 | | // so that all buffered data is available by the time we do request complete processing and |
901 | | // potentially shadow. Additionally, we can't do a copy here because there's a check down |
902 | | // this stack for whether `data` is the same buffer as already buffered data. |
903 | 0 | callbacks_->addDecodedData(data, true); |
904 | 4.66k | } else { |
905 | 4.66k | upstream_requests_.front()->acceptDataFromRouter(data, end_stream); |
906 | 4.66k | } |
907 | | |
908 | 4.66k | if (end_stream) { |
909 | 1.35k | onRequestComplete(); |
910 | 1.35k | } |
911 | | |
912 | 4.66k | return Http::FilterDataStatus::StopIterationNoBuffer; |
913 | 4.66k | } |
914 | | |
915 | 0 | Http::FilterTrailersStatus Filter::decodeTrailers(Http::RequestTrailerMap& trailers) { |
916 | 0 | ENVOY_STREAM_LOG(debug, "router decoding trailers:\n{}", *callbacks_, trailers); |
917 | |
|
918 | 0 | if (shadow_headers_) { |
919 | 0 | shadow_trailers_ = Http::createHeaderMap<Http::RequestTrailerMapImpl>(trailers); |
920 | 0 | } |
921 | | |
922 | | // upstream_requests_.size() cannot be > 1 because that only happens when a per |
923 | | // try timeout occurs with hedge_on_per_try_timeout enabled but the per |
924 | | // try timeout timer is not started until onRequestComplete(). It could be zero |
925 | | // if the first request attempt has already failed and a retry is waiting for |
926 | | // a backoff timer. |
927 | 0 | ASSERT(upstream_requests_.size() <= 1); |
928 | 0 | downstream_trailers_ = &trailers; |
929 | 0 | if (!upstream_requests_.empty()) { |
930 | 0 | upstream_requests_.front()->acceptTrailersFromRouter(trailers); |
931 | 0 | } |
932 | 0 | for (auto* shadow_stream : shadow_streams_) { |
933 | 0 | shadow_stream->removeDestructorCallback(); |
934 | 0 | shadow_stream->removeWatermarkCallbacks(); |
935 | 0 | shadow_stream->captureAndSendTrailers( |
936 | 0 | Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_)); |
937 | 0 | } |
938 | 0 | shadow_streams_.clear(); |
939 | |
|
940 | 0 | onRequestComplete(); |
941 | 0 | return Http::FilterTrailersStatus::StopIteration; |
942 | 0 | } |
943 | | |
944 | 6 | Http::FilterMetadataStatus Filter::decodeMetadata(Http::MetadataMap& metadata_map) { |
945 | 6 | Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map); |
946 | 6 | if (!upstream_requests_.empty()) { |
947 | | // TODO(soya3129): Save metadata for retry, redirect and shadowing case. |
948 | 6 | upstream_requests_.front()->acceptMetadataFromRouter(std::move(metadata_map_ptr)); |
949 | 6 | } |
950 | 6 | return Http::FilterMetadataStatus::Continue; |
951 | 6 | } |
952 | | |
953 | 2.88k | void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) { |
954 | 2.88k | callbacks_ = &callbacks; |
955 | | // As the decoder filter only pushes back via watermarks once data has reached |
956 | | // it, it can latch the current buffer limit and does not need to update the |
957 | | // limit if another filter increases it. |
958 | | // |
959 | | // The default is "do not limit". If there are configured (non-zero) buffer |
960 | | // limits, apply them here. |
961 | 2.88k | if (callbacks_->decoderBufferLimit() != 0) { |
962 | 1.74k | retry_shadow_buffer_limit_ = callbacks_->decoderBufferLimit(); |
963 | 1.74k | } |
964 | 2.88k | } |
965 | | |
966 | 5.25k | void Filter::cleanup() { |
967 | | // All callers of cleanup() should have cleaned out the upstream_requests_ |
968 | | // list as appropriate. |
969 | 5.25k | ASSERT(upstream_requests_.empty()); |
970 | | |
971 | 5.25k | retry_state_.reset(); |
972 | 5.25k | if (response_timeout_) { |
973 | 1.20k | response_timeout_->disableTimer(); |
974 | 1.20k | response_timeout_.reset(); |
975 | 1.20k | } |
976 | 5.25k | } |
977 | | |
978 | | absl::optional<absl::string_view> Filter::getShadowCluster(const ShadowPolicy& policy, |
979 | 0 | const Http::HeaderMap& headers) const { |
980 | 0 | if (!policy.cluster().empty()) { |
981 | 0 | return policy.cluster(); |
982 | 0 | } else { |
983 | 0 | ASSERT(!policy.clusterHeader().get().empty()); |
984 | 0 | const auto entry = headers.get(policy.clusterHeader()); |
985 | 0 | if (!entry.empty() && !entry[0]->value().empty()) { |
986 | 0 | return entry[0]->value().getStringView(); |
987 | 0 | } |
988 | 0 | ENVOY_STREAM_LOG(debug, "There is no cluster name in header: {}", *callbacks_, |
989 | 0 | policy.clusterHeader()); |
990 | 0 | return absl::nullopt; |
991 | 0 | } |
992 | 0 | } |
993 | | |
994 | 2.21k | void Filter::maybeDoShadowing() { |
995 | 2.21k | for (const auto& shadow_policy_wrapper : active_shadow_policies_) { |
996 | 0 | const auto& shadow_policy = shadow_policy_wrapper.get(); |
997 | |
|
998 | 0 | const absl::optional<absl::string_view> shadow_cluster_name = |
999 | 0 | getShadowCluster(shadow_policy, *downstream_headers_); |
1000 | | |
1001 | | // The cluster name got from headers is empty. |
1002 | 0 | if (!shadow_cluster_name.has_value()) { |
1003 | 0 | continue; |
1004 | 0 | } |
1005 | | |
1006 | 0 | Http::RequestMessagePtr request(new Http::RequestMessageImpl( |
1007 | 0 | Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_))); |
1008 | 0 | if (callbacks_->decodingBuffer()) { |
1009 | 0 | request->body().add(*callbacks_->decodingBuffer()); |
1010 | 0 | } |
1011 | 0 | if (shadow_trailers_) { |
1012 | 0 | request->trailers(Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_)); |
1013 | 0 | } |
1014 | |
|
1015 | 0 | auto options = Http::AsyncClient::RequestOptions() |
1016 | 0 | .setTimeout(timeout_.global_timeout_) |
1017 | 0 | .setParentSpan(callbacks_->activeSpan()) |
1018 | 0 | .setChildSpanName("mirror") |
1019 | 0 | .setSampled(shadow_policy.traceSampled()) |
1020 | 0 | .setIsShadow(true); |
1021 | 0 | options.setFilterConfig(config_); |
1022 | 0 | config_.shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request), |
1023 | 0 | options); |
1024 | 0 | } |
1025 | 2.21k | } |
1026 | | |
1027 | 2.21k | void Filter::onRequestComplete() { |
1028 | | // This should be called exactly once, when the downstream request has been received in full. |
1029 | 2.21k | ASSERT(!downstream_end_stream_); |
1030 | 2.21k | downstream_end_stream_ = true; |
1031 | 2.21k | Event::Dispatcher& dispatcher = callbacks_->dispatcher(); |
1032 | 2.21k | downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime(); |
1033 | | |
1034 | | // Possible that we got an immediate reset. |
1035 | 2.21k | if (!upstream_requests_.empty()) { |
1036 | | // Even if we got an immediate reset, we could still shadow, but that is a riskier change and |
1037 | | // seems unnecessary right now. |
1038 | 2.21k | if (!streaming_shadows_) { |
1039 | 2.21k | maybeDoShadowing(); |
1040 | 2.21k | } |
1041 | | |
1042 | 2.21k | if (timeout_.global_timeout_.count() > 0) { |
1043 | 1.20k | response_timeout_ = dispatcher.createTimer([this]() -> void { onResponseTimeout(); }); |
1044 | 1.20k | response_timeout_->enableTimer(timeout_.global_timeout_); |
1045 | 1.20k | } |
1046 | | |
1047 | 2.21k | for (auto& upstream_request : upstream_requests_) { |
1048 | 2.21k | if (upstream_request->createPerTryTimeoutOnRequestComplete()) { |
1049 | 1.54k | upstream_request->setupPerTryTimeout(); |
1050 | 1.54k | } |
1051 | 2.21k | } |
1052 | 2.21k | } |
1053 | 2.21k | } |
1054 | | |
1055 | 3.99k | void Filter::onDestroy() { |
1056 | | // Reset any in-flight upstream requests. |
1057 | 3.99k | resetAll(); |
1058 | 3.99k | cleanup(); |
1059 | 3.99k | } |
1060 | | |
1061 | 0 | void Filter::onResponseTimeout() { |
1062 | 0 | ENVOY_STREAM_LOG(debug, "upstream timeout", *callbacks_); |
1063 | | |
1064 | | // Reset any upstream requests that are still in flight. |
1065 | 0 | while (!upstream_requests_.empty()) { |
1066 | 0 | UpstreamRequestPtr upstream_request = |
1067 | 0 | upstream_requests_.back()->removeFromList(upstream_requests_); |
1068 | | |
1069 | | // We want to record the upstream timeouts and increase the stats counters in all the cases. |
1070 | | // For example, we also want to record the stats in the case of BiDi streaming APIs where we |
1071 | | // might have already seen the headers. |
1072 | 0 | cluster_->trafficStats()->upstream_rq_timeout_.inc(); |
1073 | 0 | if (request_vcluster_) { |
1074 | 0 | request_vcluster_->stats().upstream_rq_timeout_.inc(); |
1075 | 0 | } |
1076 | 0 | if (route_stats_context_.has_value()) { |
1077 | 0 | route_stats_context_->stats().upstream_rq_timeout_.inc(); |
1078 | 0 | } |
1079 | |
|
1080 | 0 | if (upstream_request->upstreamHost()) { |
1081 | 0 | upstream_request->upstreamHost()->stats().rq_timeout_.inc(); |
1082 | 0 | } |
1083 | |
|
1084 | 0 | if (upstream_request->awaitingHeaders()) { |
1085 | 0 | if (cluster_->timeoutBudgetStats().has_value()) { |
1086 | | // Cancel firing per-try timeout information, because the per-try timeout did not come into |
1087 | | // play when the global timeout was hit. |
1088 | 0 | upstream_request->recordTimeoutBudget(false); |
1089 | 0 | } |
1090 | | |
1091 | | // If this upstream request already hit a "soft" timeout, then it |
1092 | | // already recorded a timeout into outlier detection. Don't do it again. |
1093 | 0 | if (!upstream_request->outlierDetectionTimeoutRecorded()) { |
1094 | 0 | updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, *upstream_request, |
1095 | 0 | absl::optional<uint64_t>(enumToInt(timeout_response_code_))); |
1096 | 0 | } |
1097 | |
|
1098 | 0 | chargeUpstreamAbort(timeout_response_code_, false, *upstream_request); |
1099 | 0 | } |
1100 | 0 | upstream_request->resetStream(); |
1101 | 0 | } |
1102 | |
|
1103 | 0 | onUpstreamTimeoutAbort(StreamInfo::ResponseFlag::UpstreamRequestTimeout, |
1104 | 0 | StreamInfo::ResponseCodeDetails::get().ResponseTimeout); |
1105 | 0 | } |
1106 | | |
1107 | | // Called when the per try timeout is hit but we didn't reset the request |
1108 | | // (hedge_on_per_try_timeout enabled). |
1109 | 0 | void Filter::onSoftPerTryTimeout(UpstreamRequest& upstream_request) { |
1110 | | // Track this as a timeout for outlier detection purposes even though we didn't |
1111 | | // cancel the request yet and might get a 2xx later. |
1112 | 0 | updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request, |
1113 | 0 | absl::optional<uint64_t>(enumToInt(timeout_response_code_))); |
1114 | 0 | upstream_request.outlierDetectionTimeoutRecorded(true); |
1115 | |
|
1116 | 0 | if (!downstream_response_started_ && retry_state_) { |
1117 | 0 | RetryStatus retry_status = retry_state_->shouldHedgeRetryPerTryTimeout( |
1118 | 0 | [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_]() -> void { |
1119 | | // Without any knowledge about what's going on in the connection pool, retry the request |
1120 | | // with the safest settings which is no early data but keep using or not using alt-svc as |
1121 | | // before. In this way, QUIC won't be falsely marked as broken. |
1122 | 0 | doRetry(/*can_send_early_data*/ false, can_use_http3, TimeoutRetry::Yes); |
1123 | 0 | }); |
1124 | |
|
1125 | 0 | if (retry_status == RetryStatus::Yes) { |
1126 | 0 | runRetryOptionsPredicates(upstream_request); |
1127 | 0 | pending_retries_++; |
1128 | | |
1129 | | // Don't increment upstream_host->stats().rq_error_ here, we'll do that |
1130 | | // later if 1) we hit global timeout or 2) we get bad response headers |
1131 | | // back. |
1132 | 0 | upstream_request.retried(true); |
1133 | | |
1134 | | // TODO: cluster stat for hedge attempted. |
1135 | 0 | } else if (retry_status == RetryStatus::NoOverflow) { |
1136 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow); |
1137 | 0 | } else if (retry_status == RetryStatus::NoRetryLimitExceeded) { |
1138 | 0 | callbacks_->streamInfo().setResponseFlag( |
1139 | 0 | StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded); |
1140 | 0 | } |
1141 | 0 | } |
1142 | 0 | } |
1143 | | |
1144 | 0 | void Filter::onPerTryIdleTimeout(UpstreamRequest& upstream_request) { |
1145 | 0 | onPerTryTimeoutCommon(upstream_request, |
1146 | 0 | cluster_->trafficStats()->upstream_rq_per_try_idle_timeout_, |
1147 | 0 | StreamInfo::ResponseCodeDetails::get().UpstreamPerTryIdleTimeout); |
1148 | 0 | } |
1149 | | |
1150 | 0 | void Filter::onPerTryTimeout(UpstreamRequest& upstream_request) { |
1151 | 0 | onPerTryTimeoutCommon(upstream_request, cluster_->trafficStats()->upstream_rq_per_try_timeout_, |
1152 | 0 | StreamInfo::ResponseCodeDetails::get().UpstreamPerTryTimeout); |
1153 | 0 | } |
1154 | | |
1155 | | void Filter::onPerTryTimeoutCommon(UpstreamRequest& upstream_request, Stats::Counter& error_counter, |
1156 | 0 | const std::string& response_code_details) { |
1157 | 0 | if (hedging_params_.hedge_on_per_try_timeout_) { |
1158 | 0 | onSoftPerTryTimeout(upstream_request); |
1159 | 0 | return; |
1160 | 0 | } |
1161 | | |
1162 | 0 | error_counter.inc(); |
1163 | 0 | if (upstream_request.upstreamHost()) { |
1164 | 0 | upstream_request.upstreamHost()->stats().rq_timeout_.inc(); |
1165 | 0 | } |
1166 | |
|
1167 | 0 | upstream_request.resetStream(); |
1168 | |
|
1169 | 0 | updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request, |
1170 | 0 | absl::optional<uint64_t>(enumToInt(timeout_response_code_))); |
1171 | |
|
1172 | 0 | if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::Yes)) { |
1173 | 0 | return; |
1174 | 0 | } |
1175 | | |
1176 | 0 | chargeUpstreamAbort(timeout_response_code_, false, upstream_request); |
1177 | | |
1178 | | // Remove this upstream request from the list now that we're done with it. |
1179 | 0 | upstream_request.removeFromList(upstream_requests_); |
1180 | 0 | onUpstreamTimeoutAbort(StreamInfo::ResponseFlag::UpstreamRequestTimeout, response_code_details); |
1181 | 0 | } |
1182 | | |
1183 | 0 | void Filter::onStreamMaxDurationReached(UpstreamRequest& upstream_request) { |
1184 | 0 | upstream_request.resetStream(); |
1185 | |
|
1186 | 0 | if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::No)) { |
1187 | 0 | return; |
1188 | 0 | } |
1189 | | |
1190 | 0 | upstream_request.removeFromList(upstream_requests_); |
1191 | 0 | cleanup(); |
1192 | |
|
1193 | 0 | callbacks_->streamInfo().setResponseFlag( |
1194 | 0 | StreamInfo::ResponseFlag::UpstreamMaxStreamDurationReached); |
1195 | | // Grab the const ref to call the const method of StreamInfo. |
1196 | 0 | const auto& stream_info = callbacks_->streamInfo(); |
1197 | 0 | const bool downstream_decode_complete = |
1198 | 0 | stream_info.downstreamTiming().has_value() && |
1199 | 0 | stream_info.downstreamTiming().value().get().lastDownstreamRxByteReceived().has_value(); |
1200 | | |
1201 | | // sendLocalReply may instead reset the stream if downstream_response_started_ is true. |
1202 | 0 | callbacks_->sendLocalReply( |
1203 | 0 | Http::Utility::maybeRequestTimeoutCode(downstream_decode_complete), |
1204 | 0 | "upstream max stream duration reached", modify_headers_, absl::nullopt, |
1205 | 0 | StreamInfo::ResponseCodeDetails::get().UpstreamMaxStreamDurationReached); |
1206 | 0 | } |
1207 | | |
1208 | | void Filter::updateOutlierDetection(Upstream::Outlier::Result result, |
1209 | | UpstreamRequest& upstream_request, |
1210 | 74 | absl::optional<uint64_t> code) { |
1211 | 74 | if (upstream_request.upstreamHost()) { |
1212 | 74 | upstream_request.upstreamHost()->outlierDetector().putResult(result, code); |
1213 | 74 | } |
1214 | 74 | } |
1215 | | |
1216 | 74 | void Filter::chargeUpstreamAbort(Http::Code code, bool dropped, UpstreamRequest& upstream_request) { |
1217 | 74 | if (downstream_response_started_) { |
1218 | 16 | if (upstream_request.grpcRqSuccessDeferred()) { |
1219 | 14 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1220 | 14 | stats_.rq_reset_after_downstream_response_started_.inc(); |
1221 | 14 | } |
1222 | 58 | } else { |
1223 | 58 | Upstream::HostDescriptionConstSharedPtr upstream_host = upstream_request.upstreamHost(); |
1224 | | |
1225 | 58 | chargeUpstreamCode(code, upstream_host, dropped); |
1226 | | // If we had non-5xx but still have been reset by backend or timeout before |
1227 | | // starting response, we treat this as an error. We only get non-5xx when |
1228 | | // timeout_response_code_ is used for code above, where this member can |
1229 | | // assume values such as 204 (NoContent). |
1230 | 58 | if (upstream_host != nullptr && !Http::CodeUtility::is5xx(enumToInt(code))) { |
1231 | 0 | upstream_host->stats().rq_error_.inc(); |
1232 | 0 | } |
1233 | 58 | } |
1234 | 74 | } |
1235 | | |
1236 | | void Filter::onUpstreamTimeoutAbort(StreamInfo::ResponseFlag response_flags, |
1237 | 0 | absl::string_view details) { |
1238 | 0 | Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = cluster()->timeoutBudgetStats(); |
1239 | 0 | if (tb_stats.has_value()) { |
1240 | 0 | Event::Dispatcher& dispatcher = callbacks_->dispatcher(); |
1241 | 0 | std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>( |
1242 | 0 | dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_); |
1243 | |
|
1244 | 0 | tb_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue( |
1245 | 0 | FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_)); |
1246 | 0 | } |
1247 | |
|
1248 | 0 | const absl::string_view body = |
1249 | 0 | timeout_response_code_ == Http::Code::GatewayTimeout ? "upstream request timeout" : ""; |
1250 | 0 | onUpstreamAbort(timeout_response_code_, response_flags, body, false, details); |
1251 | 0 | } |
1252 | | |
1253 | | void Filter::onUpstreamAbort(Http::Code code, StreamInfo::ResponseFlag response_flags, |
1254 | 74 | absl::string_view body, bool dropped, absl::string_view details) { |
1255 | | // If we have not yet sent anything downstream, send a response with an appropriate status code. |
1256 | | // Otherwise just reset the ongoing response. |
1257 | 74 | callbacks_->streamInfo().setResponseFlag(response_flags); |
1258 | | // This will destroy any created retry timers. |
1259 | 74 | cleanup(); |
1260 | | // sendLocalReply may instead reset the stream if downstream_response_started_ is true. |
1261 | 74 | callbacks_->sendLocalReply( |
1262 | 74 | code, body, |
1263 | 74 | [dropped, this](Http::ResponseHeaderMap& headers) { |
1264 | 58 | if (dropped && !config_.suppress_envoy_headers_) { |
1265 | 0 | headers.addReference(Http::Headers::get().EnvoyOverloaded, |
1266 | 0 | Http::Headers::get().EnvoyOverloadedValues.True); |
1267 | 0 | } |
1268 | 58 | modify_headers_(headers); |
1269 | 58 | }, |
1270 | 74 | absl::nullopt, details); |
1271 | 74 | } |
1272 | | |
1273 | | bool Filter::maybeRetryReset(Http::StreamResetReason reset_reason, |
1274 | 74 | UpstreamRequest& upstream_request, TimeoutRetry is_timeout_retry) { |
1275 | | // We don't retry if we already started the response, don't have a retry policy defined, |
1276 | | // or if we've already retried this upstream request (currently only possible if a per |
1277 | | // try timeout occurred and hedge_on_per_try_timeout is enabled). |
1278 | 74 | if (downstream_response_started_ || !retry_state_ || upstream_request.retried()) { |
1279 | 74 | return false; |
1280 | 74 | } |
1281 | 0 | RetryState::Http3Used was_using_http3 = RetryState::Http3Used::Unknown; |
1282 | 0 | if (upstream_request.hadUpstream()) { |
1283 | 0 | was_using_http3 = (upstream_request.streamInfo().protocol().has_value() && |
1284 | 0 | upstream_request.streamInfo().protocol().value() == Http::Protocol::Http3) |
1285 | 0 | ? RetryState::Http3Used::Yes |
1286 | 0 | : RetryState::Http3Used::No; |
1287 | 0 | } |
1288 | 0 | const RetryStatus retry_status = retry_state_->shouldRetryReset( |
1289 | 0 | reset_reason, was_using_http3, |
1290 | 0 | [this, can_send_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_, |
1291 | 0 | can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_, |
1292 | 0 | is_timeout_retry](bool disable_http3) -> void { |
1293 | | // This retry might be because of ConnectionFailure of 0-RTT handshake. In this case, though |
1294 | | // the original request is retried with the same can_send_early_data setting, it will not be |
1295 | | // sent as early data by the underlying connection pool grid. |
1296 | 0 | doRetry(can_send_early_data, disable_http3 ? false : can_use_http3, is_timeout_retry); |
1297 | 0 | }); |
1298 | 0 | if (retry_status == RetryStatus::Yes) { |
1299 | 0 | runRetryOptionsPredicates(upstream_request); |
1300 | 0 | pending_retries_++; |
1301 | |
|
1302 | 0 | if (upstream_request.upstreamHost()) { |
1303 | 0 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1304 | 0 | } |
1305 | |
|
1306 | 0 | auto request_ptr = upstream_request.removeFromList(upstream_requests_); |
1307 | 0 | callbacks_->dispatcher().deferredDelete(std::move(request_ptr)); |
1308 | 0 | return true; |
1309 | 0 | } else if (retry_status == RetryStatus::NoOverflow) { |
1310 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow); |
1311 | 0 | } else if (retry_status == RetryStatus::NoRetryLimitExceeded) { |
1312 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded); |
1313 | 0 | } |
1314 | | |
1315 | 0 | return false; |
1316 | 0 | } |
1317 | | |
1318 | | void Filter::onUpstreamReset(Http::StreamResetReason reset_reason, |
1319 | | absl::string_view transport_failure_reason, |
1320 | 74 | UpstreamRequest& upstream_request) { |
1321 | 74 | ENVOY_STREAM_LOG(debug, "upstream reset: reset reason: {}, transport failure reason: {}", |
1322 | 74 | *callbacks_, Http::Utility::resetReasonToString(reset_reason), |
1323 | 74 | transport_failure_reason); |
1324 | | |
1325 | 74 | const bool dropped = reset_reason == Http::StreamResetReason::Overflow; |
1326 | | |
1327 | | // Ignore upstream reset caused by a resource overflow. |
1328 | | // Currently, circuit breakers can only produce this reset reason. |
1329 | | // It means that this reason is cluster-wise, not upstream-related. |
1330 | | // Therefore removing an upstream in the case of an overloaded cluster |
1331 | | // would make the situation even worse. |
1332 | | // https://github.com/envoyproxy/envoy/issues/25487 |
1333 | 74 | if (!dropped) { |
1334 | | // TODO: The reset may also come from upstream over the wire. In this case it should be |
1335 | | // treated as external origin error and distinguished from local origin error. |
1336 | | // This matters only when running OutlierDetection with split_external_local_origin_errors |
1337 | | // config param set to true. |
1338 | 74 | updateOutlierDetection(Upstream::Outlier::Result::LocalOriginConnectFailed, upstream_request, |
1339 | 74 | absl::nullopt); |
1340 | 74 | } |
1341 | | |
1342 | 74 | if (maybeRetryReset(reset_reason, upstream_request, TimeoutRetry::No)) { |
1343 | 0 | return; |
1344 | 0 | } |
1345 | | |
1346 | 74 | const Http::Code error_code = (reset_reason == Http::StreamResetReason::ProtocolError) |
1347 | 74 | ? Http::Code::BadGateway |
1348 | 74 | : Http::Code::ServiceUnavailable; |
1349 | 74 | chargeUpstreamAbort(error_code, dropped, upstream_request); |
1350 | 74 | auto request_ptr = upstream_request.removeFromList(upstream_requests_); |
1351 | 74 | callbacks_->dispatcher().deferredDelete(std::move(request_ptr)); |
1352 | | |
1353 | | // If there are other in-flight requests that might see an upstream response, |
1354 | | // don't return anything downstream. |
1355 | 74 | if (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0) { |
1356 | 0 | return; |
1357 | 0 | } |
1358 | | |
1359 | 74 | const StreamInfo::ResponseFlag response_flags = streamResetReasonToResponseFlag(reset_reason); |
1360 | | |
1361 | 74 | const std::string body = |
1362 | 74 | absl::StrCat("upstream connect error or disconnect/reset before headers. ", |
1363 | 74 | (is_retry_ ? "retried and the latest " : ""), |
1364 | 74 | "reset reason: ", Http::Utility::resetReasonToString(reset_reason), |
1365 | 74 | !transport_failure_reason.empty() ? ", transport failure reason: " : "", |
1366 | 74 | transport_failure_reason); |
1367 | 74 | const std::string& basic_details = |
1368 | 74 | downstream_response_started_ ? StreamInfo::ResponseCodeDetails::get().LateUpstreamReset |
1369 | 74 | : StreamInfo::ResponseCodeDetails::get().EarlyUpstreamReset; |
1370 | 74 | const std::string details = StringUtil::replaceAllEmptySpace(absl::StrCat( |
1371 | 74 | basic_details, "{", Http::Utility::resetReasonToString(reset_reason), |
1372 | 74 | transport_failure_reason.empty() ? "" : absl::StrCat(",", transport_failure_reason), "}")); |
1373 | 74 | onUpstreamAbort(error_code, response_flags, body, dropped, details); |
1374 | 74 | } |
1375 | | |
1376 | | void Filter::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host, |
1377 | 2.32k | bool pool_success) { |
1378 | 2.32k | if (retry_state_ && host) { |
1379 | 6 | retry_state_->onHostAttempted(host); |
1380 | 6 | } |
1381 | | |
1382 | 2.32k | if (!pool_success) { |
1383 | 0 | return; |
1384 | 0 | } |
1385 | | |
1386 | 2.32k | if (request_vcluster_) { |
1387 | | // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady |
1388 | | // callback. Hence, the upstream request increases the virtual cluster's upstream_rq_total_ stat |
1389 | | // here. |
1390 | 6 | request_vcluster_->stats().upstream_rq_total_.inc(); |
1391 | 6 | } |
1392 | 2.32k | if (route_stats_context_.has_value()) { |
1393 | | // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady |
1394 | | // callback. Hence, the upstream request increases the route level upstream_rq_total_ stat |
1395 | | // here. |
1396 | 0 | route_stats_context_->stats().upstream_rq_total_.inc(); |
1397 | 0 | } |
1398 | 2.32k | } |
1399 | | |
1400 | | StreamInfo::ResponseFlag |
1401 | 148 | Filter::streamResetReasonToResponseFlag(Http::StreamResetReason reset_reason) { |
1402 | 148 | switch (reset_reason) { |
1403 | 0 | case Http::StreamResetReason::LocalConnectionFailure: |
1404 | 0 | case Http::StreamResetReason::RemoteConnectionFailure: |
1405 | 0 | case Http::StreamResetReason::ConnectionTimeout: |
1406 | 0 | return StreamInfo::ResponseFlag::UpstreamConnectionFailure; |
1407 | 40 | case Http::StreamResetReason::ConnectionTermination: |
1408 | 40 | return StreamInfo::ResponseFlag::UpstreamConnectionTermination; |
1409 | 0 | case Http::StreamResetReason::LocalReset: |
1410 | 0 | case Http::StreamResetReason::LocalRefusedStreamReset: |
1411 | 0 | return StreamInfo::ResponseFlag::LocalReset; |
1412 | 0 | case Http::StreamResetReason::Overflow: |
1413 | 0 | return StreamInfo::ResponseFlag::UpstreamOverflow; |
1414 | 0 | case Http::StreamResetReason::RemoteReset: |
1415 | 2 | case Http::StreamResetReason::RemoteRefusedStreamReset: |
1416 | 2 | case Http::StreamResetReason::ConnectError: |
1417 | 2 | return StreamInfo::ResponseFlag::UpstreamRemoteReset; |
1418 | 106 | case Http::StreamResetReason::ProtocolError: |
1419 | 106 | return StreamInfo::ResponseFlag::UpstreamProtocolError; |
1420 | 0 | case Http::StreamResetReason::OverloadManager: |
1421 | 0 | return StreamInfo::ResponseFlag::OverloadManager; |
1422 | 148 | } |
1423 | | |
1424 | 0 | PANIC_DUE_TO_CORRUPT_ENUM; |
1425 | 0 | } |
1426 | | |
1427 | | void Filter::handleNon5xxResponseHeaders(absl::optional<Grpc::Status::GrpcStatus> grpc_status, |
1428 | | UpstreamRequest& upstream_request, bool end_stream, |
1429 | 2.22k | uint64_t grpc_to_http_status) { |
1430 | | // We need to defer gRPC success until after we have processed grpc-status in |
1431 | | // the trailers. |
1432 | 2.22k | if (grpc_request_) { |
1433 | 1.12k | if (end_stream) { |
1434 | 93 | if (grpc_status && !Http::CodeUtility::is5xx(grpc_to_http_status)) { |
1435 | 93 | upstream_request.upstreamHost()->stats().rq_success_.inc(); |
1436 | 93 | } else { |
1437 | 0 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1438 | 0 | } |
1439 | 1.02k | } else { |
1440 | 1.02k | upstream_request.grpcRqSuccessDeferred(true); |
1441 | 1.02k | } |
1442 | 1.12k | } else { |
1443 | 1.10k | upstream_request.upstreamHost()->stats().rq_success_.inc(); |
1444 | 1.10k | } |
1445 | 2.22k | } |
1446 | | |
1447 | | void Filter::onUpstream1xxHeaders(Http::ResponseHeaderMapPtr&& headers, |
1448 | 1 | UpstreamRequest& upstream_request) { |
1449 | 1 | const uint64_t response_code = Http::Utility::getResponseStatus(*headers); |
1450 | 1 | chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false); |
1451 | 1 | ENVOY_STREAM_LOG(debug, "upstream 1xx ({}).", *callbacks_, response_code); |
1452 | | |
1453 | 1 | downstream_response_started_ = true; |
1454 | 1 | final_upstream_request_ = &upstream_request; |
1455 | 1 | resetOtherUpstreams(upstream_request); |
1456 | | |
1457 | | // Don't send retries after 100-Continue has been sent on. Arguably we could attempt to do a |
1458 | | // retry, assume the next upstream would also send an 100-Continue and swallow the second one |
1459 | | // but it's sketchy (as the subsequent upstream might not send a 100-Continue) and not worth |
1460 | | // the complexity until someone asks for it. |
1461 | 1 | retry_state_.reset(); |
1462 | | |
1463 | 1 | callbacks_->encode1xxHeaders(std::move(headers)); |
1464 | 1 | } |
1465 | | |
1466 | 3.99k | void Filter::resetAll() { |
1467 | 5.14k | while (!upstream_requests_.empty()) { |
1468 | 1.14k | auto request_ptr = upstream_requests_.back()->removeFromList(upstream_requests_); |
1469 | 1.14k | request_ptr->resetStream(); |
1470 | 1.14k | callbacks_->dispatcher().deferredDelete(std::move(request_ptr)); |
1471 | 1.14k | } |
1472 | 3.99k | } |
1473 | | |
1474 | 2.22k | void Filter::resetOtherUpstreams(UpstreamRequest& upstream_request) { |
1475 | | // Pop each upstream request on the list and reset it if it's not the one |
1476 | | // provided. At the end we'll move it back into the list. |
1477 | 2.22k | UpstreamRequestPtr final_upstream_request; |
1478 | 4.44k | while (!upstream_requests_.empty()) { |
1479 | 2.22k | UpstreamRequestPtr upstream_request_tmp = |
1480 | 2.22k | upstream_requests_.back()->removeFromList(upstream_requests_); |
1481 | 2.22k | if (upstream_request_tmp.get() != &upstream_request) { |
1482 | 0 | upstream_request_tmp->resetStream(); |
1483 | | // TODO: per-host stat for hedge abandoned. |
1484 | | // TODO: cluster stat for hedge abandoned. |
1485 | 2.22k | } else { |
1486 | 2.22k | final_upstream_request = std::move(upstream_request_tmp); |
1487 | 2.22k | } |
1488 | 2.22k | } |
1489 | | |
1490 | 2.22k | ASSERT(final_upstream_request); |
1491 | | // Now put the final request back on this list. |
1492 | 2.22k | LinkedList::moveIntoList(std::move(final_upstream_request), upstream_requests_); |
1493 | 2.22k | } |
1494 | | |
1495 | | void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers, |
1496 | 2.22k | UpstreamRequest& upstream_request, bool end_stream) { |
1497 | 2.22k | ENVOY_STREAM_LOG(debug, "upstream headers complete: end_stream={}", *callbacks_, end_stream); |
1498 | | |
1499 | 2.22k | modify_headers_(*headers); |
1500 | | // When grpc-status appears in response headers, convert grpc-status to HTTP status code |
1501 | | // for outlier detection. This does not currently change any stats or logging and does not |
1502 | | // handle the case when an error grpc-status is sent as a trailer. |
1503 | 2.22k | absl::optional<Grpc::Status::GrpcStatus> grpc_status; |
1504 | 2.22k | uint64_t grpc_to_http_status = 0; |
1505 | 2.22k | if (grpc_request_) { |
1506 | 1.12k | grpc_status = Grpc::Common::getGrpcStatus(*headers); |
1507 | 1.12k | if (grpc_status.has_value()) { |
1508 | 93 | grpc_to_http_status = Grpc::Utility::grpcToHttpStatus(grpc_status.value()); |
1509 | 93 | } |
1510 | 1.12k | } |
1511 | | |
1512 | 2.22k | if (grpc_status.has_value()) { |
1513 | 93 | upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(grpc_to_http_status); |
1514 | 2.12k | } else { |
1515 | 2.12k | upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(response_code); |
1516 | 2.12k | } |
1517 | | |
1518 | 2.22k | if (headers->EnvoyImmediateHealthCheckFail() != nullptr) { |
1519 | 0 | upstream_request.upstreamHost()->healthChecker().setUnhealthy( |
1520 | 0 | Upstream::HealthCheckHostMonitor::UnhealthyType::ImmediateHealthCheckFail); |
1521 | 0 | } |
1522 | | |
1523 | 2.22k | bool could_not_retry = false; |
1524 | | |
1525 | | // Check if this upstream request was already retried, for instance after |
1526 | | // hitting a per try timeout. Don't retry it if we already have. |
1527 | 2.22k | if (retry_state_) { |
1528 | 0 | if (upstream_request.retried()) { |
1529 | | // We already retried this request (presumably for a per try timeout) so |
1530 | | // we definitely won't retry it again. Check if we would have retried it |
1531 | | // if we could. |
1532 | 0 | bool retry_as_early_data; // Not going to be used as we are not retrying. |
1533 | 0 | could_not_retry = retry_state_->wouldRetryFromHeaders(*headers, *downstream_headers_, |
1534 | 0 | retry_as_early_data) != |
1535 | 0 | RetryState::RetryDecision::NoRetry; |
1536 | 0 | } else { |
1537 | 0 | const RetryStatus retry_status = retry_state_->shouldRetryHeaders( |
1538 | 0 | *headers, *downstream_headers_, |
1539 | 0 | [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_, |
1540 | 0 | had_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_]( |
1541 | 0 | bool disable_early_data) -> void { |
1542 | 0 | doRetry((disable_early_data ? false : had_early_data), can_use_http3, TimeoutRetry::No); |
1543 | 0 | }); |
1544 | 0 | if (retry_status == RetryStatus::Yes) { |
1545 | 0 | runRetryOptionsPredicates(upstream_request); |
1546 | 0 | pending_retries_++; |
1547 | 0 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1548 | 0 | Http::CodeStats& code_stats = httpContext().codeStats(); |
1549 | 0 | code_stats.chargeBasicResponseStat(cluster_->statsScope(), stats_.stat_names_.retry_, |
1550 | 0 | static_cast<Http::Code>(response_code), |
1551 | 0 | exclude_http_code_stats_); |
1552 | |
|
1553 | 0 | if (!end_stream || !upstream_request.encodeComplete()) { |
1554 | 0 | upstream_request.resetStream(); |
1555 | 0 | } |
1556 | 0 | auto request_ptr = upstream_request.removeFromList(upstream_requests_); |
1557 | 0 | callbacks_->dispatcher().deferredDelete(std::move(request_ptr)); |
1558 | 0 | return; |
1559 | 0 | } else if (retry_status == RetryStatus::NoOverflow) { |
1560 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow); |
1561 | 0 | could_not_retry = true; |
1562 | 0 | } else if (retry_status == RetryStatus::NoRetryLimitExceeded) { |
1563 | 0 | callbacks_->streamInfo().setResponseFlag( |
1564 | 0 | StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded); |
1565 | 0 | could_not_retry = true; |
1566 | 0 | } |
1567 | 0 | } |
1568 | 0 | } |
1569 | | |
1570 | 2.22k | if (route_entry_->internalRedirectPolicy().enabled() && |
1571 | 2.22k | route_entry_->internalRedirectPolicy().shouldRedirectForResponseCode( |
1572 | 0 | static_cast<Http::Code>(response_code)) && |
1573 | 2.22k | setupRedirect(*headers)) { |
1574 | 0 | return; |
1575 | | // If the redirect could not be handled, fail open and let it pass to the |
1576 | | // next downstream. |
1577 | 0 | } |
1578 | | |
1579 | | // Check if we got a "bad" response, but there are still upstream requests in |
1580 | | // flight awaiting headers or scheduled retries. If so, exit to give them a |
1581 | | // chance to return before returning a response downstream. |
1582 | 2.22k | if (could_not_retry && (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0)) { |
1583 | 0 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1584 | | |
1585 | | // Reset the stream because there are other in-flight requests that we'll |
1586 | | // wait around for and we're not interested in consuming any body/trailers. |
1587 | 0 | auto request_ptr = upstream_request.removeFromList(upstream_requests_); |
1588 | 0 | request_ptr->resetStream(); |
1589 | 0 | callbacks_->dispatcher().deferredDelete(std::move(request_ptr)); |
1590 | 0 | return; |
1591 | 0 | } |
1592 | | |
1593 | | // Make sure any retry timers are destroyed since we may not call cleanup() if end_stream is |
1594 | | // false. |
1595 | 2.22k | if (retry_state_) { |
1596 | 0 | retry_state_.reset(); |
1597 | 0 | } |
1598 | | |
1599 | | // Only send upstream service time if we received the complete request and this is not a |
1600 | | // premature response. |
1601 | 2.22k | if (DateUtil::timePointValid(downstream_request_complete_time_)) { |
1602 | 1.09k | Event::Dispatcher& dispatcher = callbacks_->dispatcher(); |
1603 | 1.09k | MonotonicTime response_received_time = dispatcher.timeSource().monotonicTime(); |
1604 | 1.09k | std::chrono::milliseconds ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
1605 | 1.09k | response_received_time - downstream_request_complete_time_); |
1606 | 1.09k | if (!config_.suppress_envoy_headers_) { |
1607 | 1.09k | headers->setEnvoyUpstreamServiceTime(ms.count()); |
1608 | 1.09k | } |
1609 | 1.09k | } |
1610 | | |
1611 | 2.22k | upstream_request.upstreamCanary( |
1612 | 2.22k | (headers->EnvoyUpstreamCanary() && headers->EnvoyUpstreamCanary()->value() == "true") || |
1613 | 2.22k | upstream_request.upstreamHost()->canary()); |
1614 | 2.22k | chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false); |
1615 | 2.22k | if (!Http::CodeUtility::is5xx(response_code)) { |
1616 | 2.22k | handleNon5xxResponseHeaders(grpc_status, upstream_request, end_stream, grpc_to_http_status); |
1617 | 2.22k | } |
1618 | | |
1619 | | // Append routing cookies |
1620 | 2.22k | for (const auto& header_value : downstream_set_cookies_) { |
1621 | 0 | headers->addReferenceKey(Http::Headers::get().SetCookie, header_value); |
1622 | 0 | } |
1623 | | |
1624 | 2.22k | callbacks_->streamInfo().setResponseCodeDetails( |
1625 | 2.22k | StreamInfo::ResponseCodeDetails::get().ViaUpstream); |
1626 | | |
1627 | 2.22k | if (Runtime::runtimeFeatureEnabled( |
1628 | 2.22k | "envoy.reloadable_features.copy_response_code_to_downstream_stream_info")) { |
1629 | 2.22k | callbacks_->streamInfo().setResponseCode(response_code); |
1630 | 2.22k | } |
1631 | | |
1632 | | // TODO(zuercher): If access to response_headers_to_add (at any level) is ever needed outside |
1633 | | // Router::Filter we'll need to find a better location for this work. One possibility is to |
1634 | | // provide finalizeResponseHeaders functions on the Router::Config and VirtualHost interfaces. |
1635 | 2.22k | route_entry_->finalizeResponseHeaders(*headers, callbacks_->streamInfo()); |
1636 | | |
1637 | 2.22k | downstream_response_started_ = true; |
1638 | 2.22k | final_upstream_request_ = &upstream_request; |
1639 | | // Make sure that for request hedging, we end up with the correct final upstream info. |
1640 | 2.22k | callbacks_->streamInfo().setUpstreamInfo(final_upstream_request_->streamInfo().upstreamInfo()); |
1641 | 2.22k | resetOtherUpstreams(upstream_request); |
1642 | 2.22k | if (end_stream) { |
1643 | 110 | onUpstreamComplete(upstream_request); |
1644 | 110 | } |
1645 | | |
1646 | 2.22k | callbacks_->encodeHeaders(std::move(headers), end_stream, |
1647 | 2.22k | StreamInfo::ResponseCodeDetails::get().ViaUpstream); |
1648 | 2.22k | } |
1649 | | |
1650 | | void Filter::onUpstreamData(Buffer::Instance& data, UpstreamRequest& upstream_request, |
1651 | 4.28k | bool end_stream) { |
1652 | | // This should be true because when we saw headers we either reset the stream |
1653 | | // (hence wouldn't have made it to onUpstreamData) or all other in-flight |
1654 | | // streams. |
1655 | 4.28k | ASSERT(upstream_requests_.size() == 1); |
1656 | 4.28k | if (end_stream) { |
1657 | | // gRPC request termination without trailers is an error. |
1658 | 1.06k | if (upstream_request.grpcRqSuccessDeferred()) { |
1659 | 0 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1660 | 0 | } |
1661 | 1.06k | onUpstreamComplete(upstream_request); |
1662 | 1.06k | } |
1663 | | |
1664 | 4.28k | callbacks_->encodeData(data, end_stream); |
1665 | 4.28k | } |
1666 | | |
1667 | | void Filter::onUpstreamTrailers(Http::ResponseTrailerMapPtr&& trailers, |
1668 | 3 | UpstreamRequest& upstream_request) { |
1669 | | // This should be true because when we saw headers we either reset the stream |
1670 | | // (hence wouldn't have made it to onUpstreamTrailers) or all other in-flight |
1671 | | // streams. |
1672 | 3 | ASSERT(upstream_requests_.size() == 1); |
1673 | | |
1674 | 3 | if (upstream_request.grpcRqSuccessDeferred()) { |
1675 | 3 | absl::optional<Grpc::Status::GrpcStatus> grpc_status = Grpc::Common::getGrpcStatus(*trailers); |
1676 | 3 | if (grpc_status && |
1677 | 3 | !Http::CodeUtility::is5xx(Grpc::Utility::grpcToHttpStatus(grpc_status.value()))) { |
1678 | 3 | upstream_request.upstreamHost()->stats().rq_success_.inc(); |
1679 | 3 | } else { |
1680 | 0 | upstream_request.upstreamHost()->stats().rq_error_.inc(); |
1681 | 0 | } |
1682 | 3 | } |
1683 | | |
1684 | 3 | onUpstreamComplete(upstream_request); |
1685 | | |
1686 | 3 | callbacks_->encodeTrailers(std::move(trailers)); |
1687 | 3 | } |
1688 | | |
1689 | 181 | void Filter::onUpstreamMetadata(Http::MetadataMapPtr&& metadata_map) { |
1690 | 181 | callbacks_->encodeMetadata(std::move(metadata_map)); |
1691 | 181 | } |
1692 | | |
1693 | 1.18k | void Filter::onUpstreamComplete(UpstreamRequest& upstream_request) { |
1694 | 1.18k | if (!downstream_end_stream_) { |
1695 | 99 | upstream_request.resetStream(); |
1696 | 99 | } |
1697 | 1.18k | Event::Dispatcher& dispatcher = callbacks_->dispatcher(); |
1698 | 1.18k | std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>( |
1699 | 1.18k | dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_); |
1700 | | |
1701 | 1.18k | Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = cluster()->timeoutBudgetStats(); |
1702 | 1.18k | if (tb_stats.has_value()) { |
1703 | 0 | tb_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue( |
1704 | 0 | FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_)); |
1705 | 0 | } |
1706 | | |
1707 | 1.18k | if (config_.emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck() && |
1708 | 1.18k | DateUtil::timePointValid(downstream_request_complete_time_)) { |
1709 | 1.08k | upstream_request.upstreamHost()->outlierDetector().putResponseTime(response_time); |
1710 | 1.08k | const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_); |
1711 | | |
1712 | 1.08k | Http::CodeStats& code_stats = httpContext().codeStats(); |
1713 | 1.08k | Http::CodeStats::ResponseTimingInfo info{ |
1714 | 1.08k | config_.scope_, |
1715 | 1.08k | cluster_->statsScope(), |
1716 | 1.08k | config_.empty_stat_name_, |
1717 | 1.08k | response_time, |
1718 | 1.08k | upstream_request.upstreamCanary(), |
1719 | 1.08k | internal_request, |
1720 | 1.08k | route_entry_->virtualHost().statName(), |
1721 | 1.08k | request_vcluster_ ? request_vcluster_->statName() : config_.empty_stat_name_, |
1722 | 1.08k | route_stats_context_.has_value() ? route_stats_context_->statName() |
1723 | 1.08k | : config_.empty_stat_name_, |
1724 | 1.08k | config_.zone_name_, |
1725 | 1.08k | upstreamZone(upstream_request.upstreamHost())}; |
1726 | | |
1727 | 1.08k | code_stats.chargeResponseTiming(info); |
1728 | | |
1729 | 1.08k | if (alt_stat_prefix_ != nullptr) { |
1730 | 0 | Http::CodeStats::ResponseTimingInfo info{config_.scope_, |
1731 | 0 | cluster_->statsScope(), |
1732 | 0 | alt_stat_prefix_->statName(), |
1733 | 0 | response_time, |
1734 | 0 | upstream_request.upstreamCanary(), |
1735 | 0 | internal_request, |
1736 | 0 | config_.empty_stat_name_, |
1737 | 0 | config_.empty_stat_name_, |
1738 | 0 | config_.empty_stat_name_, |
1739 | 0 | config_.zone_name_, |
1740 | 0 | upstreamZone(upstream_request.upstreamHost())}; |
1741 | |
|
1742 | 0 | code_stats.chargeResponseTiming(info); |
1743 | 0 | } |
1744 | 1.08k | } |
1745 | | |
1746 | | // Defer deletion as this is generally called under the stack of the upstream |
1747 | | // request, and immediate deletion is dangerous. |
1748 | 1.18k | callbacks_->dispatcher().deferredDelete(upstream_request.removeFromList(upstream_requests_)); |
1749 | 1.18k | cleanup(); |
1750 | 1.18k | } |
1751 | | |
1752 | 0 | bool Filter::setupRedirect(const Http::ResponseHeaderMap& headers) { |
1753 | 0 | ENVOY_STREAM_LOG(debug, "attempting internal redirect", *callbacks_); |
1754 | 0 | const Http::HeaderEntry* location = headers.Location(); |
1755 | |
|
1756 | 0 | const uint64_t status_code = Http::Utility::getResponseStatus(headers); |
1757 | | |
1758 | | // Redirects are not supported for streaming requests yet. |
1759 | 0 | if (downstream_end_stream_ && (!request_buffer_overflowed_ || !callbacks_->decodingBuffer()) && |
1760 | 0 | location != nullptr && |
1761 | 0 | convertRequestHeadersForInternalRedirect(*downstream_headers_, *location, status_code) && |
1762 | 0 | callbacks_->recreateStream(&headers)) { |
1763 | 0 | ENVOY_STREAM_LOG(debug, "Internal redirect succeeded", *callbacks_); |
1764 | 0 | cluster_->trafficStats()->upstream_internal_redirect_succeeded_total_.inc(); |
1765 | 0 | return true; |
1766 | 0 | } |
1767 | | // convertRequestHeadersForInternalRedirect logs failure reasons but log |
1768 | | // details for other failure modes here. |
1769 | 0 | if (!downstream_end_stream_) { |
1770 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: request incomplete", *callbacks_); |
1771 | 0 | } else if (request_buffer_overflowed_) { |
1772 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: request body overflow", *callbacks_); |
1773 | 0 | } else if (location == nullptr) { |
1774 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: missing location header", *callbacks_); |
1775 | 0 | } |
1776 | |
|
1777 | 0 | cluster_->trafficStats()->upstream_internal_redirect_failed_total_.inc(); |
1778 | 0 | return false; |
1779 | 0 | } |
1780 | | |
1781 | | bool Filter::convertRequestHeadersForInternalRedirect(Http::RequestHeaderMap& downstream_headers, |
1782 | | const Http::HeaderEntry& internal_redirect, |
1783 | 0 | uint64_t status_code) { |
1784 | 0 | if (!downstream_headers.Path()) { |
1785 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: no path in downstream_headers", *callbacks_); |
1786 | 0 | return false; |
1787 | 0 | } |
1788 | | |
1789 | 0 | absl::string_view redirect_url = internal_redirect.value().getStringView(); |
1790 | | // Make sure the redirect response contains a URL to redirect to. |
1791 | 0 | if (redirect_url.empty()) { |
1792 | 0 | stats_.passthrough_internal_redirect_bad_location_.inc(); |
1793 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: empty location", *callbacks_); |
1794 | 0 | return false; |
1795 | 0 | } |
1796 | 0 | Http::Utility::Url absolute_url; |
1797 | 0 | if (!absolute_url.initialize(redirect_url, false)) { |
1798 | 0 | stats_.passthrough_internal_redirect_bad_location_.inc(); |
1799 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: invalid location {}", *callbacks_, |
1800 | 0 | redirect_url); |
1801 | 0 | return false; |
1802 | 0 | } |
1803 | | |
1804 | 0 | const auto& policy = route_entry_->internalRedirectPolicy(); |
1805 | | // Don't change the scheme from the original request |
1806 | 0 | const bool scheme_is_http = schemeIsHttp(downstream_headers, callbacks_->connection()); |
1807 | 0 | const bool target_is_http = Http::Utility::schemeIsHttp(absolute_url.scheme()); |
1808 | 0 | if (!policy.isCrossSchemeRedirectAllowed() && scheme_is_http != target_is_http) { |
1809 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: incorrect scheme for {}", *callbacks_, |
1810 | 0 | redirect_url); |
1811 | 0 | stats_.passthrough_internal_redirect_unsafe_scheme_.inc(); |
1812 | 0 | return false; |
1813 | 0 | } |
1814 | | |
1815 | 0 | const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState(); |
1816 | | // Make sure that performing the redirect won't result in exceeding the configured number of |
1817 | | // redirects allowed for this route. |
1818 | 0 | StreamInfo::UInt32Accessor* num_internal_redirect{}; |
1819 | |
|
1820 | 0 | if (num_internal_redirect = filter_state->getDataMutable<StreamInfo::UInt32Accessor>( |
1821 | 0 | NumInternalRedirectsFilterStateName); |
1822 | 0 | num_internal_redirect == nullptr) { |
1823 | 0 | auto state = std::make_shared<StreamInfo::UInt32AccessorImpl>(0); |
1824 | 0 | num_internal_redirect = state.get(); |
1825 | |
|
1826 | 0 | filter_state->setData(NumInternalRedirectsFilterStateName, std::move(state), |
1827 | 0 | StreamInfo::FilterState::StateType::Mutable, |
1828 | 0 | StreamInfo::FilterState::LifeSpan::Request); |
1829 | 0 | } |
1830 | |
|
1831 | 0 | if (num_internal_redirect->value() >= policy.maxInternalRedirects()) { |
1832 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: redirect limits exceeded.", *callbacks_); |
1833 | 0 | stats_.passthrough_internal_redirect_too_many_redirects_.inc(); |
1834 | 0 | return false; |
1835 | 0 | } |
1836 | | // Copy the old values, so they can be restored if the redirect fails. |
1837 | 0 | const std::string original_host(downstream_headers.getHostValue()); |
1838 | 0 | const std::string original_path(downstream_headers.getPathValue()); |
1839 | 0 | const bool scheme_is_set = (downstream_headers.Scheme() != nullptr); |
1840 | 0 | Cleanup restore_original_headers( |
1841 | 0 | [&downstream_headers, original_host, original_path, scheme_is_set, scheme_is_http]() { |
1842 | 0 | downstream_headers.setHost(original_host); |
1843 | 0 | downstream_headers.setPath(original_path); |
1844 | 0 | if (scheme_is_set) { |
1845 | 0 | downstream_headers.setScheme(scheme_is_http ? Http::Headers::get().SchemeValues.Http |
1846 | 0 | : Http::Headers::get().SchemeValues.Https); |
1847 | 0 | } |
1848 | 0 | }); |
1849 | | |
1850 | | // Replace the original host, scheme and path. |
1851 | 0 | downstream_headers.setScheme(absolute_url.scheme()); |
1852 | 0 | downstream_headers.setHost(absolute_url.hostAndPort()); |
1853 | |
|
1854 | 0 | auto path_and_query = absolute_url.pathAndQueryParams(); |
1855 | 0 | if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http_reject_path_with_fragment")) { |
1856 | | // Envoy treats internal redirect as a new request and will reject it if URI path |
1857 | | // contains #fragment. However the Location header is allowed to have #fragment in URI path. To |
1858 | | // prevent Envoy from rejecting internal redirect, strip the #fragment from Location URI if it |
1859 | | // is present. |
1860 | 0 | auto fragment_pos = path_and_query.find('#'); |
1861 | 0 | path_and_query = path_and_query.substr(0, fragment_pos); |
1862 | 0 | } |
1863 | 0 | downstream_headers.setPath(path_and_query); |
1864 | | |
1865 | | // Only clear the route cache if there are downstream callbacks. There aren't, for example, |
1866 | | // for async connections. |
1867 | 0 | if (callbacks_->downstreamCallbacks()) { |
1868 | 0 | callbacks_->downstreamCallbacks()->clearRouteCache(); |
1869 | 0 | } |
1870 | 0 | const auto route = callbacks_->route(); |
1871 | | // Don't allow a redirect to a non existing route. |
1872 | 0 | if (!route) { |
1873 | 0 | stats_.passthrough_internal_redirect_no_route_.inc(); |
1874 | 0 | ENVOY_STREAM_LOG(trace, "Internal redirect failed: no route found", *callbacks_); |
1875 | 0 | return false; |
1876 | 0 | } |
1877 | | |
1878 | 0 | const auto& route_name = route->routeName(); |
1879 | 0 | for (const auto& predicate : policy.predicates()) { |
1880 | 0 | if (!predicate->acceptTargetRoute(*filter_state, route_name, !scheme_is_http, |
1881 | 0 | !target_is_http)) { |
1882 | 0 | stats_.passthrough_internal_redirect_predicate_.inc(); |
1883 | 0 | ENVOY_STREAM_LOG(trace, |
1884 | 0 | "Internal redirect failed: rejecting redirect targeting {}, by {} predicate", |
1885 | 0 | *callbacks_, route_name, predicate->name()); |
1886 | 0 | return false; |
1887 | 0 | } |
1888 | 0 | } |
1889 | | |
1890 | | // See https://tools.ietf.org/html/rfc7231#section-6.4.4. |
1891 | 0 | if (status_code == enumToInt(Http::Code::SeeOther) && |
1892 | 0 | downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Get && |
1893 | 0 | downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Head) { |
1894 | 0 | downstream_headers.setMethod(Http::Headers::get().MethodValues.Get); |
1895 | 0 | downstream_headers.remove(Http::Headers::get().ContentLength); |
1896 | 0 | callbacks_->modifyDecodingBuffer([](Buffer::Instance& data) { data.drain(data.length()); }); |
1897 | 0 | } |
1898 | |
|
1899 | 0 | num_internal_redirect->increment(); |
1900 | 0 | restore_original_headers.cancel(); |
1901 | | // Preserve the original request URL for the second pass. |
1902 | 0 | downstream_headers.setEnvoyOriginalUrl(absl::StrCat(scheme_is_http |
1903 | 0 | ? Http::Headers::get().SchemeValues.Http |
1904 | 0 | : Http::Headers::get().SchemeValues.Https, |
1905 | 0 | "://", original_host, original_path)); |
1906 | 0 | return true; |
1907 | 0 | } |
1908 | | |
1909 | 0 | void Filter::runRetryOptionsPredicates(UpstreamRequest& retriable_request) { |
1910 | 0 | for (const auto& options_predicate : route_entry_->retryPolicy().retryOptionsPredicates()) { |
1911 | 0 | const Upstream::RetryOptionsPredicate::UpdateOptionsParameters parameters{ |
1912 | 0 | retriable_request.streamInfo(), upstreamSocketOptions()}; |
1913 | 0 | auto ret = options_predicate->updateOptions(parameters); |
1914 | 0 | if (ret.new_upstream_socket_options_.has_value()) { |
1915 | 0 | upstream_options_ = ret.new_upstream_socket_options_.value(); |
1916 | 0 | } |
1917 | 0 | } |
1918 | 0 | } |
1919 | | |
1920 | 0 | void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry) { |
1921 | 0 | ENVOY_STREAM_LOG(debug, "performing retry", *callbacks_); |
1922 | |
|
1923 | 0 | is_retry_ = true; |
1924 | 0 | attempt_count_++; |
1925 | 0 | callbacks_->streamInfo().setAttemptCount(attempt_count_); |
1926 | 0 | ASSERT(pending_retries_ > 0); |
1927 | 0 | pending_retries_--; |
1928 | | |
1929 | | // Clusters can technically get removed by CDS during a retry. Make sure it still exists. |
1930 | 0 | const auto cluster = config_.cm_.getThreadLocalCluster(route_entry_->clusterName()); |
1931 | 0 | std::unique_ptr<GenericConnPool> generic_conn_pool; |
1932 | 0 | if (cluster != nullptr) { |
1933 | 0 | cluster_ = cluster->info(); |
1934 | 0 | generic_conn_pool = createConnPool(*cluster); |
1935 | 0 | } |
1936 | |
|
1937 | 0 | if (!generic_conn_pool) { |
1938 | 0 | sendNoHealthyUpstreamResponse(); |
1939 | 0 | cleanup(); |
1940 | 0 | return; |
1941 | 0 | } |
1942 | 0 | UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>( |
1943 | 0 | *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3); |
1944 | |
|
1945 | 0 | if (include_attempt_count_in_request_) { |
1946 | 0 | downstream_headers_->setEnvoyAttemptCount(attempt_count_); |
1947 | 0 | } |
1948 | |
|
1949 | 0 | if (include_timeout_retry_header_in_request_) { |
1950 | 0 | downstream_headers_->setEnvoyIsTimeoutRetry(is_timeout_retry == TimeoutRetry::Yes ? "true" |
1951 | 0 | : "false"); |
1952 | 0 | } |
1953 | | |
1954 | | // The request timeouts only account for time elapsed since the downstream request completed |
1955 | | // which might not have happened yet (in which case zero time has elapsed.) |
1956 | 0 | std::chrono::milliseconds elapsed_time = std::chrono::milliseconds::zero(); |
1957 | |
|
1958 | 0 | if (DateUtil::timePointValid(downstream_request_complete_time_)) { |
1959 | 0 | Event::Dispatcher& dispatcher = callbacks_->dispatcher(); |
1960 | 0 | elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>( |
1961 | 0 | dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_); |
1962 | 0 | } |
1963 | |
|
1964 | 0 | FilterUtility::setTimeoutHeaders(elapsed_time.count(), timeout_, *route_entry_, |
1965 | 0 | *downstream_headers_, !config_.suppress_envoy_headers_, |
1966 | 0 | grpc_request_, hedging_params_.hedge_on_per_try_timeout_); |
1967 | |
|
1968 | 0 | UpstreamRequest* upstream_request_tmp = upstream_request.get(); |
1969 | 0 | LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_); |
1970 | 0 | upstream_requests_.front()->acceptHeadersFromRouter( |
1971 | 0 | !callbacks_->decodingBuffer() && !downstream_trailers_ && downstream_end_stream_); |
1972 | | // It's possible we got immediately reset which means the upstream request we just |
1973 | | // added to the front of the list might have been removed, so we need to check to make |
1974 | | // sure we don't send data on the wrong request. |
1975 | 0 | if (!upstream_requests_.empty() && (upstream_requests_.front().get() == upstream_request_tmp)) { |
1976 | 0 | if (callbacks_->decodingBuffer()) { |
1977 | | // If we are doing a retry we need to make a copy. |
1978 | 0 | Buffer::OwnedImpl copy(*callbacks_->decodingBuffer()); |
1979 | 0 | upstream_requests_.front()->acceptDataFromRouter(copy, !downstream_trailers_ && |
1980 | 0 | downstream_end_stream_); |
1981 | 0 | } |
1982 | |
|
1983 | 0 | if (downstream_trailers_) { |
1984 | 0 | upstream_requests_.front()->acceptTrailersFromRouter(*downstream_trailers_); |
1985 | 0 | } |
1986 | 0 | } |
1987 | 0 | } |
1988 | | |
1989 | 74 | uint32_t Filter::numRequestsAwaitingHeaders() { |
1990 | 74 | return std::count_if(upstream_requests_.begin(), upstream_requests_.end(), |
1991 | 74 | [](const auto& req) -> bool { return req->awaitingHeaders(); }); |
1992 | 74 | } |
1993 | | |
1994 | | RetryStatePtr |
1995 | | ProdFilter::createRetryState(const RetryPolicy& policy, Http::RequestHeaderMap& request_headers, |
1996 | | const Upstream::ClusterInfo& cluster, const VirtualCluster* vcluster, |
1997 | | RouteStatsContextOptRef route_stats_context, Runtime::Loader& runtime, |
1998 | | Random::RandomGenerator& random, Event::Dispatcher& dispatcher, |
1999 | 2.39k | TimeSource& time_source, Upstream::ResourcePriority priority) { |
2000 | 2.39k | std::unique_ptr<RetryStateImpl> retry_state = |
2001 | 2.39k | RetryStateImpl::create(policy, request_headers, cluster, vcluster, route_stats_context, |
2002 | 2.39k | runtime, random, dispatcher, time_source, priority); |
2003 | 2.39k | if (retry_state != nullptr && retry_state->isAutomaticallyConfiguredForHttp3()) { |
2004 | | // Since doing retry will make Envoy to buffer the request body, if upstream using HTTP/3 is the |
2005 | | // only reason for doing retry, set the retry shadow buffer limit to 0 so that we don't retry or |
2006 | | // buffer safe requests with body which is not common. |
2007 | 0 | setRetryShadowBufferLimit(0); |
2008 | 0 | } |
2009 | 2.39k | return retry_state; |
2010 | 2.39k | } |
2011 | | |
2012 | | } // namespace Router |
2013 | | } // namespace Envoy |