Line data Source code
1 : #include "source/common/router/router.h"
2 :
3 : #include <algorithm>
4 : #include <chrono>
5 : #include <cstdint>
6 : #include <functional>
7 : #include <memory>
8 : #include <string>
9 :
10 : #include "envoy/event/dispatcher.h"
11 : #include "envoy/event/timer.h"
12 : #include "envoy/grpc/status.h"
13 : #include "envoy/http/conn_pool.h"
14 : #include "envoy/runtime/runtime.h"
15 : #include "envoy/upstream/cluster_manager.h"
16 : #include "envoy/upstream/health_check_host_monitor.h"
17 : #include "envoy/upstream/upstream.h"
18 :
19 : #include "source/common/common/assert.h"
20 : #include "source/common/common/cleanup.h"
21 : #include "source/common/common/empty_string.h"
22 : #include "source/common/common/enum_to_int.h"
23 : #include "source/common/common/scope_tracker.h"
24 : #include "source/common/common/utility.h"
25 : #include "source/common/config/utility.h"
26 : #include "source/common/grpc/common.h"
27 : #include "source/common/http/codes.h"
28 : #include "source/common/http/header_map_impl.h"
29 : #include "source/common/http/headers.h"
30 : #include "source/common/http/message_impl.h"
31 : #include "source/common/http/utility.h"
32 : #include "source/common/network/application_protocol.h"
33 : #include "source/common/network/socket_option_factory.h"
34 : #include "source/common/network/transport_socket_options_impl.h"
35 : #include "source/common/network/upstream_server_name.h"
36 : #include "source/common/network/upstream_socket_options_filter_state.h"
37 : #include "source/common/network/upstream_subject_alt_names.h"
38 : #include "source/common/router/config_impl.h"
39 : #include "source/common/router/debug_config.h"
40 : #include "source/common/router/retry_state_impl.h"
41 : #include "source/common/runtime/runtime_features.h"
42 : #include "source/common/stream_info/uint32_accessor_impl.h"
43 : #include "source/common/tracing/http_tracer_impl.h"
44 :
45 : namespace Envoy {
46 : namespace Router {
47 : namespace {
48 : constexpr char NumInternalRedirectsFilterStateName[] = "num_internal_redirects";
49 :
50 0 : uint32_t getLength(const Buffer::Instance* instance) { return instance ? instance->length() : 0; }
51 :
52 : bool schemeIsHttp(const Http::RequestHeaderMap& downstream_headers,
53 0 : OptRef<const Network::Connection> connection) {
54 0 : if (Http::Utility::schemeIsHttp(downstream_headers.getSchemeValue())) {
55 0 : return true;
56 0 : }
57 0 : if (connection.has_value() && !connection->ssl()) {
58 0 : return true;
59 0 : }
60 0 : return false;
61 0 : }
62 :
63 : constexpr uint64_t TimeoutPrecisionFactor = 100;
64 :
65 : } // namespace
66 :
67 : FilterConfig::FilterConfig(Stats::StatName stat_prefix,
68 : Server::Configuration::FactoryContext& context,
69 : ShadowWriterPtr&& shadow_writer,
70 : const envoy::extensions::filters::http::router::v3::Router& config)
71 : : FilterConfig(
72 : stat_prefix, context.serverFactoryContext().localInfo(), context.scope(),
73 : context.serverFactoryContext().clusterManager(), context.serverFactoryContext().runtime(),
74 : context.serverFactoryContext().api().randomGenerator(), std::move(shadow_writer),
75 : PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, dynamic_stats, true), config.start_child_span(),
76 : config.suppress_envoy_headers(), config.respect_expected_rq_timeout(),
77 : config.suppress_grpc_request_failure_code_stats(),
78 : config.has_upstream_log_options()
79 : ? config.upstream_log_options().flush_upstream_log_on_upstream_stream()
80 : : false,
81 : config.strict_check_headers(), context.serverFactoryContext().api().timeSource(),
82 : context.serverFactoryContext().httpContext(),
83 142 : context.serverFactoryContext().routerContext()) {
84 142 : for (const auto& upstream_log : config.upstream_log()) {
85 0 : upstream_logs_.push_back(AccessLog::AccessLogFactory::fromProto(upstream_log, context));
86 0 : }
87 :
88 142 : if (config.has_upstream_log_options() &&
89 142 : config.upstream_log_options().has_upstream_log_flush_interval()) {
90 0 : upstream_log_flush_interval_ = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
91 0 : config.upstream_log_options().upstream_log_flush_interval()));
92 0 : }
93 :
94 142 : if (config.upstream_http_filters_size() > 0) {
95 0 : auto& server_factory_ctx = context.serverFactoryContext();
96 0 : const Http::FilterChainUtility::FiltersList& upstream_http_filters =
97 0 : config.upstream_http_filters();
98 0 : std::shared_ptr<Http::UpstreamFilterConfigProviderManager> filter_config_provider_manager =
99 0 : Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager(
100 0 : server_factory_ctx);
101 0 : std::string prefix = context.scope().symbolTable().toString(context.scope().prefix());
102 0 : upstream_ctx_ = std::make_unique<Upstream::UpstreamFactoryContextImpl>(
103 0 : server_factory_ctx, context.initManager(), context.scope());
104 0 : Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext,
105 0 : Server::Configuration::UpstreamHttpFilterConfigFactory>
106 0 : helper(*filter_config_provider_manager, server_factory_ctx,
107 0 : context.serverFactoryContext().clusterManager(), *upstream_ctx_, prefix);
108 0 : THROW_IF_NOT_OK(helper.processFilters(upstream_http_filters, "router upstream http",
109 0 : "router upstream http", upstream_http_filter_factories_));
110 0 : }
111 142 : }
112 :
113 : // Express percentage as [0, TimeoutPrecisionFactor] because stats do not accept floating point
114 : // values, and getting multiple significant figures on the histogram would be nice.
115 : uint64_t FilterUtility::percentageOfTimeout(const std::chrono::milliseconds response_time,
116 0 : const std::chrono::milliseconds timeout) {
117 : // Timeouts of 0 are considered infinite. Any portion of an infinite timeout used is still
118 : // none of it.
119 0 : if (timeout.count() == 0) {
120 0 : return 0;
121 0 : }
122 :
123 0 : return static_cast<uint64_t>(response_time.count() * TimeoutPrecisionFactor / timeout.count());
124 0 : }
125 :
126 309 : void FilterUtility::setUpstreamScheme(Http::RequestHeaderMap& headers, bool downstream_secure) {
127 309 : if (Http::Utility::schemeIsValid(headers.getSchemeValue())) {
128 183 : return;
129 183 : }
130 : // After all the changes in https://github.com/envoyproxy/envoy/issues/14587
131 : // this path should only occur if a buggy filter has removed the :scheme
132 : // header. In that case best-effort set from X-Forwarded-Proto.
133 126 : absl::string_view xfp = headers.getForwardedProtoValue();
134 126 : if (Http::Utility::schemeIsValid(xfp)) {
135 0 : headers.setScheme(xfp);
136 0 : return;
137 0 : }
138 :
139 126 : if (downstream_secure) {
140 0 : headers.setReferenceScheme(Http::Headers::get().SchemeValues.Https);
141 126 : } else {
142 126 : headers.setReferenceScheme(Http::Headers::get().SchemeValues.Http);
143 126 : }
144 126 : }
145 :
146 : bool FilterUtility::shouldShadow(const ShadowPolicy& policy, Runtime::Loader& runtime,
147 0 : uint64_t stable_random) {
148 :
149 : // The policy's default value is set correctly regardless of whether there is a runtime key
150 : // or not, thus this call is sufficient for all cases (100% if no runtime set, otherwise
151 : // using the default value within the runtime fractional percent setting).
152 0 : return runtime.snapshot().featureEnabled(policy.runtimeKey(), policy.defaultValue(),
153 0 : stable_random);
154 0 : }
155 :
156 : TimeoutData FilterUtility::finalTimeout(const RouteEntry& route,
157 : Http::RequestHeaderMap& request_headers,
158 : bool insert_envoy_expected_request_timeout_ms,
159 : bool grpc_request, bool per_try_timeout_hedging_enabled,
160 251 : bool respect_expected_rq_timeout) {
161 : // See if there is a user supplied timeout in a request header. If there is we take that.
162 : // Otherwise if the request is gRPC and a maximum gRPC timeout is configured we use the timeout
163 : // in the gRPC headers (or infinity when gRPC headers have no timeout), but cap that timeout to
164 : // the configured maximum gRPC timeout (which may also be infinity, represented by a 0 value),
165 : // or the default from the route config otherwise.
166 251 : TimeoutData timeout;
167 251 : if (!route.usingNewTimeouts()) {
168 251 : if (grpc_request && route.maxGrpcTimeout()) {
169 0 : const std::chrono::milliseconds max_grpc_timeout = route.maxGrpcTimeout().value();
170 0 : auto header_timeout = Grpc::Common::getGrpcTimeout(request_headers);
171 0 : std::chrono::milliseconds grpc_timeout =
172 0 : header_timeout ? header_timeout.value() : std::chrono::milliseconds(0);
173 0 : if (route.grpcTimeoutOffset()) {
174 : // We only apply the offset if it won't result in grpc_timeout hitting 0 or below, as
175 : // setting it to 0 means infinity and a negative timeout makes no sense.
176 0 : const auto offset = *route.grpcTimeoutOffset();
177 0 : if (offset < grpc_timeout) {
178 0 : grpc_timeout -= offset;
179 0 : }
180 0 : }
181 :
182 : // Cap gRPC timeout to the configured maximum considering that 0 means infinity.
183 0 : if (max_grpc_timeout != std::chrono::milliseconds(0) &&
184 0 : (grpc_timeout == std::chrono::milliseconds(0) || grpc_timeout > max_grpc_timeout)) {
185 0 : grpc_timeout = max_grpc_timeout;
186 0 : }
187 0 : timeout.global_timeout_ = grpc_timeout;
188 251 : } else {
189 251 : timeout.global_timeout_ = route.timeout();
190 251 : }
191 251 : }
192 251 : timeout.per_try_timeout_ = route.retryPolicy().perTryTimeout();
193 251 : timeout.per_try_idle_timeout_ = route.retryPolicy().perTryIdleTimeout();
194 :
195 251 : uint64_t header_timeout;
196 :
197 251 : if (respect_expected_rq_timeout) {
198 : // Check if there is timeout set by egress Envoy.
199 : // If present, use that value as route timeout and don't override
200 : // *x-envoy-expected-rq-timeout-ms* header. At this point *x-envoy-upstream-rq-timeout-ms*
201 : // header should have been sanitized by egress Envoy.
202 0 : const Http::HeaderEntry* header_expected_timeout_entry =
203 0 : request_headers.EnvoyExpectedRequestTimeoutMs();
204 0 : if (header_expected_timeout_entry) {
205 0 : trySetGlobalTimeout(*header_expected_timeout_entry, timeout);
206 0 : } else {
207 0 : const Http::HeaderEntry* header_timeout_entry =
208 0 : request_headers.EnvoyUpstreamRequestTimeoutMs();
209 :
210 0 : if (header_timeout_entry) {
211 0 : trySetGlobalTimeout(*header_timeout_entry, timeout);
212 0 : request_headers.removeEnvoyUpstreamRequestTimeoutMs();
213 0 : }
214 0 : }
215 251 : } else {
216 251 : const Http::HeaderEntry* header_timeout_entry = request_headers.EnvoyUpstreamRequestTimeoutMs();
217 :
218 251 : if (header_timeout_entry) {
219 0 : trySetGlobalTimeout(*header_timeout_entry, timeout);
220 0 : request_headers.removeEnvoyUpstreamRequestTimeoutMs();
221 0 : }
222 251 : }
223 :
224 : // See if there is a per try/retry timeout. If it's >= global we just ignore it.
225 251 : const absl::string_view per_try_timeout_entry =
226 251 : request_headers.getEnvoyUpstreamRequestPerTryTimeoutMsValue();
227 251 : if (!per_try_timeout_entry.empty()) {
228 0 : if (absl::SimpleAtoi(per_try_timeout_entry, &header_timeout)) {
229 0 : timeout.per_try_timeout_ = std::chrono::milliseconds(header_timeout);
230 0 : }
231 0 : request_headers.removeEnvoyUpstreamRequestPerTryTimeoutMs();
232 0 : }
233 :
234 251 : if (timeout.per_try_timeout_ >= timeout.global_timeout_ && timeout.global_timeout_.count() != 0) {
235 0 : timeout.per_try_timeout_ = std::chrono::milliseconds(0);
236 0 : }
237 :
238 251 : setTimeoutHeaders(0, timeout, route, request_headers, insert_envoy_expected_request_timeout_ms,
239 251 : grpc_request, per_try_timeout_hedging_enabled);
240 :
241 251 : return timeout;
242 251 : }
243 :
244 : void FilterUtility::setTimeoutHeaders(uint64_t elapsed_time, const TimeoutData& timeout,
245 : const RouteEntry& route,
246 : Http::RequestHeaderMap& request_headers,
247 : bool insert_envoy_expected_request_timeout_ms,
248 251 : bool grpc_request, bool per_try_timeout_hedging_enabled) {
249 :
250 251 : const uint64_t global_timeout = timeout.global_timeout_.count();
251 :
252 : // See if there is any timeout to write in the expected timeout header.
253 251 : uint64_t expected_timeout = timeout.per_try_timeout_.count();
254 :
255 : // Use the global timeout if no per try timeout was specified or if we're
256 : // doing hedging when there are per try timeouts. Either of these scenarios
257 : // mean that the upstream server can use the full global timeout.
258 251 : if (per_try_timeout_hedging_enabled || expected_timeout == 0) {
259 251 : expected_timeout = global_timeout;
260 251 : }
261 :
262 : // If the expected timeout is 0 set no timeout, as Envoy treats 0 as infinite timeout.
263 251 : if (expected_timeout > 0) {
264 :
265 183 : if (global_timeout > 0) {
266 183 : if (elapsed_time >= global_timeout) {
267 : // We are out of time, but 0 would be an infinite timeout. So instead we send a 1ms timeout
268 : // and assume the timers armed by onRequestComplete() will fire very soon.
269 0 : expected_timeout = 1;
270 183 : } else {
271 183 : expected_timeout = std::min(expected_timeout, global_timeout - elapsed_time);
272 183 : }
273 183 : }
274 :
275 183 : if (insert_envoy_expected_request_timeout_ms) {
276 183 : request_headers.setEnvoyExpectedRequestTimeoutMs(expected_timeout);
277 183 : }
278 :
279 : // If we've configured max_grpc_timeout, override the grpc-timeout header with
280 : // the expected timeout. This ensures that the optional per try timeout is reflected
281 : // in grpc-timeout, ensuring that the upstream gRPC server is aware of the actual timeout.
282 183 : if (grpc_request && !route.usingNewTimeouts() && route.maxGrpcTimeout()) {
283 0 : Grpc::Common::toGrpcTimeout(std::chrono::milliseconds(expected_timeout), request_headers);
284 0 : }
285 183 : }
286 251 : }
287 :
288 : absl::optional<std::chrono::milliseconds>
289 0 : FilterUtility::tryParseHeaderTimeout(const Http::HeaderEntry& header_timeout_entry) {
290 0 : uint64_t header_timeout;
291 0 : if (absl::SimpleAtoi(header_timeout_entry.value().getStringView(), &header_timeout)) {
292 0 : return std::chrono::milliseconds(header_timeout);
293 0 : }
294 0 : return absl::nullopt;
295 0 : }
296 :
297 : void FilterUtility::trySetGlobalTimeout(const Http::HeaderEntry& header_timeout_entry,
298 0 : TimeoutData& timeout) {
299 0 : const auto timeout_ms = tryParseHeaderTimeout(header_timeout_entry);
300 0 : if (timeout_ms.has_value()) {
301 0 : timeout.global_timeout_ = timeout_ms.value();
302 0 : }
303 0 : }
304 :
305 : FilterUtility::HedgingParams
306 : FilterUtility::finalHedgingParams(const RouteEntry& route,
307 251 : Http::RequestHeaderMap& request_headers) {
308 251 : HedgingParams hedging_params;
309 251 : hedging_params.hedge_on_per_try_timeout_ = route.hedgePolicy().hedgeOnPerTryTimeout();
310 :
311 251 : const Http::HeaderEntry* hedge_on_per_try_timeout_entry =
312 251 : request_headers.EnvoyHedgeOnPerTryTimeout();
313 251 : if (hedge_on_per_try_timeout_entry) {
314 0 : if (hedge_on_per_try_timeout_entry->value() == "true") {
315 0 : hedging_params.hedge_on_per_try_timeout_ = true;
316 0 : }
317 0 : if (hedge_on_per_try_timeout_entry->value() == "false") {
318 0 : hedging_params.hedge_on_per_try_timeout_ = false;
319 0 : }
320 :
321 0 : request_headers.removeEnvoyHedgeOnPerTryTimeout();
322 0 : }
323 :
324 251 : return hedging_params;
325 251 : }
326 :
327 545 : Filter::~Filter() {
328 : // Upstream resources should already have been cleaned.
329 545 : ASSERT(upstream_requests_.empty());
330 545 : ASSERT(!retry_state_);
331 545 : }
332 :
333 : const FilterUtility::StrictHeaderChecker::HeaderCheckResult
334 : FilterUtility::StrictHeaderChecker::checkHeader(Http::RequestHeaderMap& headers,
335 0 : const Http::LowerCaseString& target_header) {
336 0 : if (target_header == Http::Headers::get().EnvoyUpstreamRequestTimeoutMs) {
337 0 : return isInteger(headers.EnvoyUpstreamRequestTimeoutMs());
338 0 : } else if (target_header == Http::Headers::get().EnvoyUpstreamRequestPerTryTimeoutMs) {
339 0 : return isInteger(headers.EnvoyUpstreamRequestPerTryTimeoutMs());
340 0 : } else if (target_header == Http::Headers::get().EnvoyMaxRetries) {
341 0 : return isInteger(headers.EnvoyMaxRetries());
342 0 : } else if (target_header == Http::Headers::get().EnvoyRetryOn) {
343 0 : return hasValidRetryFields(headers.EnvoyRetryOn(), &Router::RetryStateImpl::parseRetryOn);
344 0 : } else if (target_header == Http::Headers::get().EnvoyRetryGrpcOn) {
345 0 : return hasValidRetryFields(headers.EnvoyRetryGrpcOn(),
346 0 : &Router::RetryStateImpl::parseRetryGrpcOn);
347 0 : }
348 : // Should only validate headers for which we have implemented a validator.
349 0 : PANIC("unexpectedly reached");
350 0 : }
351 :
352 272 : Stats::StatName Filter::upstreamZone(Upstream::HostDescriptionConstSharedPtr upstream_host) {
353 272 : return upstream_host ? upstream_host->localityZoneStatName() : config_.empty_stat_name_;
354 272 : }
355 :
356 : void Filter::chargeUpstreamCode(uint64_t response_status_code,
357 : const Http::ResponseHeaderMap& response_headers,
358 : Upstream::HostDescriptionConstSharedPtr upstream_host,
359 199 : bool dropped) {
360 : // Passing the response_status_code explicitly is an optimization to avoid
361 : // multiple calls to slow Http::Utility::getResponseStatus.
362 199 : ASSERT(response_status_code == Http::Utility::getResponseStatus(response_headers));
363 199 : if (config_.emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck()) {
364 199 : const Http::HeaderEntry* upstream_canary_header = response_headers.EnvoyUpstreamCanary();
365 199 : const bool is_canary = (upstream_canary_header && upstream_canary_header->value() == "true") ||
366 199 : (upstream_host ? upstream_host->canary() : false);
367 199 : const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_);
368 :
369 199 : Stats::StatName upstream_zone = upstreamZone(upstream_host);
370 199 : Http::CodeStats::ResponseStatInfo info{
371 199 : config_.scope_,
372 199 : cluster_->statsScope(),
373 199 : config_.empty_stat_name_,
374 199 : response_status_code,
375 199 : internal_request,
376 199 : route_entry_->virtualHost().statName(),
377 199 : request_vcluster_ ? request_vcluster_->statName() : config_.empty_stat_name_,
378 199 : route_stats_context_.has_value() ? route_stats_context_->statName()
379 199 : : config_.empty_stat_name_,
380 199 : config_.zone_name_,
381 199 : upstream_zone,
382 199 : is_canary};
383 :
384 199 : Http::CodeStats& code_stats = httpContext().codeStats();
385 199 : code_stats.chargeResponseStat(info, exclude_http_code_stats_);
386 :
387 199 : if (alt_stat_prefix_ != nullptr) {
388 0 : Http::CodeStats::ResponseStatInfo alt_info{config_.scope_,
389 0 : cluster_->statsScope(),
390 0 : alt_stat_prefix_->statName(),
391 0 : response_status_code,
392 0 : internal_request,
393 0 : config_.empty_stat_name_,
394 0 : config_.empty_stat_name_,
395 0 : config_.empty_stat_name_,
396 0 : config_.zone_name_,
397 0 : upstream_zone,
398 0 : is_canary};
399 0 : code_stats.chargeResponseStat(alt_info, exclude_http_code_stats_);
400 0 : }
401 :
402 199 : if (dropped) {
403 0 : cluster_->loadReportStats().upstream_rq_dropped_.inc();
404 0 : }
405 199 : if (upstream_host && Http::CodeUtility::is5xx(response_status_code)) {
406 58 : upstream_host->stats().rq_error_.inc();
407 58 : }
408 199 : }
409 199 : }
410 :
411 : void Filter::chargeUpstreamCode(Http::Code code,
412 : Upstream::HostDescriptionConstSharedPtr upstream_host,
413 58 : bool dropped) {
414 58 : const uint64_t response_status_code = enumToInt(code);
415 58 : const auto fake_response_headers = Http::createHeaderMap<Http::ResponseHeaderMapImpl>(
416 58 : {{Http::Headers::get().Status, std::to_string(response_status_code)}});
417 58 : chargeUpstreamCode(response_status_code, *fake_response_headers, upstream_host, dropped);
418 58 : }
419 :
420 459 : Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
421 459 : downstream_headers_ = &headers;
422 :
423 : // Extract debug configuration from filter state. This is used further along to determine whether
424 : // we should append cluster and host headers to the response, and whether to forward the request
425 : // upstream.
426 459 : const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();
427 459 : const DebugConfig* debug_config = filter_state->getDataReadOnly<DebugConfig>(DebugConfig::key());
428 :
429 : // TODO: Maybe add a filter API for this.
430 459 : grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);
431 459 : exclude_http_code_stats_ = grpc_request_ && config_.suppress_grpc_request_failure_code_stats_;
432 :
433 : // Only increment rq total stat if we actually decode headers here. This does not count requests
434 : // that get handled by earlier filters.
435 459 : stats_.rq_total_.inc();
436 :
437 : // Initialize the `modify_headers` function as a no-op (so we don't have to remember to check it
438 : // against nullptr before calling it), and feed it behavior later if/when we have cluster info
439 : // headers to append.
440 459 : std::function<void(Http::ResponseHeaderMap&)> modify_headers = [](Http::ResponseHeaderMap&) {};
441 :
442 : // Determine if there is a route entry or a direct response for the request.
443 459 : route_ = callbacks_->route();
444 459 : if (!route_) {
445 0 : stats_.no_route_.inc();
446 0 : ENVOY_STREAM_LOG(debug, "no route match for URL '{}'", *callbacks_, headers.getPathValue());
447 :
448 0 : callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound);
449 0 : callbacks_->sendLocalReply(Http::Code::NotFound, "", modify_headers, absl::nullopt,
450 0 : StreamInfo::ResponseCodeDetails::get().RouteNotFound);
451 0 : return Http::FilterHeadersStatus::StopIteration;
452 0 : }
453 :
454 : // Determine if there is a direct response for the request.
455 459 : const auto* direct_response = route_->directResponseEntry();
456 459 : if (direct_response != nullptr) {
457 202 : stats_.rq_direct_response_.inc();
458 202 : direct_response->rewritePathHeader(headers, !config_.suppress_envoy_headers_);
459 202 : callbacks_->sendLocalReply(
460 202 : direct_response->responseCode(), direct_response->responseBody(),
461 202 : [this, direct_response,
462 202 : &request_headers = headers](Http::ResponseHeaderMap& response_headers) -> void {
463 202 : std::string new_uri;
464 202 : if (request_headers.Path()) {
465 202 : new_uri = direct_response->newUri(request_headers);
466 202 : }
467 : // See https://tools.ietf.org/html/rfc7231#section-7.1.2.
468 202 : const auto add_location =
469 202 : direct_response->responseCode() == Http::Code::Created ||
470 202 : Http::CodeUtility::is3xx(enumToInt(direct_response->responseCode()));
471 202 : if (!new_uri.empty() && add_location) {
472 0 : response_headers.addReferenceKey(Http::Headers::get().Location, new_uri);
473 0 : }
474 202 : direct_response->finalizeResponseHeaders(response_headers, callbacks_->streamInfo());
475 202 : },
476 202 : absl::nullopt, StreamInfo::ResponseCodeDetails::get().DirectResponse);
477 202 : return Http::FilterHeadersStatus::StopIteration;
478 202 : }
479 :
480 : // A route entry matches for the request.
481 257 : route_entry_ = route_->routeEntry();
482 : // If there's a route specific limit and it's smaller than general downstream
483 : // limits, apply the new cap.
484 257 : retry_shadow_buffer_limit_ =
485 257 : std::min(retry_shadow_buffer_limit_, route_entry_->retryShadowBufferLimit());
486 257 : if (debug_config && debug_config->append_cluster_) {
487 : // The cluster name will be appended to any local or upstream responses from this point.
488 0 : modify_headers = [this, debug_config](Http::ResponseHeaderMap& headers) {
489 0 : headers.addCopy(debug_config->cluster_header_.value_or(Http::Headers::get().EnvoyCluster),
490 0 : route_entry_->clusterName());
491 0 : };
492 0 : }
493 257 : Upstream::ThreadLocalCluster* cluster =
494 257 : config_.cm_.getThreadLocalCluster(route_entry_->clusterName());
495 257 : if (!cluster) {
496 6 : stats_.no_cluster_.inc();
497 6 : ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, route_entry_->clusterName());
498 :
499 6 : callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoClusterFound);
500 6 : callbacks_->sendLocalReply(route_entry_->clusterNotFoundResponseCode(), "", modify_headers,
501 6 : absl::nullopt,
502 6 : StreamInfo::ResponseCodeDetails::get().ClusterNotFound);
503 6 : return Http::FilterHeadersStatus::StopIteration;
504 6 : }
505 251 : cluster_ = cluster->info();
506 :
507 : // Set up stat prefixes, etc.
508 251 : request_vcluster_ = route_entry_->virtualCluster(headers);
509 251 : if (request_vcluster_ != nullptr) {
510 0 : callbacks_->streamInfo().setVirtualClusterName(request_vcluster_->name());
511 0 : }
512 251 : route_stats_context_ = route_entry_->routeStatsContext();
513 251 : ENVOY_STREAM_LOG(debug, "cluster '{}' match for URL '{}'", *callbacks_,
514 251 : route_entry_->clusterName(), headers.getPathValue());
515 :
516 251 : if (config_.strict_check_headers_ != nullptr) {
517 0 : for (const auto& header : *config_.strict_check_headers_) {
518 0 : const auto res = FilterUtility::StrictHeaderChecker::checkHeader(headers, header);
519 0 : if (!res.valid_) {
520 0 : callbacks_->streamInfo().setResponseFlag(
521 0 : StreamInfo::ResponseFlag::InvalidEnvoyRequestHeaders);
522 0 : const std::string body = fmt::format("invalid header '{}' with value '{}'",
523 0 : std::string(res.entry_->key().getStringView()),
524 0 : std::string(res.entry_->value().getStringView()));
525 0 : const std::string details =
526 0 : absl::StrCat(StreamInfo::ResponseCodeDetails::get().InvalidEnvoyRequestHeaders, "{",
527 0 : StringUtil::replaceAllEmptySpace(res.entry_->key().getStringView()), "}");
528 0 : callbacks_->sendLocalReply(Http::Code::BadRequest, body, nullptr, absl::nullopt, details);
529 0 : return Http::FilterHeadersStatus::StopIteration;
530 0 : }
531 0 : }
532 0 : }
533 :
534 251 : const Http::HeaderEntry* request_alt_name = headers.EnvoyUpstreamAltStatName();
535 251 : if (request_alt_name) {
536 0 : alt_stat_prefix_ = std::make_unique<Stats::StatNameDynamicStorage>(
537 0 : request_alt_name->value().getStringView(), config_.scope_.symbolTable());
538 0 : headers.removeEnvoyUpstreamAltStatName();
539 0 : }
540 :
541 : // See if we are supposed to immediately kill some percentage of this cluster's traffic.
542 251 : if (cluster_->maintenanceMode()) {
543 0 : callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
544 0 : chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true);
545 0 : callbacks_->sendLocalReply(
546 0 : Http::Code::ServiceUnavailable, "maintenance mode",
547 0 : [modify_headers, this](Http::ResponseHeaderMap& headers) {
548 0 : if (!config_.suppress_envoy_headers_) {
549 0 : headers.addReference(Http::Headers::get().EnvoyOverloaded,
550 0 : Http::Headers::get().EnvoyOverloadedValues.True);
551 0 : }
552 : // Note: append_cluster_info does not respect suppress_envoy_headers.
553 0 : modify_headers(headers);
554 0 : },
555 0 : absl::nullopt, StreamInfo::ResponseCodeDetails::get().MaintenanceMode);
556 0 : cluster_->trafficStats()->upstream_rq_maintenance_mode_.inc();
557 0 : return Http::FilterHeadersStatus::StopIteration;
558 0 : }
559 :
560 : // Support DROP_OVERLOAD config from control plane to drop certain percentage of traffic.
561 251 : if (checkDropOverload(*cluster, modify_headers)) {
562 0 : return Http::FilterHeadersStatus::StopIteration;
563 0 : }
564 :
565 : // Fetch a connection pool for the upstream cluster.
566 251 : const auto& upstream_http_protocol_options = cluster_->upstreamHttpProtocolOptions();
567 :
568 251 : if (upstream_http_protocol_options.has_value() &&
569 251 : (upstream_http_protocol_options.value().auto_sni() ||
570 0 : upstream_http_protocol_options.value().auto_san_validation())) {
571 : // Default the header to Host/Authority header.
572 0 : absl::string_view header_value = headers.getHostValue();
573 :
574 : // Check whether `override_auto_sni_header` is specified.
575 0 : const auto override_auto_sni_header =
576 0 : upstream_http_protocol_options.value().override_auto_sni_header();
577 0 : if (!override_auto_sni_header.empty()) {
578 : // Use the header value from `override_auto_sni_header` to set the SNI value.
579 0 : const auto overridden_header_value = Http::HeaderUtility::getAllOfHeaderAsString(
580 0 : headers, Http::LowerCaseString(override_auto_sni_header));
581 0 : if (overridden_header_value.result().has_value() &&
582 0 : !overridden_header_value.result().value().empty()) {
583 0 : header_value = overridden_header_value.result().value();
584 0 : }
585 0 : }
586 0 : const auto parsed_authority = Http::Utility::parseAuthority(header_value);
587 0 : bool should_set_sni = !parsed_authority.is_ip_address_;
588 : // `host_` returns a string_view so doing this should be safe.
589 0 : absl::string_view sni_value = parsed_authority.host_;
590 :
591 0 : if (should_set_sni && upstream_http_protocol_options.value().auto_sni() &&
592 0 : !callbacks_->streamInfo().filterState()->hasDataWithName(
593 0 : Network::UpstreamServerName::key())) {
594 0 : callbacks_->streamInfo().filterState()->setData(
595 0 : Network::UpstreamServerName::key(),
596 0 : std::make_unique<Network::UpstreamServerName>(sni_value),
597 0 : StreamInfo::FilterState::StateType::Mutable);
598 0 : }
599 :
600 0 : if (upstream_http_protocol_options.value().auto_san_validation() &&
601 0 : !callbacks_->streamInfo().filterState()->hasDataWithName(
602 0 : Network::UpstreamSubjectAltNames::key())) {
603 0 : callbacks_->streamInfo().filterState()->setData(
604 0 : Network::UpstreamSubjectAltNames::key(),
605 0 : std::make_unique<Network::UpstreamSubjectAltNames>(
606 0 : std::vector<std::string>{std::string(sni_value)}),
607 0 : StreamInfo::FilterState::StateType::Mutable);
608 0 : }
609 0 : }
610 :
611 251 : transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState(
612 251 : *callbacks_->streamInfo().filterState());
613 :
614 251 : if (auto downstream_connection = downstreamConnection(); downstream_connection != nullptr) {
615 183 : if (auto typed_state = downstream_connection->streamInfo()
616 183 : .filterState()
617 183 : .getDataReadOnly<Network::UpstreamSocketOptionsFilterState>(
618 183 : Network::UpstreamSocketOptionsFilterState::key());
619 183 : typed_state != nullptr) {
620 0 : auto downstream_options = typed_state->value();
621 0 : if (!upstream_options_) {
622 0 : upstream_options_ = std::make_shared<Network::Socket::Options>();
623 0 : }
624 0 : Network::Socket::appendOptions(upstream_options_, downstream_options);
625 0 : }
626 183 : }
627 :
628 251 : if (upstream_options_ && callbacks_->getUpstreamSocketOptions()) {
629 0 : Network::Socket::appendOptions(upstream_options_, callbacks_->getUpstreamSocketOptions());
630 0 : }
631 :
632 251 : std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);
633 :
634 251 : if (!generic_conn_pool) {
635 0 : sendNoHealthyUpstreamResponse();
636 0 : return Http::FilterHeadersStatus::StopIteration;
637 0 : }
638 251 : Upstream::HostDescriptionConstSharedPtr host = generic_conn_pool->host();
639 :
640 251 : if (debug_config && debug_config->append_upstream_host_) {
641 : // The hostname and address will be appended to any local or upstream responses from this point,
642 : // possibly in addition to the cluster name.
643 0 : modify_headers = [modify_headers, debug_config, host](Http::ResponseHeaderMap& headers) {
644 0 : modify_headers(headers);
645 0 : headers.addCopy(
646 0 : debug_config->hostname_header_.value_or(Http::Headers::get().EnvoyUpstreamHostname),
647 0 : host->hostname());
648 0 : headers.addCopy(debug_config->host_address_header_.value_or(
649 0 : Http::Headers::get().EnvoyUpstreamHostAddress),
650 0 : host->address()->asString());
651 0 : };
652 0 : }
653 :
654 : // If we've been instructed not to forward the request upstream, send an empty local response.
655 251 : if (debug_config && debug_config->do_not_forward_) {
656 0 : modify_headers = [modify_headers, debug_config](Http::ResponseHeaderMap& headers) {
657 0 : modify_headers(headers);
658 0 : headers.addCopy(
659 0 : debug_config->not_forwarded_header_.value_or(Http::Headers::get().EnvoyNotForwarded),
660 0 : "true");
661 0 : };
662 0 : callbacks_->sendLocalReply(Http::Code::NoContent, "", modify_headers, absl::nullopt, "");
663 0 : return Http::FilterHeadersStatus::StopIteration;
664 0 : }
665 :
666 251 : hedging_params_ = FilterUtility::finalHedgingParams(*route_entry_, headers);
667 :
668 251 : timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_.suppress_envoy_headers_,
669 251 : grpc_request_, hedging_params_.hedge_on_per_try_timeout_,
670 251 : config_.respect_expected_rq_timeout_);
671 :
672 251 : const Http::HeaderEntry* header_max_stream_duration_entry =
673 251 : headers.EnvoyUpstreamStreamDurationMs();
674 251 : if (header_max_stream_duration_entry) {
675 0 : dynamic_max_stream_duration_ =
676 0 : FilterUtility::tryParseHeaderTimeout(*header_max_stream_duration_entry);
677 0 : headers.removeEnvoyUpstreamStreamDurationMs();
678 0 : }
679 :
680 : // If this header is set with any value, use an alternate response code on timeout
681 251 : if (headers.EnvoyUpstreamRequestTimeoutAltResponse()) {
682 0 : timeout_response_code_ = Http::Code::NoContent;
683 0 : headers.removeEnvoyUpstreamRequestTimeoutAltResponse();
684 0 : }
685 :
686 251 : include_attempt_count_in_request_ = route_entry_->includeAttemptCountInRequest();
687 251 : if (include_attempt_count_in_request_) {
688 0 : headers.setEnvoyAttemptCount(attempt_count_);
689 0 : }
690 :
691 : // The router has reached a point where it is going to try to send a request upstream,
692 : // so now modify_headers should attach x-envoy-attempt-count to the downstream response if the
693 : // config flag is true.
694 251 : if (route_entry_->includeAttemptCountInResponse()) {
695 0 : modify_headers = [modify_headers, this](Http::ResponseHeaderMap& headers) {
696 0 : modify_headers(headers);
697 :
698 : // This header is added without checking for config_.suppress_envoy_headers_ to mirror what is
699 : // done for upstream requests.
700 0 : headers.setEnvoyAttemptCount(attempt_count_);
701 0 : };
702 0 : }
703 251 : callbacks_->streamInfo().setAttemptCount(attempt_count_);
704 :
705 251 : route_entry_->finalizeRequestHeaders(headers, callbacks_->streamInfo(),
706 251 : !config_.suppress_envoy_headers_);
707 251 : FilterUtility::setUpstreamScheme(
708 251 : headers, callbacks_->streamInfo().downstreamAddressProvider().sslConnection() != nullptr);
709 :
710 : // Ensure an http transport scheme is selected before continuing with decoding.
711 251 : ASSERT(headers.Scheme());
712 :
713 251 : retry_state_ =
714 251 : createRetryState(route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_,
715 251 : route_stats_context_, config_.runtime_, config_.random_,
716 251 : callbacks_->dispatcher(), config_.timeSource(), route_entry_->priority());
717 :
718 : // Determine which shadow policies to use. It's possible that we don't do any shadowing due to
719 : // runtime keys. Also the method CONNECT doesn't support shadowing.
720 251 : auto method = headers.getMethodValue();
721 251 : if (method != Http::Headers::get().MethodValues.Connect) {
722 251 : for (const auto& shadow_policy : route_entry_->shadowPolicies()) {
723 0 : const auto& policy_ref = *shadow_policy;
724 0 : if (FilterUtility::shouldShadow(policy_ref, config_.runtime_, callbacks_->streamId())) {
725 0 : active_shadow_policies_.push_back(std::cref(policy_ref));
726 0 : shadow_headers_ = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*downstream_headers_);
727 0 : }
728 0 : }
729 251 : }
730 :
731 251 : ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers);
732 :
733 : // Hang onto the modify_headers function for later use in handling upstream responses.
734 251 : modify_headers_ = modify_headers;
735 :
736 251 : const bool can_send_early_data =
737 251 : route_entry_->earlyDataPolicy().allowsEarlyDataForRequest(*downstream_headers_);
738 :
739 251 : include_timeout_retry_header_in_request_ =
740 251 : route_entry_->virtualHost().includeIsTimeoutRetryHeader();
741 :
742 : // Set initial HTTP/3 use based on the presence of HTTP/1.1 proxy config.
743 : // For retries etc, HTTP/3 usability may transition from true to false, but
744 : // will never transition from false to true.
745 251 : bool can_use_http3 =
746 251 : !transport_socket_options_ || !transport_socket_options_->http11ProxyInfo().has_value();
747 251 : UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(
748 251 : *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3);
749 251 : LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
750 251 : upstream_requests_.front()->acceptHeadersFromRouter(end_stream);
751 251 : if (streaming_shadows_) {
752 : // start the shadow streams.
753 0 : for (const auto& shadow_policy_wrapper : active_shadow_policies_) {
754 0 : const auto& shadow_policy = shadow_policy_wrapper.get();
755 0 : const absl::optional<absl::string_view> shadow_cluster_name =
756 0 : getShadowCluster(shadow_policy, *downstream_headers_);
757 0 : if (!shadow_cluster_name.has_value()) {
758 0 : continue;
759 0 : }
760 0 : auto shadow_headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_);
761 0 : auto options =
762 0 : Http::AsyncClient::RequestOptions()
763 0 : .setTimeout(timeout_.global_timeout_)
764 0 : .setParentSpan(callbacks_->activeSpan())
765 0 : .setChildSpanName("mirror")
766 0 : .setSampled(shadow_policy.traceSampled())
767 0 : .setIsShadow(true)
768 0 : .setBufferAccount(callbacks_->account())
769 : // A buffer limit of 1 is set in the case that retry_shadow_buffer_limit_ == 0,
770 : // because a buffer limit of zero on async clients is interpreted as no buffer limit.
771 0 : .setBufferLimit(1 > retry_shadow_buffer_limit_ ? 1 : retry_shadow_buffer_limit_);
772 0 : options.setFilterConfig(config_);
773 0 : if (end_stream) {
774 : // This is a header-only request, and can be dispatched immediately to the shadow
775 : // without waiting.
776 0 : Http::RequestMessagePtr request(new Http::RequestMessageImpl(
777 0 : Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_)));
778 0 : config_.shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request),
779 0 : options);
780 0 : } else {
781 0 : Http::AsyncClient::OngoingRequest* shadow_stream = config_.shadowWriter().streamingShadow(
782 0 : std::string(shadow_cluster_name.value()), std::move(shadow_headers), options);
783 0 : if (shadow_stream != nullptr) {
784 0 : shadow_streams_.insert(shadow_stream);
785 0 : shadow_stream->setDestructorCallback(
786 0 : [this, shadow_stream]() { shadow_streams_.erase(shadow_stream); });
787 0 : shadow_stream->setWatermarkCallbacks(*callbacks_);
788 0 : }
789 0 : }
790 0 : }
791 0 : }
792 251 : if (end_stream) {
793 92 : onRequestComplete();
794 92 : }
795 :
796 251 : return Http::FilterHeadersStatus::StopIteration;
797 251 : }
798 :
799 : std::unique_ptr<GenericConnPool>
800 251 : Filter::createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster) {
801 251 : GenericConnPoolFactory* factory = nullptr;
802 251 : if (cluster_->upstreamConfig().has_value()) {
803 0 : factory = Envoy::Config::Utility::getFactory<GenericConnPoolFactory>(
804 0 : cluster_->upstreamConfig().ref());
805 0 : ENVOY_BUG(factory != nullptr,
806 0 : fmt::format("invalid factory type '{}', failing over to default upstream",
807 0 : cluster_->upstreamConfig().ref().DebugString()));
808 0 : }
809 251 : if (!factory) {
810 251 : factory = &config_.router_context_.genericConnPoolFactory();
811 251 : }
812 :
813 251 : using UpstreamProtocol = Envoy::Router::GenericConnPoolFactory::UpstreamProtocol;
814 251 : UpstreamProtocol upstream_protocol = UpstreamProtocol::HTTP;
815 251 : if (route_entry_->connectConfig().has_value()) {
816 0 : auto method = downstream_headers_->getMethodValue();
817 0 : if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.enable_connect_udp_support") &&
818 0 : Http::HeaderUtility::isConnectUdpRequest(*downstream_headers_)) {
819 0 : upstream_protocol = UpstreamProtocol::UDP;
820 0 : } else if (method == Http::Headers::get().MethodValues.Connect ||
821 0 : (route_entry_->connectConfig()->allow_post() &&
822 0 : method == Http::Headers::get().MethodValues.Post)) {
823 : // Allow POST for proxying raw TCP if it is configured.
824 0 : upstream_protocol = UpstreamProtocol::TCP;
825 0 : }
826 0 : }
827 251 : return factory->createGenericConnPool(thread_local_cluster, upstream_protocol,
828 251 : route_entry_->priority(),
829 251 : callbacks_->streamInfo().protocol(), this);
830 251 : }
831 :
832 0 : void Filter::sendNoHealthyUpstreamResponse() {
833 0 : callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoHealthyUpstream);
834 0 : chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, false);
835 0 : callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "no healthy upstream", modify_headers_,
836 0 : absl::nullopt,
837 0 : StreamInfo::ResponseCodeDetails::get().NoHealthyUpstream);
838 0 : }
839 :
840 596 : Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) {
841 : // upstream_requests_.size() cannot be > 1 because that only happens when a per
842 : // try timeout occurs with hedge_on_per_try_timeout enabled but the per
843 : // try timeout timer is not started until onRequestComplete(). It could be zero
844 : // if the first request attempt has already failed and a retry is waiting for
845 : // a backoff timer.
846 596 : ASSERT(upstream_requests_.size() <= 1);
847 :
848 596 : bool buffering = (retry_state_ && retry_state_->enabled()) ||
849 596 : (!active_shadow_policies_.empty() && !streaming_shadows_) ||
850 596 : (route_entry_ && route_entry_->internalRedirectPolicy().enabled());
851 596 : if (buffering &&
852 596 : getLength(callbacks_->decodingBuffer()) + data.length() > retry_shadow_buffer_limit_) {
853 0 : ENVOY_LOG(debug,
854 0 : "The request payload has at least {} bytes data which exceeds buffer limit {}. Give "
855 0 : "up on the retry/shadow.",
856 0 : getLength(callbacks_->decodingBuffer()) + data.length(), retry_shadow_buffer_limit_);
857 0 : cluster_->trafficStats()->retry_or_shadow_abandoned_.inc();
858 0 : retry_state_.reset();
859 0 : buffering = false;
860 0 : active_shadow_policies_.clear();
861 0 : request_buffer_overflowed_ = true;
862 :
863 : // If we had to abandon buffering and there's no request in progress, abort the request and
864 : // clean up. This happens if the initial upstream request failed, and we are currently waiting
865 : // for a backoff timer before starting the next upstream attempt.
866 0 : if (upstream_requests_.empty()) {
867 0 : cleanup();
868 0 : callbacks_->sendLocalReply(
869 0 : Http::Code::InsufficientStorage, "exceeded request buffer limit while retrying upstream",
870 0 : modify_headers_, absl::nullopt,
871 0 : StreamInfo::ResponseCodeDetails::get().RequestPayloadExceededRetryBufferLimit);
872 0 : return Http::FilterDataStatus::StopIterationNoBuffer;
873 0 : }
874 0 : }
875 :
876 : // If we aren't buffering and there is no active request, an abort should have occurred
877 : // already.
878 596 : ASSERT(buffering || !upstream_requests_.empty());
879 :
880 596 : for (auto* shadow_stream : shadow_streams_) {
881 0 : if (end_stream) {
882 0 : shadow_stream->removeDestructorCallback();
883 0 : shadow_stream->removeWatermarkCallbacks();
884 0 : }
885 0 : Buffer::OwnedImpl copy(data);
886 0 : shadow_stream->sendData(copy, end_stream);
887 0 : }
888 596 : if (end_stream) {
889 104 : shadow_streams_.clear();
890 104 : }
891 596 : if (buffering) {
892 0 : if (!upstream_requests_.empty()) {
893 0 : Buffer::OwnedImpl copy(data);
894 0 : upstream_requests_.front()->acceptDataFromRouter(copy, end_stream);
895 0 : }
896 :
897 : // If we are potentially going to retry or buffer shadow this request we need to buffer.
898 : // This will not cause the connection manager to 413 because before we hit the
899 : // buffer limit we give up on retries and buffering. We must buffer using addDecodedData()
900 : // so that all buffered data is available by the time we do request complete processing and
901 : // potentially shadow. Additionally, we can't do a copy here because there's a check down
902 : // this stack for whether `data` is the same buffer as already buffered data.
903 0 : callbacks_->addDecodedData(data, true);
904 596 : } else {
905 596 : upstream_requests_.front()->acceptDataFromRouter(data, end_stream);
906 596 : }
907 :
908 596 : if (end_stream) {
909 104 : onRequestComplete();
910 104 : }
911 :
912 596 : return Http::FilterDataStatus::StopIterationNoBuffer;
913 596 : }
914 :
915 0 : Http::FilterTrailersStatus Filter::decodeTrailers(Http::RequestTrailerMap& trailers) {
916 0 : ENVOY_STREAM_LOG(debug, "router decoding trailers:\n{}", *callbacks_, trailers);
917 :
918 0 : if (shadow_headers_) {
919 0 : shadow_trailers_ = Http::createHeaderMap<Http::RequestTrailerMapImpl>(trailers);
920 0 : }
921 :
922 : // upstream_requests_.size() cannot be > 1 because that only happens when a per
923 : // try timeout occurs with hedge_on_per_try_timeout enabled but the per
924 : // try timeout timer is not started until onRequestComplete(). It could be zero
925 : // if the first request attempt has already failed and a retry is waiting for
926 : // a backoff timer.
927 0 : ASSERT(upstream_requests_.size() <= 1);
928 0 : downstream_trailers_ = &trailers;
929 0 : if (!upstream_requests_.empty()) {
930 0 : upstream_requests_.front()->acceptTrailersFromRouter(trailers);
931 0 : }
932 0 : for (auto* shadow_stream : shadow_streams_) {
933 0 : shadow_stream->removeDestructorCallback();
934 0 : shadow_stream->removeWatermarkCallbacks();
935 0 : shadow_stream->captureAndSendTrailers(
936 0 : Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_));
937 0 : }
938 0 : shadow_streams_.clear();
939 :
940 0 : onRequestComplete();
941 0 : return Http::FilterTrailersStatus::StopIteration;
942 0 : }
943 :
944 0 : Http::FilterMetadataStatus Filter::decodeMetadata(Http::MetadataMap& metadata_map) {
945 0 : Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map);
946 0 : if (!upstream_requests_.empty()) {
947 : // TODO(soya3129): Save metadata for retry, redirect and shadowing case.
948 0 : upstream_requests_.front()->acceptMetadataFromRouter(std::move(metadata_map_ptr));
949 0 : }
950 0 : return Http::FilterMetadataStatus::Continue;
951 0 : }
952 :
953 545 : void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) {
954 545 : callbacks_ = &callbacks;
955 : // As the decoder filter only pushes back via watermarks once data has reached
956 : // it, it can latch the current buffer limit and does not need to update the
957 : // limit if another filter increases it.
958 : //
959 : // The default is "do not limit". If there are configured (non-zero) buffer
960 : // limits, apply them here.
961 545 : if (callbacks_->decoderBufferLimit() != 0) {
962 471 : retry_shadow_buffer_limit_ = callbacks_->decoderBufferLimit();
963 471 : }
964 545 : }
965 :
966 769 : void Filter::cleanup() {
967 : // All callers of cleanup() should have cleaned out the upstream_requests_
968 : // list as appropriate.
969 769 : ASSERT(upstream_requests_.empty());
970 :
971 769 : retry_state_.reset();
972 769 : if (response_timeout_) {
973 181 : response_timeout_->disableTimer();
974 181 : response_timeout_.reset();
975 181 : }
976 769 : }
977 :
978 : absl::optional<absl::string_view> Filter::getShadowCluster(const ShadowPolicy& policy,
979 0 : const Http::HeaderMap& headers) const {
980 0 : if (!policy.cluster().empty()) {
981 0 : return policy.cluster();
982 0 : } else {
983 0 : ASSERT(!policy.clusterHeader().get().empty());
984 0 : const auto entry = headers.get(policy.clusterHeader());
985 0 : if (!entry.empty() && !entry[0]->value().empty()) {
986 0 : return entry[0]->value().getStringView();
987 0 : }
988 0 : ENVOY_STREAM_LOG(debug, "There is no cluster name in header: {}", *callbacks_,
989 0 : policy.clusterHeader());
990 0 : return absl::nullopt;
991 0 : }
992 0 : }
993 :
994 196 : void Filter::maybeDoShadowing() {
995 196 : for (const auto& shadow_policy_wrapper : active_shadow_policies_) {
996 0 : const auto& shadow_policy = shadow_policy_wrapper.get();
997 :
998 0 : const absl::optional<absl::string_view> shadow_cluster_name =
999 0 : getShadowCluster(shadow_policy, *downstream_headers_);
1000 :
1001 : // The cluster name got from headers is empty.
1002 0 : if (!shadow_cluster_name.has_value()) {
1003 0 : continue;
1004 0 : }
1005 :
1006 0 : Http::RequestMessagePtr request(new Http::RequestMessageImpl(
1007 0 : Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_)));
1008 0 : if (callbacks_->decodingBuffer()) {
1009 0 : request->body().add(*callbacks_->decodingBuffer());
1010 0 : }
1011 0 : if (shadow_trailers_) {
1012 0 : request->trailers(Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_));
1013 0 : }
1014 :
1015 0 : auto options = Http::AsyncClient::RequestOptions()
1016 0 : .setTimeout(timeout_.global_timeout_)
1017 0 : .setParentSpan(callbacks_->activeSpan())
1018 0 : .setChildSpanName("mirror")
1019 0 : .setSampled(shadow_policy.traceSampled())
1020 0 : .setIsShadow(true);
1021 0 : options.setFilterConfig(config_);
1022 0 : config_.shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request),
1023 0 : options);
1024 0 : }
1025 196 : }
1026 :
1027 196 : void Filter::onRequestComplete() {
1028 : // This should be called exactly once, when the downstream request has been received in full.
1029 196 : ASSERT(!downstream_end_stream_);
1030 196 : downstream_end_stream_ = true;
1031 196 : Event::Dispatcher& dispatcher = callbacks_->dispatcher();
1032 196 : downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime();
1033 :
1034 : // Possible that we got an immediate reset.
1035 196 : if (!upstream_requests_.empty()) {
1036 : // Even if we got an immediate reset, we could still shadow, but that is a riskier change and
1037 : // seems unnecessary right now.
1038 196 : if (!streaming_shadows_) {
1039 196 : maybeDoShadowing();
1040 196 : }
1041 :
1042 196 : if (timeout_.global_timeout_.count() > 0) {
1043 181 : response_timeout_ = dispatcher.createTimer([this]() -> void { onResponseTimeout(); });
1044 181 : response_timeout_->enableTimer(timeout_.global_timeout_);
1045 181 : }
1046 :
1047 196 : for (auto& upstream_request : upstream_requests_) {
1048 196 : if (upstream_request->createPerTryTimeoutOnRequestComplete()) {
1049 58 : upstream_request->setupPerTryTimeout();
1050 58 : }
1051 196 : }
1052 196 : }
1053 196 : }
1054 :
1055 585 : void Filter::onDestroy() {
1056 : // Reset any in-flight upstream requests.
1057 585 : resetAll();
1058 :
1059 : // Unregister from shadow stream notifications and cancel active streams.
1060 585 : for (auto* shadow_stream : shadow_streams_) {
1061 0 : shadow_stream->removeDestructorCallback();
1062 0 : shadow_stream->removeWatermarkCallbacks();
1063 0 : shadow_stream->cancel();
1064 0 : }
1065 :
1066 585 : cleanup();
1067 585 : }
1068 :
1069 0 : void Filter::onResponseTimeout() {
1070 0 : ENVOY_STREAM_LOG(debug, "upstream timeout", *callbacks_);
1071 :
1072 : // Reset any upstream requests that are still in flight.
1073 0 : while (!upstream_requests_.empty()) {
1074 0 : UpstreamRequestPtr upstream_request =
1075 0 : upstream_requests_.back()->removeFromList(upstream_requests_);
1076 :
1077 : // We want to record the upstream timeouts and increase the stats counters in all the cases.
1078 : // For example, we also want to record the stats in the case of BiDi streaming APIs where we
1079 : // might have already seen the headers.
1080 0 : cluster_->trafficStats()->upstream_rq_timeout_.inc();
1081 0 : if (request_vcluster_) {
1082 0 : request_vcluster_->stats().upstream_rq_timeout_.inc();
1083 0 : }
1084 0 : if (route_stats_context_.has_value()) {
1085 0 : route_stats_context_->stats().upstream_rq_timeout_.inc();
1086 0 : }
1087 :
1088 0 : if (upstream_request->upstreamHost()) {
1089 0 : upstream_request->upstreamHost()->stats().rq_timeout_.inc();
1090 0 : }
1091 :
1092 0 : if (upstream_request->awaitingHeaders()) {
1093 0 : if (cluster_->timeoutBudgetStats().has_value()) {
1094 : // Cancel firing per-try timeout information, because the per-try timeout did not come into
1095 : // play when the global timeout was hit.
1096 0 : upstream_request->recordTimeoutBudget(false);
1097 0 : }
1098 :
1099 : // If this upstream request already hit a "soft" timeout, then it
1100 : // already recorded a timeout into outlier detection. Don't do it again.
1101 0 : if (!upstream_request->outlierDetectionTimeoutRecorded()) {
1102 0 : updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, *upstream_request,
1103 0 : absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
1104 0 : }
1105 :
1106 0 : chargeUpstreamAbort(timeout_response_code_, false, *upstream_request);
1107 0 : }
1108 0 : upstream_request->resetStream();
1109 0 : }
1110 :
1111 0 : onUpstreamTimeoutAbort(StreamInfo::ResponseFlag::UpstreamRequestTimeout,
1112 0 : StreamInfo::ResponseCodeDetails::get().ResponseTimeout);
1113 0 : }
1114 :
1115 : // Called when the per try timeout is hit but we didn't reset the request
1116 : // (hedge_on_per_try_timeout enabled).
1117 0 : void Filter::onSoftPerTryTimeout(UpstreamRequest& upstream_request) {
1118 : // Track this as a timeout for outlier detection purposes even though we didn't
1119 : // cancel the request yet and might get a 2xx later.
1120 0 : updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request,
1121 0 : absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
1122 0 : upstream_request.outlierDetectionTimeoutRecorded(true);
1123 :
1124 0 : if (!downstream_response_started_ && retry_state_) {
1125 0 : RetryStatus retry_status = retry_state_->shouldHedgeRetryPerTryTimeout(
1126 0 : [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_]() -> void {
1127 : // Without any knowledge about what's going on in the connection pool, retry the request
1128 : // with the safest settings which is no early data but keep using or not using alt-svc as
1129 : // before. In this way, QUIC won't be falsely marked as broken.
1130 0 : doRetry(/*can_send_early_data*/ false, can_use_http3, TimeoutRetry::Yes);
1131 0 : });
1132 :
1133 0 : if (retry_status == RetryStatus::Yes) {
1134 0 : runRetryOptionsPredicates(upstream_request);
1135 0 : pending_retries_++;
1136 :
1137 : // Don't increment upstream_host->stats().rq_error_ here, we'll do that
1138 : // later if 1) we hit global timeout or 2) we get bad response headers
1139 : // back.
1140 0 : upstream_request.retried(true);
1141 :
1142 : // TODO: cluster stat for hedge attempted.
1143 0 : } else if (retry_status == RetryStatus::NoOverflow) {
1144 0 : callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
1145 0 : } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
1146 0 : callbacks_->streamInfo().setResponseFlag(
1147 0 : StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded);
1148 0 : }
1149 0 : }
1150 0 : }
1151 :
1152 0 : void Filter::onPerTryIdleTimeout(UpstreamRequest& upstream_request) {
1153 0 : onPerTryTimeoutCommon(upstream_request,
1154 0 : cluster_->trafficStats()->upstream_rq_per_try_idle_timeout_,
1155 0 : StreamInfo::ResponseCodeDetails::get().UpstreamPerTryIdleTimeout);
1156 0 : }
1157 :
1158 0 : void Filter::onPerTryTimeout(UpstreamRequest& upstream_request) {
1159 0 : onPerTryTimeoutCommon(upstream_request, cluster_->trafficStats()->upstream_rq_per_try_timeout_,
1160 0 : StreamInfo::ResponseCodeDetails::get().UpstreamPerTryTimeout);
1161 0 : }
1162 :
1163 : void Filter::onPerTryTimeoutCommon(UpstreamRequest& upstream_request, Stats::Counter& error_counter,
1164 0 : const std::string& response_code_details) {
1165 0 : if (hedging_params_.hedge_on_per_try_timeout_) {
1166 0 : onSoftPerTryTimeout(upstream_request);
1167 0 : return;
1168 0 : }
1169 :
1170 0 : error_counter.inc();
1171 0 : if (upstream_request.upstreamHost()) {
1172 0 : upstream_request.upstreamHost()->stats().rq_timeout_.inc();
1173 0 : }
1174 :
1175 0 : upstream_request.resetStream();
1176 :
1177 0 : updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request,
1178 0 : absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
1179 :
1180 0 : if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::Yes)) {
1181 0 : return;
1182 0 : }
1183 :
1184 0 : chargeUpstreamAbort(timeout_response_code_, false, upstream_request);
1185 :
1186 : // Remove this upstream request from the list now that we're done with it.
1187 0 : upstream_request.removeFromList(upstream_requests_);
1188 0 : onUpstreamTimeoutAbort(StreamInfo::ResponseFlag::UpstreamRequestTimeout, response_code_details);
1189 0 : }
1190 :
1191 0 : void Filter::onStreamMaxDurationReached(UpstreamRequest& upstream_request) {
1192 0 : upstream_request.resetStream();
1193 :
1194 0 : if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::No)) {
1195 0 : return;
1196 0 : }
1197 :
1198 0 : upstream_request.removeFromList(upstream_requests_);
1199 0 : cleanup();
1200 :
1201 0 : callbacks_->streamInfo().setResponseFlag(
1202 0 : StreamInfo::ResponseFlag::UpstreamMaxStreamDurationReached);
1203 : // Grab the const ref to call the const method of StreamInfo.
1204 0 : const auto& stream_info = callbacks_->streamInfo();
1205 0 : const bool downstream_decode_complete =
1206 0 : stream_info.downstreamTiming().has_value() &&
1207 0 : stream_info.downstreamTiming().value().get().lastDownstreamRxByteReceived().has_value();
1208 :
1209 : // sendLocalReply may instead reset the stream if downstream_response_started_ is true.
1210 0 : callbacks_->sendLocalReply(
1211 0 : Http::Utility::maybeRequestTimeoutCode(downstream_decode_complete),
1212 0 : "upstream max stream duration reached", modify_headers_, absl::nullopt,
1213 0 : StreamInfo::ResponseCodeDetails::get().UpstreamMaxStreamDurationReached);
1214 0 : }
1215 :
1216 : void Filter::updateOutlierDetection(Upstream::Outlier::Result result,
1217 : UpstreamRequest& upstream_request,
1218 86 : absl::optional<uint64_t> code) {
1219 86 : if (upstream_request.upstreamHost()) {
1220 86 : upstream_request.upstreamHost()->outlierDetector().putResult(result, code);
1221 86 : }
1222 86 : }
1223 :
1224 86 : void Filter::chargeUpstreamAbort(Http::Code code, bool dropped, UpstreamRequest& upstream_request) {
1225 86 : if (downstream_response_started_) {
1226 28 : if (upstream_request.grpcRqSuccessDeferred()) {
1227 28 : upstream_request.upstreamHost()->stats().rq_error_.inc();
1228 28 : stats_.rq_reset_after_downstream_response_started_.inc();
1229 28 : }
1230 86 : } else {
1231 58 : Upstream::HostDescriptionConstSharedPtr upstream_host = upstream_request.upstreamHost();
1232 :
1233 58 : chargeUpstreamCode(code, upstream_host, dropped);
1234 : // If we had non-5xx but still have been reset by backend or timeout before
1235 : // starting response, we treat this as an error. We only get non-5xx when
1236 : // timeout_response_code_ is used for code above, where this member can
1237 : // assume values such as 204 (NoContent).
1238 58 : if (upstream_host != nullptr && !Http::CodeUtility::is5xx(enumToInt(code))) {
1239 0 : upstream_host->stats().rq_error_.inc();
1240 0 : }
1241 58 : }
1242 86 : }
1243 :
1244 : void Filter::onUpstreamTimeoutAbort(StreamInfo::ResponseFlag response_flags,
1245 0 : absl::string_view details) {
1246 0 : Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = cluster()->timeoutBudgetStats();
1247 0 : if (tb_stats.has_value()) {
1248 0 : Event::Dispatcher& dispatcher = callbacks_->dispatcher();
1249 0 : std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>(
1250 0 : dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
1251 :
1252 0 : tb_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue(
1253 0 : FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_));
1254 0 : }
1255 :
1256 0 : const absl::string_view body =
1257 0 : timeout_response_code_ == Http::Code::GatewayTimeout ? "upstream request timeout" : "";
1258 0 : onUpstreamAbort(timeout_response_code_, response_flags, body, false, details);
1259 0 : }
1260 :
1261 : void Filter::onUpstreamAbort(Http::Code code, StreamInfo::ResponseFlag response_flags,
1262 86 : absl::string_view body, bool dropped, absl::string_view details) {
1263 : // If we have not yet sent anything downstream, send a response with an appropriate status code.
1264 : // Otherwise just reset the ongoing response.
1265 86 : callbacks_->streamInfo().setResponseFlag(response_flags);
1266 : // This will destroy any created retry timers.
1267 86 : cleanup();
1268 : // sendLocalReply may instead reset the stream if downstream_response_started_ is true.
1269 86 : callbacks_->sendLocalReply(
1270 86 : code, body,
1271 86 : [dropped, this](Http::ResponseHeaderMap& headers) {
1272 58 : if (dropped && !config_.suppress_envoy_headers_) {
1273 0 : headers.addReference(Http::Headers::get().EnvoyOverloaded,
1274 0 : Http::Headers::get().EnvoyOverloadedValues.True);
1275 0 : }
1276 58 : modify_headers_(headers);
1277 58 : },
1278 86 : absl::nullopt, details);
1279 86 : }
1280 :
1281 : bool Filter::maybeRetryReset(Http::StreamResetReason reset_reason,
1282 86 : UpstreamRequest& upstream_request, TimeoutRetry is_timeout_retry) {
1283 : // We don't retry if we already started the response, don't have a retry policy defined,
1284 : // or if we've already retried this upstream request (currently only possible if a per
1285 : // try timeout occurred and hedge_on_per_try_timeout is enabled).
1286 86 : if (downstream_response_started_ || !retry_state_ || upstream_request.retried()) {
1287 86 : return false;
1288 86 : }
1289 0 : RetryState::Http3Used was_using_http3 = RetryState::Http3Used::Unknown;
1290 0 : if (upstream_request.hadUpstream()) {
1291 0 : was_using_http3 = (upstream_request.streamInfo().protocol().has_value() &&
1292 0 : upstream_request.streamInfo().protocol().value() == Http::Protocol::Http3)
1293 0 : ? RetryState::Http3Used::Yes
1294 0 : : RetryState::Http3Used::No;
1295 0 : }
1296 0 : const RetryStatus retry_status = retry_state_->shouldRetryReset(
1297 0 : reset_reason, was_using_http3,
1298 0 : [this, can_send_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_,
1299 0 : can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
1300 0 : is_timeout_retry](bool disable_http3) -> void {
1301 : // This retry might be because of ConnectionFailure of 0-RTT handshake. In this case, though
1302 : // the original request is retried with the same can_send_early_data setting, it will not be
1303 : // sent as early data by the underlying connection pool grid.
1304 0 : doRetry(can_send_early_data, disable_http3 ? false : can_use_http3, is_timeout_retry);
1305 0 : });
1306 0 : if (retry_status == RetryStatus::Yes) {
1307 0 : runRetryOptionsPredicates(upstream_request);
1308 0 : pending_retries_++;
1309 :
1310 0 : if (upstream_request.upstreamHost()) {
1311 0 : upstream_request.upstreamHost()->stats().rq_error_.inc();
1312 0 : }
1313 :
1314 0 : auto request_ptr = upstream_request.removeFromList(upstream_requests_);
1315 0 : callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1316 0 : return true;
1317 0 : } else if (retry_status == RetryStatus::NoOverflow) {
1318 0 : callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
1319 0 : } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
1320 0 : callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded);
1321 0 : }
1322 :
1323 0 : return false;
1324 0 : }
1325 :
1326 : void Filter::onUpstreamReset(Http::StreamResetReason reset_reason,
1327 : absl::string_view transport_failure_reason,
1328 86 : UpstreamRequest& upstream_request) {
1329 86 : ENVOY_STREAM_LOG(debug, "upstream reset: reset reason: {}, transport failure reason: {}",
1330 86 : *callbacks_, Http::Utility::resetReasonToString(reset_reason),
1331 86 : transport_failure_reason);
1332 :
1333 86 : const bool dropped = reset_reason == Http::StreamResetReason::Overflow;
1334 :
1335 : // Ignore upstream reset caused by a resource overflow.
1336 : // Currently, circuit breakers can only produce this reset reason.
1337 : // It means that this reason is cluster-wise, not upstream-related.
1338 : // Therefore removing an upstream in the case of an overloaded cluster
1339 : // would make the situation even worse.
1340 : // https://github.com/envoyproxy/envoy/issues/25487
1341 86 : if (!dropped) {
1342 : // TODO: The reset may also come from upstream over the wire. In this case it should be
1343 : // treated as external origin error and distinguished from local origin error.
1344 : // This matters only when running OutlierDetection with split_external_local_origin_errors
1345 : // config param set to true.
1346 86 : updateOutlierDetection(Upstream::Outlier::Result::LocalOriginConnectFailed, upstream_request,
1347 86 : absl::nullopt);
1348 86 : }
1349 :
1350 86 : if (maybeRetryReset(reset_reason, upstream_request, TimeoutRetry::No)) {
1351 0 : return;
1352 0 : }
1353 :
1354 86 : const Http::Code error_code = (reset_reason == Http::StreamResetReason::ProtocolError)
1355 86 : ? Http::Code::BadGateway
1356 86 : : Http::Code::ServiceUnavailable;
1357 86 : chargeUpstreamAbort(error_code, dropped, upstream_request);
1358 86 : auto request_ptr = upstream_request.removeFromList(upstream_requests_);
1359 86 : callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1360 :
1361 : // If there are other in-flight requests that might see an upstream response,
1362 : // don't return anything downstream.
1363 86 : if (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0) {
1364 0 : return;
1365 0 : }
1366 :
1367 86 : const StreamInfo::ResponseFlag response_flags = streamResetReasonToResponseFlag(reset_reason);
1368 :
1369 86 : const std::string body =
1370 86 : absl::StrCat("upstream connect error or disconnect/reset before headers. ",
1371 86 : (is_retry_ ? "retried and the latest " : ""),
1372 86 : "reset reason: ", Http::Utility::resetReasonToString(reset_reason),
1373 86 : !transport_failure_reason.empty() ? ", transport failure reason: " : "",
1374 86 : transport_failure_reason);
1375 86 : const std::string& basic_details =
1376 86 : downstream_response_started_ ? StreamInfo::ResponseCodeDetails::get().LateUpstreamReset
1377 86 : : StreamInfo::ResponseCodeDetails::get().EarlyUpstreamReset;
1378 86 : const std::string details = StringUtil::replaceAllEmptySpace(absl::StrCat(
1379 86 : basic_details, "{", Http::Utility::resetReasonToString(reset_reason),
1380 86 : transport_failure_reason.empty() ? "" : absl::StrCat(",", transport_failure_reason), "}"));
1381 86 : onUpstreamAbort(error_code, response_flags, body, dropped, details);
1382 86 : }
1383 :
1384 : void Filter::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
1385 208 : bool pool_success) {
1386 208 : if (retry_state_ && host) {
1387 0 : retry_state_->onHostAttempted(host);
1388 0 : }
1389 :
1390 208 : if (!pool_success) {
1391 6 : return;
1392 6 : }
1393 :
1394 202 : if (request_vcluster_) {
1395 : // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady
1396 : // callback. Hence, the upstream request increases the virtual cluster's upstream_rq_total_ stat
1397 : // here.
1398 0 : request_vcluster_->stats().upstream_rq_total_.inc();
1399 0 : }
1400 202 : if (route_stats_context_.has_value()) {
1401 : // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady
1402 : // callback. Hence, the upstream request increases the route level upstream_rq_total_ stat
1403 : // here.
1404 0 : route_stats_context_->stats().upstream_rq_total_.inc();
1405 0 : }
1406 202 : }
1407 :
1408 : StreamInfo::ResponseFlag
1409 172 : Filter::streamResetReasonToResponseFlag(Http::StreamResetReason reset_reason) {
1410 172 : switch (reset_reason) {
1411 0 : case Http::StreamResetReason::LocalConnectionFailure:
1412 12 : case Http::StreamResetReason::RemoteConnectionFailure:
1413 12 : case Http::StreamResetReason::ConnectionTimeout:
1414 12 : return StreamInfo::ResponseFlag::UpstreamConnectionFailure;
1415 86 : case Http::StreamResetReason::ConnectionTermination:
1416 86 : return StreamInfo::ResponseFlag::UpstreamConnectionTermination;
1417 0 : case Http::StreamResetReason::LocalReset:
1418 0 : case Http::StreamResetReason::LocalRefusedStreamReset:
1419 0 : return StreamInfo::ResponseFlag::LocalReset;
1420 0 : case Http::StreamResetReason::Overflow:
1421 0 : return StreamInfo::ResponseFlag::UpstreamOverflow;
1422 0 : case Http::StreamResetReason::RemoteReset:
1423 0 : case Http::StreamResetReason::RemoteRefusedStreamReset:
1424 0 : case Http::StreamResetReason::ConnectError:
1425 0 : return StreamInfo::ResponseFlag::UpstreamRemoteReset;
1426 74 : case Http::StreamResetReason::ProtocolError:
1427 74 : return StreamInfo::ResponseFlag::UpstreamProtocolError;
1428 0 : case Http::StreamResetReason::OverloadManager:
1429 0 : return StreamInfo::ResponseFlag::OverloadManager;
1430 172 : }
1431 :
1432 0 : PANIC_DUE_TO_CORRUPT_ENUM;
1433 0 : }
1434 :
1435 : void Filter::handleNon5xxResponseHeaders(absl::optional<Grpc::Status::GrpcStatus> grpc_status,
1436 : UpstreamRequest& upstream_request, bool end_stream,
1437 141 : uint64_t grpc_to_http_status) {
1438 : // We need to defer gRPC success until after we have processed grpc-status in
1439 : // the trailers.
1440 141 : if (grpc_request_) {
1441 68 : if (end_stream) {
1442 19 : if (grpc_status && !Http::CodeUtility::is5xx(grpc_to_http_status)) {
1443 19 : upstream_request.upstreamHost()->stats().rq_success_.inc();
1444 19 : } else {
1445 0 : upstream_request.upstreamHost()->stats().rq_error_.inc();
1446 0 : }
1447 49 : } else {
1448 49 : upstream_request.grpcRqSuccessDeferred(true);
1449 49 : }
1450 103 : } else {
1451 73 : upstream_request.upstreamHost()->stats().rq_success_.inc();
1452 73 : }
1453 141 : }
1454 :
1455 : void Filter::onUpstream1xxHeaders(Http::ResponseHeaderMapPtr&& headers,
1456 0 : UpstreamRequest& upstream_request) {
1457 0 : const uint64_t response_code = Http::Utility::getResponseStatus(*headers);
1458 0 : chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false);
1459 0 : ENVOY_STREAM_LOG(debug, "upstream 1xx ({}).", *callbacks_, response_code);
1460 :
1461 0 : downstream_response_started_ = true;
1462 0 : final_upstream_request_ = &upstream_request;
1463 0 : resetOtherUpstreams(upstream_request);
1464 :
1465 : // Don't send retries after 100-Continue has been sent on. Arguably we could attempt to do a
1466 : // retry, assume the next upstream would also send an 100-Continue and swallow the second one
1467 : // but it's sketchy (as the subsequent upstream might not send a 100-Continue) and not worth
1468 : // the complexity until someone asks for it.
1469 0 : retry_state_.reset();
1470 :
1471 0 : callbacks_->encode1xxHeaders(std::move(headers));
1472 0 : }
1473 :
1474 585 : void Filter::resetAll() {
1475 652 : while (!upstream_requests_.empty()) {
1476 67 : auto request_ptr = upstream_requests_.back()->removeFromList(upstream_requests_);
1477 67 : request_ptr->resetStream();
1478 67 : callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1479 67 : }
1480 585 : }
1481 :
1482 141 : void Filter::resetOtherUpstreams(UpstreamRequest& upstream_request) {
1483 : // Pop each upstream request on the list and reset it if it's not the one
1484 : // provided. At the end we'll move it back into the list.
1485 141 : UpstreamRequestPtr final_upstream_request;
1486 282 : while (!upstream_requests_.empty()) {
1487 141 : UpstreamRequestPtr upstream_request_tmp =
1488 141 : upstream_requests_.back()->removeFromList(upstream_requests_);
1489 141 : if (upstream_request_tmp.get() != &upstream_request) {
1490 0 : upstream_request_tmp->resetStream();
1491 : // TODO: per-host stat for hedge abandoned.
1492 : // TODO: cluster stat for hedge abandoned.
1493 141 : } else {
1494 141 : final_upstream_request = std::move(upstream_request_tmp);
1495 141 : }
1496 141 : }
1497 :
1498 141 : ASSERT(final_upstream_request);
1499 : // Now put the final request back on this list.
1500 141 : LinkedList::moveIntoList(std::move(final_upstream_request), upstream_requests_);
1501 141 : }
1502 :
// Handles the response headers received from `upstream_request`.
//
// response_code:    HTTP status of the upstream response.
// headers:          the upstream response headers; forwarded downstream on the success path.
// upstream_request: the attempt that produced the headers.
// end_stream:       true when the response consists of headers only.
//
// In order, this function:
//   1. runs modify_headers_ and reports the response code (or, for gRPC requests carrying
//      grpc-status in the headers, the mapped HTTP status) to the host's outlier detector;
//   2. marks the host unhealthy if the immediate-health-check-fail header is present;
//   3. consults retry_state_ and returns early when a retry gets scheduled;
//   4. attempts an internal redirect when the route policy asks for one;
//   5. drops this response if it could not be retried while other attempts are still pending;
//   6. otherwise commits to this upstream request: charges stats, finalizes the headers, resets
//      all other attempts and encodes the headers downstream.
1503 : void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers,
1504 141 : UpstreamRequest& upstream_request, bool end_stream) {
1505 141 : ENVOY_STREAM_LOG(debug, "upstream headers complete: end_stream={}", *callbacks_, end_stream);
1506 :
// Run modify_headers_ on the response headers first, before any of the checks below read them.
1507 141 : modify_headers_(*headers);
1508 : // When grpc-status appears in response headers, convert grpc-status to HTTP status code
1509 : // for outlier detection. This does not currently change any stats or logging and does not
1510 : // handle the case when an error grpc-status is sent as a trailer.
1511 141 : absl::optional<Grpc::Status::GrpcStatus> grpc_status;
1512 141 : uint64_t grpc_to_http_status = 0;
1513 141 : if (grpc_request_) {
1514 68 : grpc_status = Grpc::Common::getGrpcStatus(*headers);
1515 68 : if (grpc_status.has_value()) {
1516 19 : grpc_to_http_status = Grpc::Utility::grpcToHttpStatus(grpc_status.value());
1517 19 : }
1518 68 : }
1519 :
// Outlier detection sees the gRPC-derived HTTP status when one was found, otherwise the raw
// response code.
1520 141 : if (grpc_status.has_value()) {
1521 19 : upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(grpc_to_http_status);
1522 122 : } else {
1523 122 : upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(response_code);
1524 122 : }
1525 :
1526 141 : if (headers->EnvoyImmediateHealthCheckFail() != nullptr) {
1527 0 : upstream_request.upstreamHost()->healthChecker().setUnhealthy(
1528 0 : Upstream::HealthCheckHostMonitor::UnhealthyType::ImmediateHealthCheckFail);
1529 0 : }
1530 :
// Set when the retry logic wanted to retry this response but was not allowed to.
1531 141 : bool could_not_retry = false;
1532 :
1533 : // Check if this upstream request was already retried, for instance after
1534 : // hitting a per try timeout. Don't retry it if we already have.
1535 141 : if (retry_state_) {
1536 0 : if (upstream_request.retried()) {
1537 : // We already retried this request (presumably for a per try timeout) so
1538 : // we definitely won't retry it again. Check if we would have retried it
1539 : // if we could.
1540 0 : bool retry_as_early_data; // Not going to be used as we are not retrying.
1541 0 : could_not_retry = retry_state_->wouldRetryFromHeaders(*headers, *downstream_headers_,
1542 0 : retry_as_early_data) !=
1543 0 : RetryState::RetryDecision::NoRetry;
1544 0 : } else {
// The callback handed to shouldRetryHeaders performs the retry via doRetry, reusing this
// attempt's HTTP/3 setting and dropping early data when the callback is told to disable it.
1545 0 : const RetryStatus retry_status = retry_state_->shouldRetryHeaders(
1546 0 : *headers, *downstream_headers_,
1547 0 : [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
1548 0 : had_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_](
1549 0 : bool disable_early_data) -> void {
1550 0 : doRetry((disable_early_data ? false : had_early_data), can_use_http3, TimeoutRetry::No);
1551 0 : });
// Retry scheduled: charge this attempt as an error plus the retry response-code stat, drop the
// attempt and stop processing this response.
1552 0 : if (retry_status == RetryStatus::Yes) {
1553 0 : runRetryOptionsPredicates(upstream_request);
1554 0 : pending_retries_++;
1555 0 : upstream_request.upstreamHost()->stats().rq_error_.inc();
1556 0 : Http::CodeStats& code_stats = httpContext().codeStats();
1557 0 : code_stats.chargeBasicResponseStat(cluster_->statsScope(), stats_.stat_names_.retry_,
1558 0 : static_cast<Http::Code>(response_code),
1559 0 : exclude_http_code_stats_);
1560 :
// Reset unless the response ended (end_stream) and upstream_request reports encodeComplete().
1561 0 : if (!end_stream || !upstream_request.encodeComplete()) {
1562 0 : upstream_request.resetStream();
1563 0 : }
1564 0 : auto request_ptr = upstream_request.removeFromList(upstream_requests_);
1565 0 : callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1566 0 : return;
1567 0 : } else if (retry_status == RetryStatus::NoOverflow) {
1568 0 : callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
1569 0 : could_not_retry = true;
1570 0 : } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
1571 0 : callbacks_->streamInfo().setResponseFlag(
1572 0 : StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded);
1573 0 : could_not_retry = true;
1574 0 : }
1575 0 : }
1576 0 : }
1577 :
// Internal redirect: only attempted when the route policy is enabled and accepts this response
// code; setupRedirect returning true means the redirect took over and nothing more is done here.
1578 141 : if (route_entry_->internalRedirectPolicy().enabled() &&
1579 141 : route_entry_->internalRedirectPolicy().shouldRedirectForResponseCode(
1580 0 : static_cast<Http::Code>(response_code)) &&
1581 141 : setupRedirect(*headers)) {
1582 0 : return;
1583 : // If the redirect could not be handled, fail open and let it pass to the
1584 : // next downstream.
1585 0 : }
1586 :
1587 : // Check if we got a "bad" response, but there are still upstream requests in
1588 : // flight awaiting headers or scheduled retries. If so, exit to give them a
1589 : // chance to return before returning a response downstream.
1590 141 : if (could_not_retry && (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0)) {
1591 0 : upstream_request.upstreamHost()->stats().rq_error_.inc();
1592 :
1593 : // Reset the stream because there are other in-flight requests that we'll
1594 : // wait around for and we're not interested in consuming any body/trailers.
1595 0 : auto request_ptr = upstream_request.removeFromList(upstream_requests_);
1596 0 : request_ptr->resetStream();
1597 0 : callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1598 0 : return;
1599 0 : }
1600 :
1601 : // Make sure any retry timers are destroyed since we may not call cleanup() if end_stream is
1602 : // false.
1603 141 : if (retry_state_) {
1604 0 : retry_state_.reset();
1605 0 : }
1606 :
1607 : // Only send upstream service time if we received the complete request and this is not a
1608 : // premature response.
1609 141 : if (DateUtil::timePointValid(downstream_request_complete_time_)) {
1610 73 : Event::Dispatcher& dispatcher = callbacks_->dispatcher();
1611 73 : MonotonicTime response_received_time = dispatcher.timeSource().monotonicTime();
1612 73 : std::chrono::milliseconds ms = std::chrono::duration_cast<std::chrono::milliseconds>(
1613 73 : response_received_time - downstream_request_complete_time_);
1614 73 : if (!config_.suppress_envoy_headers_) {
1615 73 : headers->setEnvoyUpstreamServiceTime(ms.count());
1616 73 : }
1617 73 : }
1618 :
// The request counts as canary when the response carries the canary header with value "true"
// or the host itself is flagged canary.
1619 141 : upstream_request.upstreamCanary(
1620 141 : (headers->EnvoyUpstreamCanary() && headers->EnvoyUpstreamCanary()->value() == "true") ||
1621 141 : upstream_request.upstreamHost()->canary());
1622 141 : chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false);
1623 141 : if (!Http::CodeUtility::is5xx(response_code)) {
1624 141 : handleNon5xxResponseHeaders(grpc_status, upstream_request, end_stream, grpc_to_http_status);
1625 141 : }
1626 :
1627 : // Append routing cookies
1628 141 : for (const auto& header_value : downstream_set_cookies_) {
1629 0 : headers->addReferenceKey(Http::Headers::get().SetCookie, header_value);
1630 0 : }
1631 :
1632 141 : callbacks_->streamInfo().setResponseCodeDetails(
1633 141 : StreamInfo::ResponseCodeDetails::get().ViaUpstream);
1634 :
1635 141 : if (Runtime::runtimeFeatureEnabled(
1636 141 : "envoy.reloadable_features.copy_response_code_to_downstream_stream_info")) {
1637 141 : callbacks_->streamInfo().setResponseCode(response_code);
1638 141 : }
1639 :
1640 : // TODO(zuercher): If access to response_headers_to_add (at any level) is ever needed outside
1641 : // Router::Filter we'll need to find a better location for this work. One possibility is to
1642 : // provide finalizeResponseHeaders functions on the Router::Config and VirtualHost interfaces.
1643 141 : route_entry_->finalizeResponseHeaders(*headers, callbacks_->streamInfo());
1644 :
// From here on the downstream response is tied to this upstream request.
1645 141 : downstream_response_started_ = true;
1646 141 : final_upstream_request_ = &upstream_request;
1647 : // Make sure that for request hedging, we end up with the correct final upstream info.
1648 141 : callbacks_->streamInfo().setUpstreamInfo(final_upstream_request_->streamInfo().upstreamInfo());
1649 141 : resetOtherUpstreams(upstream_request);
1650 141 : if (end_stream) {
1651 21 : onUpstreamComplete(upstream_request);
1652 21 : }
1653 :
1654 141 : callbacks_->encodeHeaders(std::move(headers), end_stream,
1655 141 : StreamInfo::ResponseCodeDetails::get().ViaUpstream);
1656 141 : }
1657 :
1658 : void Filter::onUpstreamData(Buffer::Instance& data, UpstreamRequest& upstream_request,
1659 395 : bool end_stream) {
1660 : // This should be true because when we saw headers we either reset the stream
1661 : // (hence wouldn't have made it to onUpstreamData) or all other in-flight
1662 : // streams.
1663 395 : ASSERT(upstream_requests_.size() == 1);
1664 395 : if (end_stream) {
1665 : // gRPC request termination without trailers is an error.
1666 71 : if (upstream_request.grpcRqSuccessDeferred()) {
1667 0 : upstream_request.upstreamHost()->stats().rq_error_.inc();
1668 0 : }
1669 71 : onUpstreamComplete(upstream_request);
1670 71 : }
1671 :
1672 395 : callbacks_->encodeData(data, end_stream);
1673 395 : }
1674 :
1675 : void Filter::onUpstreamTrailers(Http::ResponseTrailerMapPtr&& trailers,
1676 6 : UpstreamRequest& upstream_request) {
1677 : // This should be true because when we saw headers we either reset the stream
1678 : // (hence wouldn't have made it to onUpstreamTrailers) or all other in-flight
1679 : // streams.
1680 6 : ASSERT(upstream_requests_.size() == 1);
1681 :
1682 6 : if (upstream_request.grpcRqSuccessDeferred()) {
1683 6 : absl::optional<Grpc::Status::GrpcStatus> grpc_status = Grpc::Common::getGrpcStatus(*trailers);
1684 6 : if (grpc_status &&
1685 6 : !Http::CodeUtility::is5xx(Grpc::Utility::grpcToHttpStatus(grpc_status.value()))) {
1686 6 : upstream_request.upstreamHost()->stats().rq_success_.inc();
1687 6 : } else {
1688 0 : upstream_request.upstreamHost()->stats().rq_error_.inc();
1689 0 : }
1690 6 : }
1691 :
1692 6 : onUpstreamComplete(upstream_request);
1693 :
1694 6 : callbacks_->encodeTrailers(std::move(trailers));
1695 6 : }
1696 :
// Passes a metadata map received from upstream on to the downstream encoder callbacks,
// transferring ownership of `metadata_map`.
1697 2 : void Filter::onUpstreamMetadata(Http::MetadataMapPtr&& metadata_map) {
1698 2 : callbacks_->encodeMetadata(std::move(metadata_map));
1699 2 : }
1700 :
1701 98 : void Filter::onUpstreamComplete(UpstreamRequest& upstream_request) {
1702 98 : if (!downstream_end_stream_) {
1703 25 : upstream_request.resetStream();
1704 25 : }
1705 98 : Event::Dispatcher& dispatcher = callbacks_->dispatcher();
1706 98 : std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>(
1707 98 : dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
1708 :
1709 98 : Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = cluster()->timeoutBudgetStats();
1710 98 : if (tb_stats.has_value()) {
1711 0 : tb_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue(
1712 0 : FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_));
1713 0 : }
1714 :
1715 98 : if (config_.emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck() &&
1716 98 : DateUtil::timePointValid(downstream_request_complete_time_)) {
1717 73 : upstream_request.upstreamHost()->outlierDetector().putResponseTime(response_time);
1718 73 : const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_);
1719 :
1720 73 : Http::CodeStats& code_stats = httpContext().codeStats();
1721 73 : Http::CodeStats::ResponseTimingInfo info{
1722 73 : config_.scope_,
1723 73 : cluster_->statsScope(),
1724 73 : config_.empty_stat_name_,
1725 73 : response_time,
1726 73 : upstream_request.upstreamCanary(),
1727 73 : internal_request,
1728 73 : route_entry_->virtualHost().statName(),
1729 73 : request_vcluster_ ? request_vcluster_->statName() : config_.empty_stat_name_,
1730 73 : route_stats_context_.has_value() ? route_stats_context_->statName()
1731 73 : : config_.empty_stat_name_,
1732 73 : config_.zone_name_,
1733 73 : upstreamZone(upstream_request.upstreamHost())};
1734 :
1735 73 : code_stats.chargeResponseTiming(info);
1736 :
1737 73 : if (alt_stat_prefix_ != nullptr) {
1738 0 : Http::CodeStats::ResponseTimingInfo info{config_.scope_,
1739 0 : cluster_->statsScope(),
1740 0 : alt_stat_prefix_->statName(),
1741 0 : response_time,
1742 0 : upstream_request.upstreamCanary(),
1743 0 : internal_request,
1744 0 : config_.empty_stat_name_,
1745 0 : config_.empty_stat_name_,
1746 0 : config_.empty_stat_name_,
1747 0 : config_.zone_name_,
1748 0 : upstreamZone(upstream_request.upstreamHost())};
1749 :
1750 0 : code_stats.chargeResponseTiming(info);
1751 0 : }
1752 73 : }
1753 :
1754 : // Defer deletion as this is generally called under the stack of the upstream
1755 : // request, and immediate deletion is dangerous.
1756 98 : callbacks_->dispatcher().deferredDelete(upstream_request.removeFromList(upstream_requests_));
1757 98 : cleanup();
1758 98 : }
1759 :
1760 0 : bool Filter::setupRedirect(const Http::ResponseHeaderMap& headers) {
1761 0 : ENVOY_STREAM_LOG(debug, "attempting internal redirect", *callbacks_);
1762 0 : const Http::HeaderEntry* location = headers.Location();
1763 :
1764 0 : const uint64_t status_code = Http::Utility::getResponseStatus(headers);
1765 :
1766 : // Redirects are not supported for streaming requests yet.
1767 0 : if (downstream_end_stream_ && (!request_buffer_overflowed_ || !callbacks_->decodingBuffer()) &&
1768 0 : location != nullptr &&
1769 0 : convertRequestHeadersForInternalRedirect(*downstream_headers_, headers, *location,
1770 0 : status_code) &&
1771 0 : callbacks_->recreateStream(&headers)) {
1772 0 : ENVOY_STREAM_LOG(debug, "Internal redirect succeeded", *callbacks_);
1773 0 : cluster_->trafficStats()->upstream_internal_redirect_succeeded_total_.inc();
1774 0 : return true;
1775 0 : }
1776 : // convertRequestHeadersForInternalRedirect logs failure reasons but log
1777 : // details for other failure modes here.
1778 0 : if (!downstream_end_stream_) {
1779 0 : ENVOY_STREAM_LOG(trace, "Internal redirect failed: request incomplete", *callbacks_);
1780 0 : } else if (request_buffer_overflowed_) {
1781 0 : ENVOY_STREAM_LOG(trace, "Internal redirect failed: request body overflow", *callbacks_);
1782 0 : } else if (location == nullptr) {
1783 0 : ENVOY_STREAM_LOG(trace, "Internal redirect failed: missing location header", *callbacks_);
1784 0 : }
1785 :
1786 0 : cluster_->trafficStats()->upstream_internal_redirect_failed_total_.inc();
1787 0 : return false;
1788 0 : }
1789 :
// Rewrites `downstream_headers` in place so that the request can be replayed against the URL
// carried by the Location header entry `internal_redirect`.
//
// downstream_headers: the request headers to rewrite.
// upstream_headers:   the redirect response; source of the headers the policy wants copied.
// internal_redirect:  the Location header entry of the response.
// status_code:        the response status; 303 (See Other) additionally rewrites the method.
//
// Returns true when the headers were rewritten and the redirect may proceed. Returns false when
// the redirect is refused; most refusal paths also bump a passthrough_internal_redirect_* stat.
// Failures that happen after the headers were snapshotted leave `downstream_headers` restored
// from that snapshot (see restore_original_headers below).
1790 : bool Filter::convertRequestHeadersForInternalRedirect(
1791 : Http::RequestHeaderMap& downstream_headers, const Http::ResponseHeaderMap& upstream_headers,
1792 0 : const Http::HeaderEntry& internal_redirect, uint64_t status_code) {
1793 0 : if (!downstream_headers.Path()) {
1794 0 : ENVOY_STREAM_LOG(trace, "Internal redirect failed: no path in downstream_headers", *callbacks_);
1795 0 : return false;
1796 0 : }
1797 :
1798 0 : absl::string_view redirect_url = internal_redirect.value().getStringView();
1799 : // Make sure the redirect response contains a URL to redirect to.
1800 0 : if (redirect_url.empty()) {
1801 0 : stats_.passthrough_internal_redirect_bad_location_.inc();
1802 0 : ENVOY_STREAM_LOG(trace, "Internal redirect failed: empty location", *callbacks_);
1803 0 : return false;
1804 0 : }
// The Location value must parse via Url::initialize; a value that fails to parse is counted as
// a bad location as well.
1805 0 : Http::Utility::Url absolute_url;
1806 0 : if (!absolute_url.initialize(redirect_url, false)) {
1807 0 : stats_.passthrough_internal_redirect_bad_location_.inc();
1808 0 : ENVOY_STREAM_LOG(trace, "Internal redirect failed: invalid location {}", *callbacks_,
1809 0 : redirect_url);
1810 0 : return false;
1811 0 : }
1812 :
1813 0 : const auto& policy = route_entry_->internalRedirectPolicy();
1814 : // Don't change the scheme from the original request
1815 0 : const bool scheme_is_http = schemeIsHttp(downstream_headers, callbacks_->connection());
1816 0 : const bool target_is_http = Http::Utility::schemeIsHttp(absolute_url.scheme());
1817 0 : if (!policy.isCrossSchemeRedirectAllowed() && scheme_is_http != target_is_http) {
1818 0 : ENVOY_STREAM_LOG(trace, "Internal redirect failed: incorrect scheme for {}", *callbacks_,
1819 0 : redirect_url);
1820 0 : stats_.passthrough_internal_redirect_unsafe_scheme_.inc();
1821 0 : return false;
1822 0 : }
1823 :
1824 0 : const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();
1825 : // Make sure that performing the redirect won't result in exceeding the configured number of
1826 : // redirects allowed for this route.
1827 0 : StreamInfo::UInt32Accessor* num_internal_redirect{};
1828 :
// Fetch the redirect counter from filter state, creating it with value 0 when it is not there
// yet.
1829 0 : if (num_internal_redirect = filter_state->getDataMutable<StreamInfo::UInt32Accessor>(
1830 0 : NumInternalRedirectsFilterStateName);
1831 0 : num_internal_redirect == nullptr) {
1832 0 : auto state = std::make_shared<StreamInfo::UInt32AccessorImpl>(0);
1833 0 : num_internal_redirect = state.get();
1834 :
1835 0 : filter_state->setData(NumInternalRedirectsFilterStateName, std::move(state),
1836 0 : StreamInfo::FilterState::StateType::Mutable,
1837 0 : StreamInfo::FilterState::LifeSpan::Request);
1838 0 : }
1839 :
1840 0 : if (num_internal_redirect->value() >= policy.maxInternalRedirects()) {
1841 0 : ENVOY_STREAM_LOG(trace, "Internal redirect failed: redirect limits exceeded.", *callbacks_);
1842 0 : stats_.passthrough_internal_redirect_too_many_redirects_.inc();
1843 0 : return false;
1844 0 : }
1845 : // Copy the old values, so they can be restored if the redirect fails.
1846 0 : const bool scheme_is_set = (downstream_headers.Scheme() != nullptr);
1847 :
1848 0 : std::unique_ptr<Http::RequestHeaderMapImpl> saved_headers = Http::RequestHeaderMapImpl::create();
1849 0 : Http::RequestHeaderMapImpl::copyFrom(*saved_headers, downstream_headers);
1850 :
// Mirror each header named by the policy from the upstream response onto the request: a header
// absent from the response is removed from the request, a header present in the response
// replaces whatever the request carried (all of its values are copied).
1851 0 : for (const Http::LowerCaseString& header :
1852 0 : route_entry_->internalRedirectPolicy().responseHeadersToCopy()) {
1853 0 : Http::HeaderMap::GetResult result = upstream_headers.get(header);
1854 0 : Http::HeaderMap::GetResult downstream_result = downstream_headers.get(header);
1855 0 : if (result.empty()) {
1856 : // Clear headers if present, else do nothing:
1857 0 : if (downstream_result.empty()) {
1858 0 : continue;
1859 0 : }
1860 0 : downstream_headers.remove(header);
1861 0 : } else {
1862 : // The header exists in the response, copy into the downstream headers
1863 0 : if (!downstream_result.empty()) {
1864 0 : downstream_headers.remove(header);
1865 0 : }
1866 0 : for (size_t idx = 0; idx < result.size(); idx++) {
1867 0 : downstream_headers.addCopy(header, result[idx]->value().getStringView());
1868 0 : }
1869 0 : }
1870 0 : }
1871 :
// Rollback guard: the lambda clears the request headers and re-populates them from
// saved_headers. NOTE(review): Cleanup presumably runs the lambda when this function exits
// unless cancel() was called (as done on the success path below) - confirm against
// source/common/common/cleanup.h.
1872 0 : Cleanup restore_original_headers(
1873 0 : [&downstream_headers, scheme_is_set, scheme_is_http, &saved_headers]() {
1874 0 : downstream_headers.clear();
1875 0 : if (scheme_is_set) {
1876 0 : downstream_headers.setScheme(scheme_is_http ? Http::Headers::get().SchemeValues.Http
1877 0 : : Http::Headers::get().SchemeValues.Https);
1878 0 : }
1879 :
1880 0 : Http::RequestHeaderMapImpl::copyFrom(downstream_headers, *saved_headers);
1881 0 : });
1882 :
1883 : // Replace the original host, scheme and path.
1884 0 : downstream_headers.setScheme(absolute_url.scheme());
1885 0 : downstream_headers.setHost(absolute_url.hostAndPort());
1886 :
1887 0 : auto path_and_query = absolute_url.pathAndQueryParams();
1888 0 : if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http_reject_path_with_fragment")) {
1889 : // Envoy treats internal redirect as a new request and will reject it if URI path
1890 : // contains #fragment. However the Location header is allowed to have #fragment in URI path. To
1891 : // prevent Envoy from rejecting internal redirect, strip the #fragment from Location URI if it
1892 : // is present.
1893 0 : auto fragment_pos = path_and_query.find('#');
1894 0 : path_and_query = path_and_query.substr(0, fragment_pos);
1895 0 : }
1896 0 : downstream_headers.setPath(path_and_query);
1897 :
1898 : // Only clear the route cache if there are downstream callbacks. There aren't, for example,
1899 : // for async connections.
1900 0 : if (callbacks_->downstreamCallbacks()) {
1901 0 : callbacks_->downstreamCallbacks()->clearRouteCache();
1902 0 : }
// Route lookup happens after the headers were rewritten and the route cache was cleared.
1903 0 : const auto route = callbacks_->route();
1904 : // Don't allow a redirect to a non existing route.
1905 0 : if (!route) {
1906 0 : stats_.passthrough_internal_redirect_no_route_.inc();
1907 0 : ENVOY_STREAM_LOG(trace, "Internal redirect failed: no route found", *callbacks_);
1908 0 : return false;
1909 0 : }
1910 :
// Every predicate configured on the policy must accept the target route; the first rejection
// aborts the redirect.
1911 0 : const auto& route_name = route->routeName();
1912 0 : for (const auto& predicate : policy.predicates()) {
1913 0 : if (!predicate->acceptTargetRoute(*filter_state, route_name, !scheme_is_http,
1914 0 : !target_is_http)) {
1915 0 : stats_.passthrough_internal_redirect_predicate_.inc();
1916 0 : ENVOY_STREAM_LOG(trace,
1917 0 : "Internal redirect failed: rejecting redirect targeting {}, by {} predicate",
1918 0 : *callbacks_, route_name, predicate->name());
1919 0 : return false;
1920 0 : }
1921 0 : }
1922 :
1923 : // See https://tools.ietf.org/html/rfc7231#section-6.4.4.
// A 303 response turns any request that is neither GET nor HEAD into a GET: the method is
// rewritten, Content-Length is removed and the buffered request body is drained.
1924 0 : if (status_code == enumToInt(Http::Code::SeeOther) &&
1925 0 : downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Get &&
1926 0 : downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Head) {
1927 0 : downstream_headers.setMethod(Http::Headers::get().MethodValues.Get);
1928 0 : downstream_headers.remove(Http::Headers::get().ContentLength);
1929 0 : callbacks_->modifyDecodingBuffer([](Buffer::Instance& data) { data.drain(data.length()); });
1930 0 : }
1931 :
// Success: count the redirect and disarm the rollback guard so the rewritten headers stay.
1932 0 : num_internal_redirect->increment();
1933 0 : restore_original_headers.cancel();
1934 : // Preserve the original request URL for the second pass.
// The URL is rebuilt from the original scheme plus the host and path of the saved headers.
1935 0 : downstream_headers.setEnvoyOriginalUrl(
1936 0 : absl::StrCat(scheme_is_http ? Http::Headers::get().SchemeValues.Http
1937 0 : : Http::Headers::get().SchemeValues.Https,
1938 0 : "://", saved_headers->getHostValue(), saved_headers->getPathValue()));
1939 0 : return true;
1940 0 : }
1941 :
1942 0 : void Filter::runRetryOptionsPredicates(UpstreamRequest& retriable_request) {
1943 0 : for (const auto& options_predicate : route_entry_->retryPolicy().retryOptionsPredicates()) {
1944 0 : const Upstream::RetryOptionsPredicate::UpdateOptionsParameters parameters{
1945 0 : retriable_request.streamInfo(), upstreamSocketOptions()};
1946 0 : auto ret = options_predicate->updateOptions(parameters);
1947 0 : if (ret.new_upstream_socket_options_.has_value()) {
1948 0 : upstream_options_ = ret.new_upstream_socket_options_.value();
1949 0 : }
1950 0 : }
1951 0 : }
1952 :
1953 0 : void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry) {
1954 0 : ENVOY_STREAM_LOG(debug, "performing retry", *callbacks_);
1955 :
1956 0 : is_retry_ = true;
1957 0 : attempt_count_++;
1958 0 : callbacks_->streamInfo().setAttemptCount(attempt_count_);
1959 0 : ASSERT(pending_retries_ > 0);
1960 0 : pending_retries_--;
1961 :
1962 : // Clusters can technically get removed by CDS during a retry. Make sure it still exists.
1963 0 : const auto cluster = config_.cm_.getThreadLocalCluster(route_entry_->clusterName());
1964 0 : std::unique_ptr<GenericConnPool> generic_conn_pool;
1965 0 : if (cluster != nullptr) {
1966 0 : cluster_ = cluster->info();
1967 0 : generic_conn_pool = createConnPool(*cluster);
1968 0 : }
1969 :
1970 0 : if (!generic_conn_pool) {
1971 0 : sendNoHealthyUpstreamResponse();
1972 0 : cleanup();
1973 0 : return;
1974 0 : }
1975 0 : UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(
1976 0 : *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3);
1977 :
1978 0 : if (include_attempt_count_in_request_) {
1979 0 : downstream_headers_->setEnvoyAttemptCount(attempt_count_);
1980 0 : }
1981 :
1982 0 : if (include_timeout_retry_header_in_request_) {
1983 0 : downstream_headers_->setEnvoyIsTimeoutRetry(is_timeout_retry == TimeoutRetry::Yes ? "true"
1984 0 : : "false");
1985 0 : }
1986 :
1987 : // The request timeouts only account for time elapsed since the downstream request completed
1988 : // which might not have happened yet (in which case zero time has elapsed.)
1989 0 : std::chrono::milliseconds elapsed_time = std::chrono::milliseconds::zero();
1990 :
1991 0 : if (DateUtil::timePointValid(downstream_request_complete_time_)) {
1992 0 : Event::Dispatcher& dispatcher = callbacks_->dispatcher();
1993 0 : elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(
1994 0 : dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
1995 0 : }
1996 :
1997 0 : FilterUtility::setTimeoutHeaders(elapsed_time.count(), timeout_, *route_entry_,
1998 0 : *downstream_headers_, !config_.suppress_envoy_headers_,
1999 0 : grpc_request_, hedging_params_.hedge_on_per_try_timeout_);
2000 :
2001 0 : UpstreamRequest* upstream_request_tmp = upstream_request.get();
2002 0 : LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
2003 0 : upstream_requests_.front()->acceptHeadersFromRouter(
2004 0 : !callbacks_->decodingBuffer() && !downstream_trailers_ && downstream_end_stream_);
2005 : // It's possible we got immediately reset which means the upstream request we just
2006 : // added to the front of the list might have been removed, so we need to check to make
2007 : // sure we don't send data on the wrong request.
2008 0 : if (!upstream_requests_.empty() && (upstream_requests_.front().get() == upstream_request_tmp)) {
2009 0 : if (callbacks_->decodingBuffer()) {
2010 : // If we are doing a retry we need to make a copy.
2011 0 : Buffer::OwnedImpl copy(*callbacks_->decodingBuffer());
2012 0 : upstream_requests_.front()->acceptDataFromRouter(copy, !downstream_trailers_ &&
2013 0 : downstream_end_stream_);
2014 0 : }
2015 :
2016 0 : if (downstream_trailers_) {
2017 0 : upstream_requests_.front()->acceptTrailersFromRouter(*downstream_trailers_);
2018 0 : }
2019 0 : }
2020 0 : }
2021 :
2022 86 : uint32_t Filter::numRequestsAwaitingHeaders() {
2023 86 : return std::count_if(upstream_requests_.begin(), upstream_requests_.end(),
2024 86 : [](const auto& req) -> bool { return req->awaitingHeaders(); });
2025 86 : }
2026 :
2027 : bool Filter::checkDropOverload(Upstream::ThreadLocalCluster& cluster,
2028 251 : std::function<void(Http::ResponseHeaderMap&)>& modify_headers) {
2029 251 : if (cluster.dropOverload().value()) {
2030 0 : ENVOY_STREAM_LOG(debug, "Router filter: cluster DROP_OVERLOAD configuration: {}", *callbacks_,
2031 0 : cluster.dropOverload().value());
2032 0 : if (config_.random_.bernoulli(cluster.dropOverload())) {
2033 0 : ENVOY_STREAM_LOG(debug, "The request is dropped by DROP_OVERLOAD", *callbacks_);
2034 0 : callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::DropOverLoad);
2035 0 : chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true);
2036 0 : callbacks_->sendLocalReply(
2037 0 : Http::Code::ServiceUnavailable, "drop overload",
2038 0 : [modify_headers, this](Http::ResponseHeaderMap& headers) {
2039 0 : if (!config_.suppress_envoy_headers_) {
2040 0 : headers.addReference(Http::Headers::get().EnvoyDropOverload,
2041 0 : Http::Headers::get().EnvoyDropOverloadValues.True);
2042 0 : }
2043 0 : modify_headers(headers);
2044 0 : },
2045 0 : absl::nullopt, StreamInfo::ResponseCodeDetails::get().DropOverload);
2046 :
2047 0 : cluster.info()->loadReportStats().upstream_rq_drop_overload_.inc();
2048 0 : return true;
2049 0 : }
2050 0 : }
2051 251 : return false;
2052 251 : }
2053 :
2054 : RetryStatePtr
2055 : ProdFilter::createRetryState(const RetryPolicy& policy, Http::RequestHeaderMap& request_headers,
2056 : const Upstream::ClusterInfo& cluster, const VirtualCluster* vcluster,
2057 : RouteStatsContextOptRef route_stats_context, Runtime::Loader& runtime,
2058 : Random::RandomGenerator& random, Event::Dispatcher& dispatcher,
2059 251 : TimeSource& time_source, Upstream::ResourcePriority priority) {
2060 251 : std::unique_ptr<RetryStateImpl> retry_state =
2061 251 : RetryStateImpl::create(policy, request_headers, cluster, vcluster, route_stats_context,
2062 251 : runtime, random, dispatcher, time_source, priority);
2063 251 : if (retry_state != nullptr && retry_state->isAutomaticallyConfiguredForHttp3()) {
2064 : // Since doing retry will make Envoy to buffer the request body, if upstream using HTTP/3 is the
2065 : // only reason for doing retry, set the retry shadow buffer limit to 0 so that we don't retry or
2066 : // buffer safe requests with body which is not common.
2067 0 : setRetryShadowBufferLimit(0);
2068 0 : }
2069 251 : return retry_state;
2070 251 : }
2071 :
2072 : } // namespace Router
2073 : } // namespace Envoy
|