Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/router/router.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/router/router.h"
2
3
#include <algorithm>
4
#include <chrono>
5
#include <cstdint>
6
#include <functional>
7
#include <memory>
8
#include <string>
9
10
#include "envoy/event/dispatcher.h"
11
#include "envoy/event/timer.h"
12
#include "envoy/grpc/status.h"
13
#include "envoy/http/conn_pool.h"
14
#include "envoy/runtime/runtime.h"
15
#include "envoy/upstream/cluster_manager.h"
16
#include "envoy/upstream/health_check_host_monitor.h"
17
#include "envoy/upstream/upstream.h"
18
19
#include "source/common/common/assert.h"
20
#include "source/common/common/cleanup.h"
21
#include "source/common/common/empty_string.h"
22
#include "source/common/common/enum_to_int.h"
23
#include "source/common/common/scope_tracker.h"
24
#include "source/common/common/utility.h"
25
#include "source/common/config/utility.h"
26
#include "source/common/grpc/common.h"
27
#include "source/common/http/codes.h"
28
#include "source/common/http/header_map_impl.h"
29
#include "source/common/http/headers.h"
30
#include "source/common/http/message_impl.h"
31
#include "source/common/http/utility.h"
32
#include "source/common/network/application_protocol.h"
33
#include "source/common/network/socket_option_factory.h"
34
#include "source/common/network/transport_socket_options_impl.h"
35
#include "source/common/network/upstream_server_name.h"
36
#include "source/common/network/upstream_socket_options_filter_state.h"
37
#include "source/common/network/upstream_subject_alt_names.h"
38
#include "source/common/orca/orca_load_metrics.h"
39
#include "source/common/orca/orca_parser.h"
40
#include "source/common/router/config_impl.h"
41
#include "source/common/router/debug_config.h"
42
#include "source/common/router/retry_state_impl.h"
43
#include "source/common/runtime/runtime_features.h"
44
#include "source/common/stream_info/uint32_accessor_impl.h"
45
#include "source/common/tracing/http_tracer_impl.h"
46
47
namespace Envoy {
48
namespace Router {
49
namespace {
50
constexpr char NumInternalRedirectsFilterStateName[] = "num_internal_redirects";
51
52
0
uint32_t getLength(const Buffer::Instance* instance) { return instance ? instance->length() : 0; }
53
54
bool schemeIsHttp(const Http::RequestHeaderMap& downstream_headers,
55
0
                  OptRef<const Network::Connection> connection) {
56
0
  if (Http::Utility::schemeIsHttp(downstream_headers.getSchemeValue())) {
57
0
    return true;
58
0
  }
59
0
  if (connection.has_value() && !connection->ssl()) {
60
0
    return true;
61
0
  }
62
0
  return false;
63
0
}
64
65
constexpr uint64_t TimeoutPrecisionFactor = 100;
66
67
} // namespace
68
69
// Builds the router filter configuration from the `config` proto.
//
// The scalar settings are forwarded to the delegated constructor. The body then creates one
// access logger per `upstream_log` entry, records the upstream log flush interval when one is
// configured, and, when upstream HTTP filters are configured, builds their filter factories.
FilterConfig::FilterConfig(Stats::StatName stat_prefix,
                           Server::Configuration::FactoryContext& context,
                           ShadowWriterPtr&& shadow_writer,
                           const envoy::extensions::filters::http::router::v3::Router& config)
    : FilterConfig(
          context.serverFactoryContext(), stat_prefix, context.serverFactoryContext().localInfo(),
          context.scope(), context.serverFactoryContext().clusterManager(),
          context.serverFactoryContext().runtime(),
          context.serverFactoryContext().api().randomGenerator(), std::move(shadow_writer),
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, dynamic_stats, true), config.start_child_span(),
          config.suppress_envoy_headers(), config.respect_expected_rq_timeout(),
          config.suppress_grpc_request_failure_code_stats(),
          // False unless upstream_log_options is present and requests the flush.
          config.has_upstream_log_options() &&
              config.upstream_log_options().flush_upstream_log_on_upstream_stream(),
          config.strict_check_headers(), context.serverFactoryContext().api().timeSource(),
          context.serverFactoryContext().httpContext(),
          context.serverFactoryContext().routerContext()) {
  // One access log instance per configured upstream log.
  for (const auto& log_config : config.upstream_log()) {
    upstream_logs_.push_back(AccessLog::AccessLogFactory::fromProto(log_config, context));
  }

  // The flush interval is only set when it is explicitly present in the log options.
  if (config.has_upstream_log_options()) {
    const auto& log_options = config.upstream_log_options();
    if (log_options.has_upstream_log_flush_interval()) {
      upstream_log_flush_interval_ = std::chrono::milliseconds(
          DurationUtil::durationToMilliseconds(log_options.upstream_log_flush_interval()));
    }
  }

  // Nothing more to do when no upstream HTTP filters are configured.
  if (config.upstream_http_filters_size() <= 0) {
    return;
  }

  auto& server_factory_ctx = context.serverFactoryContext();
  const Http::FilterChainUtility::FiltersList& filter_protos = config.upstream_http_filters();
  std::shared_ptr<Http::UpstreamFilterConfigProviderManager> provider_manager =
      Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager(
          server_factory_ctx);
  std::string prefix = context.scope().symbolTable().toString(context.scope().prefix());
  upstream_ctx_ = std::make_unique<Upstream::UpstreamFactoryContextImpl>(
      server_factory_ctx, context.initManager(), context.scope());

  using UpstreamFilterChainHelper =
      Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext,
                              Server::Configuration::UpstreamHttpFilterConfigFactory>;
  UpstreamFilterChainHelper helper(*provider_manager, server_factory_ctx,
                                   context.serverFactoryContext().clusterManager(),
                                   *upstream_ctx_, prefix);
  // Throws when the filter list cannot be processed.
  THROW_IF_NOT_OK(helper.processFilters(filter_protos, "router upstream http",
                                        "router upstream http", upstream_http_filter_factories_));
}
115
116
// Express percentage as [0, TimeoutPrecisionFactor] because stats do not accept floating point
117
// values, and getting multiple significant figures on the histogram would be nice.
118
uint64_t FilterUtility::percentageOfTimeout(const std::chrono::milliseconds response_time,
119
5
                                            const std::chrono::milliseconds timeout) {
120
  // Timeouts of 0 are considered infinite. Any portion of an infinite timeout used is still
121
  // none of it.
122
5
  if (timeout.count() == 0) {
123
5
    return 0;
124
5
  }
125
126
0
  return static_cast<uint64_t>(response_time.count() * TimeoutPrecisionFactor / timeout.count());
127
5
}
128
129
// Sets the scheme header on `headers`.
//
// When `use_upstream` is true the scheme follows `upstream_ssl`. Otherwise an already valid
// scheme is left alone; failing that, a valid X-Forwarded-Proto value is copied in; and as a
// last resort the scheme follows `downstream_ssl`.
void FilterUtility::setUpstreamScheme(Http::RequestHeaderMap& headers, bool downstream_ssl,
                                      bool upstream_ssl, bool use_upstream) {
  const auto& scheme_values = Http::Headers::get().SchemeValues;

  if (use_upstream) {
    headers.setReferenceScheme(upstream_ssl ? scheme_values.Https : scheme_values.Http);
    return;
  }

  // A valid scheme is already present: keep it.
  if (Http::Utility::schemeIsValid(headers.getSchemeValue())) {
    return;
  }

  // After all the changes in https://github.com/envoyproxy/envoy/issues/14587
  // this path should only occur if a buggy filter has removed the :scheme
  // header. In that case best-effort set from X-Forwarded-Proto.
  const absl::string_view forwarded_proto = headers.getForwardedProtoValue();
  if (Http::Utility::schemeIsValid(forwarded_proto)) {
    headers.setScheme(forwarded_proto);
    return;
  }

  // No usable header value: derive the scheme from whether the downstream side used SSL.
  headers.setReferenceScheme(downstream_ssl ? scheme_values.Https : scheme_values.Http);
}
158
159
bool FilterUtility::shouldShadow(const ShadowPolicy& policy, Runtime::Loader& runtime,
160
0
                                 uint64_t stable_random) {
161
162
  // The policy's default value is set correctly regardless of whether there is a runtime key
163
  // or not, thus this call is sufficient for all cases (100% if no runtime set, otherwise
164
  // using the default value within the runtime fractional percent setting).
165
0
  return runtime.snapshot().featureEnabled(policy.runtimeKey(), policy.defaultValue(),
166
0
                                           stable_random);
167
0
}
168
169
// Computes the timeouts to apply to a request and writes the resulting timeout headers.
//
// The global timeout starts from the route (or, for gRPC requests on routes with a maximum gRPC
// timeout, from the gRPC timeout header capped by that maximum) and may then be overridden by
// the Envoy timeout request headers. The per try timeout starts from the route's retry policy
// and may be overridden by its request header. Consumed upstream timeout headers are removed
// from `request_headers`, and setTimeoutHeaders() is called before returning.
TimeoutData FilterUtility::finalTimeout(const RouteEntry& route,
                                        Http::RequestHeaderMap& request_headers,
                                        bool insert_envoy_expected_request_timeout_ms,
                                        bool grpc_request, bool per_try_timeout_hedging_enabled,
                                        bool respect_expected_rq_timeout) {
  // A value of 0 stands for "no timeout" (infinity) throughout this function.
  const std::chrono::milliseconds no_timeout(0);

  TimeoutData timeout;
  if (!route.usingNewTimeouts()) {
    if (grpc_request && route.maxGrpcTimeout()) {
      const std::chrono::milliseconds max_grpc_timeout = route.maxGrpcTimeout().value();
      // Timeout from the gRPC headers, or infinity when the headers carry none.
      auto grpc_header_timeout = Grpc::Common::getGrpcTimeout(request_headers);
      std::chrono::milliseconds grpc_timeout = grpc_header_timeout.value_or(no_timeout);
      if (route.grpcTimeoutOffset()) {
        // We only apply the offset if it won't result in grpc_timeout hitting 0 or below, as
        // setting it to 0 means infinity and a negative timeout makes no sense.
        const auto offset = *route.grpcTimeoutOffset();
        if (offset < grpc_timeout) {
          grpc_timeout -= offset;
        }
      }

      // Cap the gRPC timeout to the configured maximum, bearing in mind that 0 means infinity
      // for both values.
      const bool has_cap = max_grpc_timeout != no_timeout;
      if (has_cap && (grpc_timeout == no_timeout || grpc_timeout > max_grpc_timeout)) {
        grpc_timeout = max_grpc_timeout;
      }
      timeout.global_timeout_ = grpc_timeout;
    } else {
      timeout.global_timeout_ = route.timeout();
    }
  }
  timeout.per_try_timeout_ = route.retryPolicy().perTryTimeout();
  timeout.per_try_idle_timeout_ = route.retryPolicy().perTryIdleTimeout();

  // When expected request timeouts are respected, first check for a timeout set by an egress
  // Envoy. If present, use that value as the route timeout and don't override the
  // *x-envoy-expected-rq-timeout-ms* header. At this point the *x-envoy-upstream-rq-timeout-ms*
  // header should have been sanitized by the egress Envoy.
  const Http::HeaderEntry* expected_timeout_entry =
      respect_expected_rq_timeout ? request_headers.EnvoyExpectedRequestTimeoutMs() : nullptr;
  if (expected_timeout_entry != nullptr) {
    trySetGlobalTimeout(*expected_timeout_entry, timeout);
  } else if (const Http::HeaderEntry* upstream_timeout_entry =
                 request_headers.EnvoyUpstreamRequestTimeoutMs();
             upstream_timeout_entry != nullptr) {
    // A user supplied timeout in the request header takes effect and is then stripped.
    trySetGlobalTimeout(*upstream_timeout_entry, timeout);
    request_headers.removeEnvoyUpstreamRequestTimeoutMs();
  }

  // See if there is a per try/retry timeout header. The header is removed whether or not its
  // value parses.
  const absl::string_view per_try_header_value =
      request_headers.getEnvoyUpstreamRequestPerTryTimeoutMsValue();
  if (!per_try_header_value.empty()) {
    uint64_t per_try_ms;
    if (absl::SimpleAtoi(per_try_header_value, &per_try_ms)) {
      timeout.per_try_timeout_ = std::chrono::milliseconds(per_try_ms);
    }
    request_headers.removeEnvoyUpstreamRequestPerTryTimeoutMs();
  }

  // A per try timeout that is >= a finite global timeout is ignored.
  if (timeout.per_try_timeout_ >= timeout.global_timeout_ && timeout.global_timeout_.count() != 0) {
    timeout.per_try_timeout_ = no_timeout;
  }

  setTimeoutHeaders(0, timeout, route, request_headers, insert_envoy_expected_request_timeout_ms,
                    grpc_request, per_try_timeout_hedging_enabled);

  return timeout;
}
256
257
void FilterUtility::setTimeoutHeaders(uint64_t elapsed_time, const TimeoutData& timeout,
258
                                      const RouteEntry& route,
259
                                      Http::RequestHeaderMap& request_headers,
260
                                      bool insert_envoy_expected_request_timeout_ms,
261
1.81k
                                      bool grpc_request, bool per_try_timeout_hedging_enabled) {
262
263
1.81k
  const uint64_t global_timeout = timeout.global_timeout_.count();
264
265
  // See if there is any timeout to write in the expected timeout header.
266
1.81k
  uint64_t expected_timeout = timeout.per_try_timeout_.count();
267
268
  // Use the global timeout if no per try timeout was specified or if we're
269
  // doing hedging when there are per try timeouts. Either of these scenarios
270
  // mean that the upstream server can use the full global timeout.
271
1.81k
  if (per_try_timeout_hedging_enabled || expected_timeout == 0) {
272
1.81k
    expected_timeout = global_timeout;
273
1.81k
  }
274
275
  // If the expected timeout is 0 set no timeout, as Envoy treats 0 as infinite timeout.
276
1.81k
  if (expected_timeout > 0) {
277
278
987
    if (global_timeout > 0) {
279
987
      if (elapsed_time >= global_timeout) {
280
        // We are out of time, but 0 would be an infinite timeout. So instead we send a 1ms timeout
281
        // and assume the timers armed by onRequestComplete() will fire very soon.
282
0
        expected_timeout = 1;
283
987
      } else {
284
987
        expected_timeout = std::min(expected_timeout, global_timeout - elapsed_time);
285
987
      }
286
987
    }
287
288
987
    if (insert_envoy_expected_request_timeout_ms) {
289
982
      request_headers.setEnvoyExpectedRequestTimeoutMs(expected_timeout);
290
982
    }
291
292
    // If we've configured max_grpc_timeout, override the grpc-timeout header with
293
    // the expected timeout. This ensures that the optional per try timeout is reflected
294
    // in grpc-timeout, ensuring that the upstream gRPC server is aware of the actual timeout.
295
987
    if (grpc_request && !route.usingNewTimeouts() && route.maxGrpcTimeout()) {
296
0
      Grpc::Common::toGrpcTimeout(std::chrono::milliseconds(expected_timeout), request_headers);
297
0
    }
298
987
  }
299
1.81k
}
300
301
// Parses the value of `header_timeout_entry` as an unsigned integer number of milliseconds.
// Returns absl::nullopt when absl::SimpleAtoi() rejects the value.
absl::optional<std::chrono::milliseconds>
FilterUtility::tryParseHeaderTimeout(const Http::HeaderEntry& header_timeout_entry) {
  uint64_t parsed_ms;
  const bool parsed_ok =
      absl::SimpleAtoi(header_timeout_entry.value().getStringView(), &parsed_ms);
  if (!parsed_ok) {
    return absl::nullopt;
  }
  return std::chrono::milliseconds(parsed_ms);
}
309
310
// Overwrites `timeout.global_timeout_` with the value of `header_timeout_entry` when that value
// parses via tryParseHeaderTimeout(); leaves `timeout` untouched otherwise.
void FilterUtility::trySetGlobalTimeout(const Http::HeaderEntry& header_timeout_entry,
                                        TimeoutData& timeout) {
  if (const auto parsed_timeout = tryParseHeaderTimeout(header_timeout_entry)) {
    timeout.global_timeout_ = *parsed_timeout;
  }
}
317
318
// Resolves the hedging parameters for a request. The route's hedge policy supplies the default;
// a hedge-on-per-try-timeout request header with the exact value "true" or "false" overrides
// it. The header is removed from `request_headers` whenever it is present, even if its value is
// neither of those two strings.
FilterUtility::HedgingParams
FilterUtility::finalHedgingParams(const RouteEntry& route,
                                  Http::RequestHeaderMap& request_headers) {
  HedgingParams params;
  params.hedge_on_per_try_timeout_ = route.hedgePolicy().hedgeOnPerTryTimeout();

  const Http::HeaderEntry* override_entry = request_headers.EnvoyHedgeOnPerTryTimeout();
  if (override_entry == nullptr) {
    return params;
  }

  if (override_entry->value() == "true") {
    params.hedge_on_per_try_timeout_ = true;
  } else if (override_entry->value() == "false") {
    params.hedge_on_per_try_timeout_ = false;
  }
  request_headers.removeEnvoyHedgeOnPerTryTimeout();

  return params;
}
339
340
2.25k
Filter::~Filter() {
  // By the time the filter is destroyed all upstream resources must already have been released:
  // no upstream requests may remain and the retry state must have been dropped.
  ASSERT(upstream_requests_.empty());
  ASSERT(retry_state_ == nullptr);
}
345
346
// Validates the value of the request header named by `target_header`.
//
// The timeout and max-retries headers are checked with isInteger(); the retry-on headers are
// checked with hasValidRetryFields() and the matching RetryStateImpl parser. Being called with
// any other header name is a programming error and hits PANIC.
const FilterUtility::StrictHeaderChecker::HeaderCheckResult
FilterUtility::StrictHeaderChecker::checkHeader(Http::RequestHeaderMap& headers,
                                                const Http::LowerCaseString& target_header) {
  const auto& header_names = Http::Headers::get();

  if (target_header == header_names.EnvoyUpstreamRequestTimeoutMs) {
    return isInteger(headers.EnvoyUpstreamRequestTimeoutMs());
  }
  if (target_header == header_names.EnvoyUpstreamRequestPerTryTimeoutMs) {
    return isInteger(headers.EnvoyUpstreamRequestPerTryTimeoutMs());
  }
  if (target_header == header_names.EnvoyMaxRetries) {
    return isInteger(headers.EnvoyMaxRetries());
  }
  if (target_header == header_names.EnvoyRetryOn) {
    return hasValidRetryFields(headers.EnvoyRetryOn(), &Router::RetryStateImpl::parseRetryOn);
  }
  if (target_header == header_names.EnvoyRetryGrpcOn) {
    return hasValidRetryFields(headers.EnvoyRetryGrpcOn(),
                               &Router::RetryStateImpl::parseRetryGrpcOn);
  }

  // Should only validate headers for which we have implemented a validator.
  PANIC("unexpectedly reached");
}
364
365
2.54k
// Returns the locality zone stat name of `upstream_host`, or the configured empty stat name
// when no host is given.
Stats::StatName Filter::upstreamZone(Upstream::HostDescriptionConstSharedPtr upstream_host) {
  if (upstream_host == nullptr) {
    return config_->empty_stat_name_;
  }
  return upstream_host->localityZoneStatName();
}
368
369
void Filter::chargeUpstreamCode(uint64_t response_status_code,
370
                                const Http::ResponseHeaderMap& response_headers,
371
                                Upstream::HostDescriptionConstSharedPtr upstream_host,
372
1.74k
                                bool dropped) {
373
  // Passing the response_status_code explicitly is an optimization to avoid
374
  // multiple calls to slow Http::Utility::getResponseStatus.
375
1.74k
  ASSERT(response_status_code == Http::Utility::getResponseStatus(response_headers));
376
1.74k
  if (config_->emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck()) {
377
1.74k
    const Http::HeaderEntry* upstream_canary_header = response_headers.EnvoyUpstreamCanary();
378
1.74k
    const bool is_canary = (upstream_canary_header && upstream_canary_header->value() == "true") ||
379
1.74k
                           (upstream_host ? upstream_host->canary() : false);
380
1.74k
    const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_);
381
382
1.74k
    Stats::StatName upstream_zone = upstreamZone(upstream_host);
383
1.74k
    Http::CodeStats::ResponseStatInfo info{
384
1.74k
        config_->scope_,
385
1.74k
        cluster_->statsScope(),
386
1.74k
        config_->empty_stat_name_,
387
1.74k
        response_status_code,
388
1.74k
        internal_request,
389
1.74k
        route_->virtualHost().statName(),
390
1.74k
        request_vcluster_ ? request_vcluster_->statName() : config_->empty_stat_name_,
391
1.74k
        route_stats_context_.has_value() ? route_stats_context_->statName()
392
1.74k
                                         : config_->empty_stat_name_,
393
1.74k
        config_->zone_name_,
394
1.74k
        upstream_zone,
395
1.74k
        is_canary};
396
397
1.74k
    Http::CodeStats& code_stats = httpContext().codeStats();
398
1.74k
    code_stats.chargeResponseStat(info, exclude_http_code_stats_);
399
400
1.74k
    if (alt_stat_prefix_ != nullptr) {
401
0
      Http::CodeStats::ResponseStatInfo alt_info{config_->scope_,
402
0
                                                 cluster_->statsScope(),
403
0
                                                 alt_stat_prefix_->statName(),
404
0
                                                 response_status_code,
405
0
                                                 internal_request,
406
0
                                                 config_->empty_stat_name_,
407
0
                                                 config_->empty_stat_name_,
408
0
                                                 config_->empty_stat_name_,
409
0
                                                 config_->zone_name_,
410
0
                                                 upstream_zone,
411
0
                                                 is_canary};
412
0
      code_stats.chargeResponseStat(alt_info, exclude_http_code_stats_);
413
0
    }
414
415
1.74k
    if (dropped) {
416
0
      cluster_->loadReportStats().upstream_rq_dropped_.inc();
417
0
    }
418
1.74k
    if (upstream_host && Http::CodeUtility::is5xx(response_status_code)) {
419
107
      upstream_host->stats().rq_error_.inc();
420
107
    }
421
1.74k
  }
422
1.74k
}
423
424
void Filter::chargeUpstreamCode(Http::Code code,
425
                                Upstream::HostDescriptionConstSharedPtr upstream_host,
426
107
                                bool dropped) {
427
107
  const uint64_t response_status_code = enumToInt(code);
428
107
  const auto fake_response_headers = Http::createHeaderMap<Http::ResponseHeaderMapImpl>(
429
107
      {{Http::Headers::get().Status, std::to_string(response_status_code)}});
430
107
  chargeUpstreamCode(response_status_code, *fake_response_headers, upstream_host, dropped);
431
107
}
432
433
1.90k
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
434
1.90k
  downstream_headers_ = &headers;
435
436
  // Extract debug configuration from filter state. This is used further along to determine whether
437
  // we should append cluster and host headers to the response, and whether to forward the request
438
  // upstream.
439
1.90k
  const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();
440
1.90k
  const DebugConfig* debug_config = filter_state->getDataReadOnly<DebugConfig>(DebugConfig::key());
441
442
  // TODO: Maybe add a filter API for this.
443
1.90k
  grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);
444
1.90k
  exclude_http_code_stats_ = grpc_request_ && config_->suppress_grpc_request_failure_code_stats_;
445
446
  // Only increment rq total stat if we actually decode headers here. This does not count requests
447
  // that get handled by earlier filters.
448
1.90k
  stats_.rq_total_.inc();
449
450
  // Initialize the `modify_headers` function as a no-op (so we don't have to remember to check it
451
  // against nullptr before calling it), and feed it behavior later if/when we have cluster info
452
  // headers to append.
453
1.90k
  std::function<void(Http::ResponseHeaderMap&)> modify_headers = [](Http::ResponseHeaderMap&) {};
454
455
  // Determine if there is a route entry or a direct response for the request.
456
1.90k
  route_ = callbacks_->route();
457
1.90k
  if (!route_) {
458
0
    stats_.no_route_.inc();
459
0
    ENVOY_STREAM_LOG(debug, "no route match for URL '{}'", *callbacks_, headers.getPathValue());
460
461
0
    callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoRouteFound);
462
0
    callbacks_->sendLocalReply(Http::Code::NotFound, "", modify_headers, absl::nullopt,
463
0
                               StreamInfo::ResponseCodeDetails::get().RouteNotFound);
464
0
    return Http::FilterHeadersStatus::StopIteration;
465
0
  }
466
467
  // Determine if there is a direct response for the request.
468
1.90k
  const auto* direct_response = route_->directResponseEntry();
469
1.90k
  if (direct_response != nullptr) {
470
89
    stats_.rq_direct_response_.inc();
471
89
    direct_response->rewritePathHeader(headers, !config_->suppress_envoy_headers_);
472
89
    callbacks_->sendLocalReply(
473
89
        direct_response->responseCode(), direct_response->responseBody(),
474
89
        [this, direct_response,
475
89
         &request_headers = headers](Http::ResponseHeaderMap& response_headers) -> void {
476
89
          std::string new_uri;
477
89
          if (request_headers.Path()) {
478
89
            new_uri = direct_response->newUri(request_headers);
479
89
          }
480
          // See https://tools.ietf.org/html/rfc7231#section-7.1.2.
481
89
          const auto add_location =
482
89
              direct_response->responseCode() == Http::Code::Created ||
483
89
              Http::CodeUtility::is3xx(enumToInt(direct_response->responseCode()));
484
89
          if (!new_uri.empty() && add_location) {
485
1
            response_headers.addReferenceKey(Http::Headers::get().Location, new_uri);
486
1
          }
487
89
          direct_response->finalizeResponseHeaders(response_headers, callbacks_->streamInfo());
488
89
        },
489
89
        absl::nullopt, StreamInfo::ResponseCodeDetails::get().DirectResponse);
490
89
    return Http::FilterHeadersStatus::StopIteration;
491
89
  }
492
493
  // A route entry matches for the request.
494
1.82k
  route_entry_ = route_->routeEntry();
495
  // If there's a route specific limit and it's smaller than general downstream
496
  // limits, apply the new cap.
497
1.82k
  retry_shadow_buffer_limit_ =
498
1.82k
      std::min(retry_shadow_buffer_limit_, route_entry_->retryShadowBufferLimit());
499
1.82k
  if (debug_config && debug_config->append_cluster_) {
500
    // The cluster name will be appended to any local or upstream responses from this point.
501
0
    modify_headers = [this, debug_config](Http::ResponseHeaderMap& headers) {
502
0
      headers.addCopy(debug_config->cluster_header_.value_or(Http::Headers::get().EnvoyCluster),
503
0
                      route_entry_->clusterName());
504
0
    };
505
0
  }
506
1.82k
  Upstream::ThreadLocalCluster* cluster =
507
1.82k
      config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
508
1.82k
  if (!cluster) {
509
4
    stats_.no_cluster_.inc();
510
4
    ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, route_entry_->clusterName());
511
512
4
    callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoClusterFound);
513
4
    callbacks_->sendLocalReply(route_entry_->clusterNotFoundResponseCode(), "", modify_headers,
514
4
                               absl::nullopt,
515
4
                               StreamInfo::ResponseCodeDetails::get().ClusterNotFound);
516
4
    return Http::FilterHeadersStatus::StopIteration;
517
4
  }
518
1.81k
  cluster_ = cluster->info();
519
520
  // Set up stat prefixes, etc.
521
1.81k
  request_vcluster_ = route_->virtualHost().virtualCluster(headers);
522
1.81k
  if (request_vcluster_ != nullptr) {
523
5
    callbacks_->streamInfo().setVirtualClusterName(request_vcluster_->name());
524
5
  }
525
1.81k
  route_stats_context_ = route_entry_->routeStatsContext();
526
1.81k
  ENVOY_STREAM_LOG(debug, "cluster '{}' match for URL '{}'", *callbacks_,
527
1.81k
                   route_entry_->clusterName(), headers.getPathValue());
528
529
1.81k
  if (config_->strict_check_headers_ != nullptr) {
530
0
    for (const auto& header : *config_->strict_check_headers_) {
531
0
      const auto res = FilterUtility::StrictHeaderChecker::checkHeader(headers, header);
532
0
      if (!res.valid_) {
533
0
        callbacks_->streamInfo().setResponseFlag(
534
0
            StreamInfo::CoreResponseFlag::InvalidEnvoyRequestHeaders);
535
0
        const std::string body = fmt::format("invalid header '{}' with value '{}'",
536
0
                                             std::string(res.entry_->key().getStringView()),
537
0
                                             std::string(res.entry_->value().getStringView()));
538
0
        const std::string details =
539
0
            absl::StrCat(StreamInfo::ResponseCodeDetails::get().InvalidEnvoyRequestHeaders, "{",
540
0
                         StringUtil::replaceAllEmptySpace(res.entry_->key().getStringView()), "}");
541
0
        callbacks_->sendLocalReply(Http::Code::BadRequest, body, nullptr, absl::nullopt, details);
542
0
        return Http::FilterHeadersStatus::StopIteration;
543
0
      }
544
0
    }
545
0
  }
546
547
1.81k
  const Http::HeaderEntry* request_alt_name = headers.EnvoyUpstreamAltStatName();
548
1.81k
  if (request_alt_name) {
549
0
    alt_stat_prefix_ = std::make_unique<Stats::StatNameDynamicStorage>(
550
0
        request_alt_name->value().getStringView(), config_->scope_.symbolTable());
551
0
    headers.removeEnvoyUpstreamAltStatName();
552
0
  }
553
554
  // See if we are supposed to immediately kill some percentage of this cluster's traffic.
555
1.81k
  if (cluster_->maintenanceMode()) {
556
0
    callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
557
0
    chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true);
558
0
    callbacks_->sendLocalReply(
559
0
        Http::Code::ServiceUnavailable, "maintenance mode",
560
0
        [modify_headers, this](Http::ResponseHeaderMap& headers) {
561
0
          if (!config_->suppress_envoy_headers_) {
562
0
            headers.addReference(Http::Headers::get().EnvoyOverloaded,
563
0
                                 Http::Headers::get().EnvoyOverloadedValues.True);
564
0
          }
565
          // Note: append_cluster_info does not respect suppress_envoy_headers.
566
0
          modify_headers(headers);
567
0
        },
568
0
        absl::nullopt, StreamInfo::ResponseCodeDetails::get().MaintenanceMode);
569
0
    cluster_->trafficStats()->upstream_rq_maintenance_mode_.inc();
570
0
    return Http::FilterHeadersStatus::StopIteration;
571
0
  }
572
573
  // Support DROP_OVERLOAD config from control plane to drop certain percentage of traffic.
574
1.81k
  if (checkDropOverload(*cluster, modify_headers)) {
575
0
    return Http::FilterHeadersStatus::StopIteration;
576
0
  }
577
578
  // Fetch a connection pool for the upstream cluster.
579
1.81k
  const auto& upstream_http_protocol_options = cluster_->upstreamHttpProtocolOptions();
580
581
1.81k
  if (upstream_http_protocol_options.has_value() &&
582
1.81k
      (upstream_http_protocol_options.value().auto_sni() ||
583
0
       upstream_http_protocol_options.value().auto_san_validation())) {
584
    // Default the header to Host/Authority header.
585
0
    std::string header_value = route_entry_->getRequestHostValue(headers);
586
0
    if (!Runtime::runtimeFeatureEnabled(
587
0
            "envoy.reloadable_features.use_route_host_mutation_for_auto_sni_san")) {
588
0
      header_value = std::string(headers.getHostValue());
589
0
    }
590
591
    // Check whether `override_auto_sni_header` is specified.
592
0
    const auto override_auto_sni_header =
593
0
        upstream_http_protocol_options.value().override_auto_sni_header();
594
0
    if (!override_auto_sni_header.empty()) {
595
      // Use the header value from `override_auto_sni_header` to set the SNI value.
596
0
      const auto overridden_header_value = Http::HeaderUtility::getAllOfHeaderAsString(
597
0
          headers, Http::LowerCaseString(override_auto_sni_header));
598
0
      if (overridden_header_value.result().has_value() &&
599
0
          !overridden_header_value.result().value().empty()) {
600
0
        header_value = overridden_header_value.result().value();
601
0
      }
602
0
    }
603
0
    const auto parsed_authority = Http::Utility::parseAuthority(header_value);
604
0
    bool should_set_sni = !parsed_authority.is_ip_address_;
605
    // `host_` returns a string_view so doing this should be safe.
606
0
    absl::string_view sni_value = parsed_authority.host_;
607
608
0
    if (should_set_sni && upstream_http_protocol_options.value().auto_sni() &&
609
0
        !callbacks_->streamInfo().filterState()->hasDataWithName(
610
0
            Network::UpstreamServerName::key())) {
611
0
      callbacks_->streamInfo().filterState()->setData(
612
0
          Network::UpstreamServerName::key(),
613
0
          std::make_unique<Network::UpstreamServerName>(sni_value),
614
0
          StreamInfo::FilterState::StateType::Mutable);
615
0
    }
616
617
0
    if (upstream_http_protocol_options.value().auto_san_validation() &&
618
0
        !callbacks_->streamInfo().filterState()->hasDataWithName(
619
0
            Network::UpstreamSubjectAltNames::key())) {
620
0
      callbacks_->streamInfo().filterState()->setData(
621
0
          Network::UpstreamSubjectAltNames::key(),
622
0
          std::make_unique<Network::UpstreamSubjectAltNames>(
623
0
              std::vector<std::string>{std::string(sni_value)}),
624
0
          StreamInfo::FilterState::StateType::Mutable);
625
0
    }
626
0
  }
627
628
1.81k
  transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState(
629
1.81k
      *callbacks_->streamInfo().filterState());
630
631
1.81k
  if (auto downstream_connection = downstreamConnection(); downstream_connection != nullptr) {
632
987
    if (auto typed_state = downstream_connection->streamInfo()
633
987
                               .filterState()
634
987
                               .getDataReadOnly<Network::UpstreamSocketOptionsFilterState>(
635
987
                                   Network::UpstreamSocketOptionsFilterState::key());
636
987
        typed_state != nullptr) {
637
0
      auto downstream_options = typed_state->value();
638
0
      if (!upstream_options_) {
639
0
        upstream_options_ = std::make_shared<Network::Socket::Options>();
640
0
      }
641
0
      Network::Socket::appendOptions(upstream_options_, downstream_options);
642
0
    }
643
987
  }
644
645
1.81k
  if (upstream_options_ && callbacks_->getUpstreamSocketOptions()) {
646
0
    Network::Socket::appendOptions(upstream_options_, callbacks_->getUpstreamSocketOptions());
647
0
  }
648
649
1.81k
  std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);
650
651
1.81k
  if (!generic_conn_pool) {
652
0
    sendNoHealthyUpstreamResponse();
653
0
    return Http::FilterHeadersStatus::StopIteration;
654
0
  }
655
1.81k
  Upstream::HostDescriptionConstSharedPtr host = generic_conn_pool->host();
656
657
1.81k
  if (debug_config && debug_config->append_upstream_host_) {
658
    // The hostname and address will be appended to any local or upstream responses from this point,
659
    // possibly in addition to the cluster name.
660
0
    modify_headers = [modify_headers, debug_config, host](Http::ResponseHeaderMap& headers) {
661
0
      modify_headers(headers);
662
0
      headers.addCopy(
663
0
          debug_config->hostname_header_.value_or(Http::Headers::get().EnvoyUpstreamHostname),
664
0
          host->hostname());
665
0
      headers.addCopy(debug_config->host_address_header_.value_or(
666
0
                          Http::Headers::get().EnvoyUpstreamHostAddress),
667
0
                      host->address()->asString());
668
0
    };
669
0
  }
670
671
  // If we've been instructed not to forward the request upstream, send an empty local response.
672
1.81k
  if (debug_config && debug_config->do_not_forward_) {
673
0
    modify_headers = [modify_headers, debug_config](Http::ResponseHeaderMap& headers) {
674
0
      modify_headers(headers);
675
0
      headers.addCopy(
676
0
          debug_config->not_forwarded_header_.value_or(Http::Headers::get().EnvoyNotForwarded),
677
0
          "true");
678
0
    };
679
0
    callbacks_->sendLocalReply(Http::Code::NoContent, "", modify_headers, absl::nullopt, "");
680
0
    return Http::FilterHeadersStatus::StopIteration;
681
0
  }
682
683
1.81k
  if (callbacks_->shouldLoadShed()) {
684
0
    callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "envoy overloaded", nullptr,
685
0
                               absl::nullopt, StreamInfo::ResponseCodeDetails::get().Overload);
686
0
    stats_.rq_overload_local_reply_.inc();
687
0
    return Http::FilterHeadersStatus::StopIteration;
688
0
  }
689
690
1.81k
  hedging_params_ = FilterUtility::finalHedgingParams(*route_entry_, headers);
691
692
1.81k
  timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_->suppress_envoy_headers_,
693
1.81k
                                         grpc_request_, hedging_params_.hedge_on_per_try_timeout_,
694
1.81k
                                         config_->respect_expected_rq_timeout_);
695
696
1.81k
  const Http::HeaderEntry* header_max_stream_duration_entry =
697
1.81k
      headers.EnvoyUpstreamStreamDurationMs();
698
1.81k
  if (header_max_stream_duration_entry) {
699
0
    dynamic_max_stream_duration_ =
700
0
        FilterUtility::tryParseHeaderTimeout(*header_max_stream_duration_entry);
701
0
    headers.removeEnvoyUpstreamStreamDurationMs();
702
0
  }
703
704
  // If this header is set with any value, use an alternate response code on timeout
705
1.81k
  if (headers.EnvoyUpstreamRequestTimeoutAltResponse()) {
706
0
    timeout_response_code_ = Http::Code::NoContent;
707
0
    headers.removeEnvoyUpstreamRequestTimeoutAltResponse();
708
0
  }
709
710
1.81k
  include_attempt_count_in_request_ = route_entry_->includeAttemptCountInRequest();
711
1.81k
  if (include_attempt_count_in_request_) {
712
0
    headers.setEnvoyAttemptCount(attempt_count_);
713
0
  }
714
715
  // The router has reached a point where it is going to try to send a request upstream,
716
  // so now modify_headers should attach x-envoy-attempt-count to the downstream response if the
717
  // config flag is true.
718
1.81k
  if (route_entry_->includeAttemptCountInResponse()) {
719
0
    modify_headers = [modify_headers, this](Http::ResponseHeaderMap& headers) {
720
0
      modify_headers(headers);
721
722
      // This header is added without checking for config_->suppress_envoy_headers_ to mirror what
723
      // is done for upstream requests.
724
0
      headers.setEnvoyAttemptCount(attempt_count_);
725
0
    };
726
0
  }
727
1.81k
  callbacks_->streamInfo().setAttemptCount(attempt_count_);
728
729
1.81k
  route_entry_->finalizeRequestHeaders(headers, callbacks_->streamInfo(),
730
1.81k
                                       !config_->suppress_envoy_headers_);
731
1.81k
  FilterUtility::setUpstreamScheme(
732
1.81k
      headers, callbacks_->streamInfo().downstreamAddressProvider().sslConnection() != nullptr,
733
1.81k
      host->transportSocketFactory().sslCtx() != nullptr,
734
1.81k
      callbacks_->streamInfo().shouldSchemeMatchUpstream());
735
736
  // Ensure an http transport scheme is selected before continuing with decoding.
737
1.81k
  ASSERT(headers.Scheme());
738
739
1.81k
  retry_state_ = createRetryState(
740
1.81k
      route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_, route_stats_context_,
741
1.81k
      config_->factory_context_, callbacks_->dispatcher(), route_entry_->priority());
742
743
  // Determine which shadow policies to use. It's possible that we don't do any shadowing due to
744
  // runtime keys. Also the method CONNECT doesn't support shadowing.
745
1.81k
  auto method = headers.getMethodValue();
746
1.81k
  if (method != Http::Headers::get().MethodValues.Connect) {
747
1.81k
    for (const auto& shadow_policy : route_entry_->shadowPolicies()) {
748
0
      const auto& policy_ref = *shadow_policy;
749
0
      if (FilterUtility::shouldShadow(policy_ref, config_->runtime_, callbacks_->streamId())) {
750
0
        active_shadow_policies_.push_back(std::cref(policy_ref));
751
0
        shadow_headers_ = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*downstream_headers_);
752
0
      }
753
0
    }
754
1.81k
  }
755
756
1.81k
  ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers);
757
758
  // Hang onto the modify_headers function for later use in handling upstream responses.
759
1.81k
  modify_headers_ = modify_headers;
760
761
1.81k
  const bool can_send_early_data =
762
1.81k
      route_entry_->earlyDataPolicy().allowsEarlyDataForRequest(*downstream_headers_);
763
764
1.81k
  include_timeout_retry_header_in_request_ = route_->virtualHost().includeIsTimeoutRetryHeader();
765
766
  // Set initial HTTP/3 use based on the presence of HTTP/1.1 proxy config.
767
  // For retries etc, HTTP/3 usability may transition from true to false, but
768
  // will never transition from false to true.
769
1.81k
  bool can_use_http3 =
770
1.81k
      !transport_socket_options_ || !transport_socket_options_->http11ProxyInfo().has_value();
771
1.81k
  UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(
772
1.81k
      *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3,
773
1.81k
      allow_multiplexed_upstream_half_close_ /*enable_half_close*/);
774
1.81k
  LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
775
1.81k
  upstream_requests_.front()->acceptHeadersFromRouter(end_stream);
776
1.81k
  if (streaming_shadows_) {
777
    // start the shadow streams.
778
0
    for (const auto& shadow_policy_wrapper : active_shadow_policies_) {
779
0
      const auto& shadow_policy = shadow_policy_wrapper.get();
780
0
      const absl::optional<absl::string_view> shadow_cluster_name =
781
0
          getShadowCluster(shadow_policy, *downstream_headers_);
782
0
      if (!shadow_cluster_name.has_value()) {
783
0
        continue;
784
0
      }
785
0
      auto shadow_headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_);
786
0
      auto options =
787
0
          Http::AsyncClient::RequestOptions()
788
0
              .setTimeout(timeout_.global_timeout_)
789
0
              .setParentSpan(callbacks_->activeSpan())
790
0
              .setChildSpanName("mirror")
791
0
              .setSampled(shadow_policy.traceSampled())
792
0
              .setIsShadow(true)
793
0
              .setIsShadowSuffixDisabled(shadow_policy.disableShadowHostSuffixAppend())
794
0
              .setBufferAccount(callbacks_->account())
795
              // A buffer limit of 1 is set in the case that retry_shadow_buffer_limit_ == 0,
796
              // because a buffer limit of zero on async clients is interpreted as no buffer limit.
797
0
              .setBufferLimit(1 > retry_shadow_buffer_limit_ ? 1 : retry_shadow_buffer_limit_)
798
0
              .setDiscardResponseBody(true);
799
0
      options.setFilterConfig(config_);
800
0
      if (end_stream) {
801
        // This is a header-only request, and can be dispatched immediately to the shadow
802
        // without waiting.
803
0
        Http::RequestMessagePtr request(new Http::RequestMessageImpl(
804
0
            Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_)));
805
0
        config_->shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request),
806
0
                                       options);
807
0
      } else {
808
0
        Http::AsyncClient::OngoingRequest* shadow_stream = config_->shadowWriter().streamingShadow(
809
0
            std::string(shadow_cluster_name.value()), std::move(shadow_headers), options);
810
0
        if (shadow_stream != nullptr) {
811
0
          shadow_streams_.insert(shadow_stream);
812
0
          shadow_stream->setDestructorCallback(
813
0
              [this, shadow_stream]() { shadow_streams_.erase(shadow_stream); });
814
0
          shadow_stream->setWatermarkCallbacks(watermark_callbacks_);
815
0
        }
816
0
      }
817
0
    }
818
0
  }
819
1.81k
  if (end_stream) {
820
585
    onRequestComplete();
821
585
  }
822
823
1.81k
  return Http::FilterHeadersStatus::StopIteration;
824
1.81k
}
825
826
std::unique_ptr<GenericConnPool>
827
1.81k
Filter::createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster) {
828
1.81k
  GenericConnPoolFactory* factory = nullptr;
829
1.81k
  if (cluster_->upstreamConfig().has_value()) {
830
0
    factory = Envoy::Config::Utility::getFactory<GenericConnPoolFactory>(
831
0
        cluster_->upstreamConfig().ref());
832
0
    ENVOY_BUG(factory != nullptr,
833
0
              fmt::format("invalid factory type '{}', failing over to default upstream",
834
0
                          cluster_->upstreamConfig().ref().DebugString()));
835
0
  }
836
1.81k
  if (!factory) {
837
1.81k
    factory = &config_->router_context_.genericConnPoolFactory();
838
1.81k
  }
839
840
1.81k
  using UpstreamProtocol = Envoy::Router::GenericConnPoolFactory::UpstreamProtocol;
841
1.81k
  UpstreamProtocol upstream_protocol = UpstreamProtocol::HTTP;
842
1.81k
  if (route_entry_->connectConfig().has_value()) {
843
0
    auto method = downstream_headers_->getMethodValue();
844
0
    if (Http::HeaderUtility::isConnectUdpRequest(*downstream_headers_)) {
845
0
      upstream_protocol = UpstreamProtocol::UDP;
846
0
    } else if (method == Http::Headers::get().MethodValues.Connect ||
847
0
               (route_entry_->connectConfig()->allow_post() &&
848
0
                method == Http::Headers::get().MethodValues.Post)) {
849
      // Allow POST for proxying raw TCP if it is configured.
850
0
      upstream_protocol = UpstreamProtocol::TCP;
851
0
    }
852
0
  }
853
1.81k
  return factory->createGenericConnPool(thread_local_cluster, upstream_protocol,
854
1.81k
                                        route_entry_->priority(),
855
1.81k
                                        callbacks_->streamInfo().protocol(), this);
856
1.81k
}
857
858
0
void Filter::sendNoHealthyUpstreamResponse() {
859
0
  callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoHealthyUpstream);
860
0
  chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, false);
861
0
  callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "no healthy upstream", modify_headers_,
862
0
                             absl::nullopt,
863
0
                             StreamInfo::ResponseCodeDetails::get().NoHealthyUpstream);
864
0
}
865
866
3.05k
// Handles a chunk of downstream request body.
//
// The chunk is (a) copied to every active streaming shadow, (b) forwarded to the single in-flight
// upstream request if there is one, and (c) retained via addDecodedData() when the request may
// need to be replayed (retries enabled, non-streaming shadowing active, or internal redirects
// enabled). If retaining the chunk would exceed retry_shadow_buffer_limit_, retry/shadow is
// abandoned instead of buffering.
//
// @param data the body chunk received from downstream.
// @param end_stream true when this chunk completes the request; triggers onRequestComplete().
// @return always Http::FilterDataStatus::StopIterationNoBuffer, including on the two paths that
//         send a local reply because no upstream request exists.
Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) {
867
  // upstream_requests_.size() cannot be > 1 because that only happens when a per
868
  // try timeout occurs with hedge_on_per_try_timeout enabled but the per
869
  // try timeout timer is not started until onRequestComplete(). It could be zero
870
  // if the first request attempt has already failed and a retry is waiting for
871
  // a backoff timer.
872
3.05k
  ASSERT(upstream_requests_.size() <= 1);
873
874
  // The body must be retained if any feature could need to resend or mirror it later.
3.05k
  bool buffering = (retry_state_ && retry_state_->enabled()) ||
875
3.05k
                   (!active_shadow_policies_.empty() && !streaming_shadows_) ||
876
3.05k
                   (route_entry_ && route_entry_->internalRedirectPolicy().enabled());
877
  // Overflow check: already-buffered bytes plus this chunk versus the latched limit.
3.05k
  if (buffering &&
878
3.05k
      getLength(callbacks_->decodingBuffer()) + data.length() > retry_shadow_buffer_limit_) {
879
0
    ENVOY_LOG(debug,
880
0
              "The request payload has at least {} bytes data which exceeds buffer limit {}. Give "
881
0
              "up on the retry/shadow.",
882
0
              getLength(callbacks_->decodingBuffer()) + data.length(), retry_shadow_buffer_limit_);
883
0
    cluster_->trafficStats()->retry_or_shadow_abandoned_.inc();
884
0
    retry_state_.reset();
885
0
    buffering = false;
886
0
    active_shadow_policies_.clear();
887
0
    request_buffer_overflowed_ = true;
888
889
    // If we had to abandon buffering and there's no request in progress, abort the request and
890
    // clean up. This happens if the initial upstream request failed, and we are currently waiting
891
    // for a backoff timer before starting the next upstream attempt.
892
0
    if (upstream_requests_.empty()) {
893
0
      cleanup();
894
0
      callbacks_->sendLocalReply(
895
0
          Http::Code::InsufficientStorage, "exceeded request buffer limit while retrying upstream",
896
0
          modify_headers_, absl::nullopt,
897
0
          StreamInfo::ResponseCodeDetails::get().RequestPayloadExceededRetryBufferLimit);
898
0
      return Http::FilterDataStatus::StopIterationNoBuffer;
899
0
    }
900
0
  }
901
902
  // Mirror the chunk to each streaming shadow. On the final chunk the destructor and watermark
  // callbacks are detached before sending, and the set is cleared once the loop has finished.
3.05k
  for (auto* shadow_stream : shadow_streams_) {
903
0
    if (end_stream) {
904
0
      shadow_stream->removeDestructorCallback();
905
0
      shadow_stream->removeWatermarkCallbacks();
906
0
    }
907
0
    Buffer::OwnedImpl copy(data);
908
0
    shadow_stream->sendData(copy, end_stream);
909
0
  }
910
3.05k
  if (end_stream) {
911
1.04k
    shadow_streams_.clear();
912
1.04k
  }
913
3.05k
  if (buffering) {
914
    // The upstream request receives a copy so that `data` itself can be handed to
    // addDecodedData() below.
0
    if (!upstream_requests_.empty()) {
915
0
      Buffer::OwnedImpl copy(data);
916
0
      upstream_requests_.front()->acceptDataFromRouter(copy, end_stream);
917
0
    }
918
919
    // If we are potentially going to retry or buffer shadow this request we need to buffer.
920
    // This will not cause the connection manager to 413 because before we hit the
921
    // buffer limit we give up on retries and buffering. We must buffer using addDecodedData()
922
    // so that all buffered data is available by the time we do request complete processing and
923
    // potentially shadow. Additionally, we can't do a copy here because there's a check down
924
    // this stack for whether `data` is the same buffer as already buffered data.
925
0
    callbacks_->addDecodedData(data, true);
926
3.05k
  } else {
927
    // NOTE(review): with the runtime feature disabled, front() is called without checking that
    // upstream_requests_ is non-empty, although the comment at the top of this function says the
    // list may be empty. Presumably this is the legacy behavior kept behind the guard - confirm.
3.05k
    if (!Runtime::runtimeFeatureEnabled(
928
3.05k
            "envoy.reloadable_features.send_local_reply_when_no_buffer_and_upstream_request")) {
929
0
      upstream_requests_.front()->acceptDataFromRouter(data, end_stream);
930
3.05k
    } else {
931
3.05k
      if (!upstream_requests_.empty()) {
932
3.05k
        upstream_requests_.front()->acceptDataFromRouter(data, end_stream);
933
3.05k
      } else {
934
        // not buffering any data for retry, shadow, and internal redirect, and there will be
935
        // no more upstream request, abort the request and clean up.
936
0
        cleanup();
937
0
        callbacks_->sendLocalReply(
938
0
            Http::Code::ServiceUnavailable,
939
0
            "upstream is closed prematurely during decoding data from downstream", modify_headers_,
940
0
            absl::nullopt, StreamInfo::ResponseCodeDetails::get().EarlyUpstreamReset);
941
0
        return Http::FilterDataStatus::StopIterationNoBuffer;
942
0
      }
943
3.05k
    }
944
3.05k
  }
945
946
3.05k
  if (end_stream) {
947
1.04k
    onRequestComplete();
948
1.04k
  }
949
950
3.05k
  return Http::FilterDataStatus::StopIterationNoBuffer;
951
3.05k
}
952
953
0
// Handles downstream request trailers, which also mark the end of the request.
//
// The trailers are remembered in downstream_trailers_, forwarded to the in-flight upstream
// request (if any), copied to every streaming shadow, and then the request is marked complete.
// A private copy is stored in shadow_trailers_ only when shadow headers were captured earlier.
//
// @param trailers the request trailers received from downstream.
// @return always Http::FilterTrailersStatus::StopIteration.
Http::FilterTrailersStatus Filter::decodeTrailers(Http::RequestTrailerMap& trailers) {
  ENVOY_STREAM_LOG(debug, "router decoding trailers:\n{}", *callbacks_, trailers);

  if (shadow_headers_ != nullptr) {
    shadow_trailers_ = Http::createHeaderMap<Http::RequestTrailerMapImpl>(trailers);
  }

  // At most one upstream request can exist here: hedged requests are only created after the
  // per-try timeout timer starts, which happens in onRequestComplete(). The list may be empty
  // when the first attempt already failed and a retry is waiting on its backoff timer.
  ASSERT(upstream_requests_.size() <= 1);
  downstream_trailers_ = &trailers;
  if (!upstream_requests_.empty()) {
    upstream_requests_.front()->acceptTrailersFromRouter(trailers);
  }

  // Trailers end each shadow stream, so detach our callbacks before handing them over and drop
  // our bookkeeping afterwards.
  for (auto* mirror_stream : shadow_streams_) {
    mirror_stream->removeDestructorCallback();
    mirror_stream->removeWatermarkCallbacks();
    mirror_stream->captureAndSendTrailers(
        Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_));
  }
  shadow_streams_.clear();

  onRequestComplete();
  return Http::FilterTrailersStatus::StopIteration;
}
981
982
26
// Forwards downstream request metadata to the in-flight upstream request, if there is one.
//
// The metadata map is copied into a heap-allocated map owned by the upstream request. The copy
// is made only when an upstream request exists, so no allocation happens when the metadata would
// simply be dropped.
//
// @param metadata_map the metadata received from downstream; it is not modified.
// @return always Http::FilterMetadataStatus::Continue.
Http::FilterMetadataStatus Filter::decodeMetadata(Http::MetadataMap& metadata_map) {
  if (!upstream_requests_.empty()) {
    // TODO(soya3129): Save metadata for retry, redirect and shadowing case.
    upstream_requests_.front()->acceptMetadataFromRouter(
        std::make_unique<Http::MetadataMap>(metadata_map));
  }
  return Http::FilterMetadataStatus::Continue;
}
990
991
2.25k
// Stores the decoder filter callbacks and latches the buffer limit used for retry/shadow
// buffering.
//
// @param callbacks the callbacks supplied by the filter manager; the address is retained in
//        callbacks_ and passed on to watermark_callbacks_.
void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) {
  callbacks_ = &callbacks;

  // The decoder filter only pushes back via watermarks once data has reached it, so the current
  // buffer limit can be latched here and need not be refreshed if another filter raises it later.
  //
  // A limit of zero means "do not limit" and leaves retry_shadow_buffer_limit_ at its existing
  // value; only a configured, non-zero limit is applied.
  if (callbacks.decoderBufferLimit() != 0) {
    retry_shadow_buffer_limit_ = callbacks.decoderBufferLimit();
  }

  watermark_callbacks_.setDecoderFilterCallbacks(&callbacks);
}
1005
1006
4.08k
void Filter::cleanup() {
1007
  // All callers of cleanup() should have cleaned out the upstream_requests_
1008
  // list as appropriate.
1009
4.08k
  ASSERT(upstream_requests_.empty());
1010
1011
4.08k
  retry_state_.reset();
1012
4.08k
  if (response_timeout_) {
1013
916
    response_timeout_->disableTimer();
1014
916
    response_timeout_.reset();
1015
916
  }
1016
4.08k
}
1017
1018
// Resolves the name of the cluster a shadow policy should mirror to.
//
// @param policy the shadow policy; its static cluster name wins when non-empty, otherwise its
//        cluster header names the request header to read.
// @param headers the headers searched for the policy's cluster header.
// @return the static cluster name, or the first value of the cluster header when that header is
//         present and non-empty; absl::nullopt (after a debug log) otherwise. The returned view
//         refers to storage owned by `policy` or `headers`.
absl::optional<absl::string_view> Filter::getShadowCluster(const ShadowPolicy& policy,
                                                           const Http::HeaderMap& headers) const {
  if (!policy.cluster().empty()) {
    return policy.cluster();
  }

  // Without a static cluster name the policy is expected to name a header instead.
  ASSERT(!policy.clusterHeader().get().empty());
  const auto header_entries = headers.get(policy.clusterHeader());
  const bool has_cluster_value = !header_entries.empty() && !header_entries[0]->value().empty();
  if (has_cluster_value) {
    return header_entries[0]->value().getStringView();
  }

  ENVOY_STREAM_LOG(debug, "There is no cluster name in header: {}", *callbacks_,
                   policy.clusterHeader());
  return absl::nullopt;
}
1033
1034
1.62k
void Filter::maybeDoShadowing() {
1035
1.62k
  for (const auto& shadow_policy_wrapper : active_shadow_policies_) {
1036
0
    const auto& shadow_policy = shadow_policy_wrapper.get();
1037
1038
0
    const absl::optional<absl::string_view> shadow_cluster_name =
1039
0
        getShadowCluster(shadow_policy, *downstream_headers_);
1040
1041
    // The cluster name got from headers is empty.
1042
0
    if (!shadow_cluster_name.has_value()) {
1043
0
      continue;
1044
0
    }
1045
1046
0
    Http::RequestMessagePtr request(new Http::RequestMessageImpl(
1047
0
        Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_)));
1048
0
    if (callbacks_->decodingBuffer()) {
1049
0
      request->body().add(*callbacks_->decodingBuffer());
1050
0
    }
1051
0
    if (shadow_trailers_) {
1052
0
      request->trailers(Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_));
1053
0
    }
1054
1055
0
    auto options = Http::AsyncClient::RequestOptions()
1056
0
                       .setTimeout(timeout_.global_timeout_)
1057
0
                       .setParentSpan(callbacks_->activeSpan())
1058
0
                       .setChildSpanName("mirror")
1059
0
                       .setSampled(shadow_policy.traceSampled())
1060
0
                       .setIsShadow(true)
1061
0
                       .setIsShadowSuffixDisabled(shadow_policy.disableShadowHostSuffixAppend());
1062
0
    options.setFilterConfig(config_);
1063
0
    config_->shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request),
1064
0
                                   options);
1065
0
  }
1066
1.62k
}
1067
1068
1.62k
void Filter::onRequestComplete() {
1069
  // This should be called exactly once, when the downstream request has been received in full.
1070
1.62k
  ASSERT(!downstream_end_stream_);
1071
1.62k
  downstream_end_stream_ = true;
1072
1.62k
  Event::Dispatcher& dispatcher = callbacks_->dispatcher();
1073
1.62k
  downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime();
1074
1075
  // Possible that we got an immediate reset.
1076
1.62k
  if (!upstream_requests_.empty()) {
1077
    // Even if we got an immediate reset, we could still shadow, but that is a riskier change and
1078
    // seems unnecessary right now.
1079
1.62k
    if (!streaming_shadows_) {
1080
1.62k
      maybeDoShadowing();
1081
1.62k
    }
1082
1083
1.62k
    if (timeout_.global_timeout_.count() > 0) {
1084
916
      response_timeout_ = dispatcher.createTimer([this]() -> void { onResponseTimeout(); });
1085
916
      response_timeout_->enableTimer(timeout_.global_timeout_);
1086
916
    }
1087
1088
1.62k
    for (auto& upstream_request : upstream_requests_) {
1089
1.62k
      if (upstream_request->createPerTryTimeoutOnRequestComplete()) {
1090
1.15k
        upstream_request->setupPerTryTimeout();
1091
1.15k
      }
1092
1.62k
    }
1093
1.62k
  }
1094
1.62k
}
1095
1096
3.06k
void Filter::onDestroy() {
1097
  // Reset any in-flight upstream requests.
1098
3.06k
  resetAll();
1099
1100
  // Unregister from shadow stream notifications and cancel active streams.
1101
3.06k
  for (auto* shadow_stream : shadow_streams_) {
1102
0
    shadow_stream->removeDestructorCallback();
1103
0
    shadow_stream->removeWatermarkCallbacks();
1104
0
    shadow_stream->cancel();
1105
0
  }
1106
1107
3.06k
  cleanup();
1108
3.06k
}
1109
1110
0
// Handles expiry of the global response timeout (the timer armed in onRequestComplete()).
//
// Every remaining upstream request is taken off upstream_requests_ (from the back), has timeout
// stats charged against the cluster, virtual cluster, route and host where available, and is
// then reset. Requests that are still awaiting response headers additionally get timeout-budget,
// outlier-detection and upstream-abort accounting. Finally the downstream is notified through
// onUpstreamTimeoutAbort() with the UpstreamRequestTimeout flag.
void Filter::onResponseTimeout() {
1111
0
  ENVOY_STREAM_LOG(debug, "upstream timeout", *callbacks_);
1112
1113
  // Reset any upstream requests that are still in flight.
1114
0
  while (!upstream_requests_.empty()) {
1115
    // removeFromList() hands back ownership, so the request stays alive for the rest of this
    // iteration even though it is no longer in the list.
0
    UpstreamRequestPtr upstream_request =
1116
0
        upstream_requests_.back()->removeFromList(upstream_requests_);
1117
1118
    // We want to record the upstream timeouts and increase the stats counters in all the cases.
1119
    // For example, we also want to record the stats in the case of BiDi streaming APIs where we
1120
    // might have already seen the headers.
1121
0
    cluster_->trafficStats()->upstream_rq_timeout_.inc();
1122
0
    if (request_vcluster_) {
1123
0
      request_vcluster_->stats().upstream_rq_timeout_.inc();
1124
0
    }
1125
0
    if (route_stats_context_.has_value()) {
1126
0
      route_stats_context_->stats().upstream_rq_timeout_.inc();
1127
0
    }
1128
1129
0
    if (upstream_request->upstreamHost()) {
1130
0
      upstream_request->upstreamHost()->stats().rq_timeout_.inc();
1131
0
    }
1132
1133
    // The accounting below applies only while no response headers have arrived for this attempt.
0
    if (upstream_request->awaitingHeaders()) {
1134
0
      if (cluster_->timeoutBudgetStats().has_value()) {
1135
        // Cancel firing per-try timeout information, because the per-try timeout did not come into
1136
        // play when the global timeout was hit.
1137
0
        upstream_request->recordTimeoutBudget(false);
1138
0
      }
1139
1140
      // If this upstream request already hit a "soft" timeout, then it
1141
      // already recorded a timeout into outlier detection. Don't do it again.
1142
0
      if (!upstream_request->outlierDetectionTimeoutRecorded()) {
1143
0
        updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, *upstream_request,
1144
0
                               absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
1145
0
      }
1146
1147
0
      chargeUpstreamAbort(timeout_response_code_, false, *upstream_request);
1148
0
    }
1149
0
    upstream_request->resetStream();
1150
0
  }
1151
1152
0
  onUpstreamTimeoutAbort(StreamInfo::CoreResponseFlag::UpstreamRequestTimeout,
1153
0
                         StreamInfo::ResponseCodeDetails::get().ResponseTimeout);
1154
0
}
1155
1156
// Called when the per try timeout is hit but we didn't reset the request
1157
// (hedge_on_per_try_timeout enabled).
1158
0
void Filter::onSoftPerTryTimeout(UpstreamRequest& upstream_request) {
1159
0
  ASSERT(!upstream_request.retried());
1160
  // Track this as a timeout for outlier detection purposes even though we didn't
1161
  // cancel the request yet and might get a 2xx later.
1162
0
  updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request,
1163
0
                         absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
1164
0
  upstream_request.outlierDetectionTimeoutRecorded(true);
1165
1166
0
  if (!downstream_response_started_ && retry_state_) {
1167
0
    RetryStatus retry_status = retry_state_->shouldHedgeRetryPerTryTimeout(
1168
0
        [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_]() -> void {
1169
          // Without any knowledge about what's going on in the connection pool, retry the request
1170
          // with the safest settings which is no early data but keep using or not using alt-svc as
1171
          // before. In this way, QUIC won't be falsely marked as broken.
1172
0
          doRetry(/*can_send_early_data*/ false, can_use_http3, TimeoutRetry::Yes);
1173
0
        });
1174
1175
0
    if (retry_status == RetryStatus::Yes) {
1176
0
      runRetryOptionsPredicates(upstream_request);
1177
0
      pending_retries_++;
1178
1179
      // Don't increment upstream_host->stats().rq_error_ here, we'll do that
1180
      // later if 1) we hit global timeout or 2) we get bad response headers
1181
      // back.
1182
0
      upstream_request.retried(true);
1183
1184
      // TODO: cluster stat for hedge attempted.
1185
0
    } else if (retry_status == RetryStatus::NoOverflow) {
1186
0
      callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
1187
0
    } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
1188
0
      callbacks_->streamInfo().setResponseFlag(
1189
0
          StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
1190
0
    }
1191
0
  }
1192
0
}
1193
1194
0
// Per-try idle timeout entry point: runs the shared per-try timeout handling with the cluster's
// upstream_rq_per_try_idle_timeout_ counter and the UpstreamPerTryIdleTimeout response details.
//
// @param upstream_request the attempt whose per-try idle timeout expired.
void Filter::onPerTryIdleTimeout(UpstreamRequest& upstream_request) {
  auto& idle_timeout_counter = cluster_->trafficStats()->upstream_rq_per_try_idle_timeout_;
  const auto& details = StreamInfo::ResponseCodeDetails::get().UpstreamPerTryIdleTimeout;
  onPerTryTimeoutCommon(upstream_request, idle_timeout_counter, details);
}
1199
1200
0
// Per-try timeout entry point: runs the shared per-try timeout handling with the cluster's
// upstream_rq_per_try_timeout_ counter and the UpstreamPerTryTimeout response details.
//
// @param upstream_request the attempt whose per-try timeout expired.
void Filter::onPerTryTimeout(UpstreamRequest& upstream_request) {
  auto& per_try_timeout_counter = cluster_->trafficStats()->upstream_rq_per_try_timeout_;
  const auto& details = StreamInfo::ResponseCodeDetails::get().UpstreamPerTryTimeout;
  onPerTryTimeoutCommon(upstream_request, per_try_timeout_counter, details);
}
1204
1205
// Shared handling for per-try timeouts (called by onPerTryIdleTimeout() and onPerTryTimeout()).
//
// When hedging on per-try timeout is enabled the request is left running and handling is
// delegated to onSoftPerTryTimeout(). Otherwise the timeout is counted, the upstream stream is
// reset, the timeout is reported to outlier detection, and a retry is attempted. If
// maybeRetryReset() returns false, the abort is charged, the request is removed from
// upstream_requests_, and the downstream is notified via onUpstreamTimeoutAbort().
//
// @param upstream_request the attempt that timed out.
// @param error_counter the stat incremented for this kind of per-try timeout.
// @param response_code_details the details string passed on to onUpstreamTimeoutAbort().
void Filter::onPerTryTimeoutCommon(UpstreamRequest& upstream_request, Stats::Counter& error_counter,
1206
0
                                   const std::string& response_code_details) {
1207
0
  if (hedging_params_.hedge_on_per_try_timeout_) {
1208
0
    onSoftPerTryTimeout(upstream_request);
1209
0
    return;
1210
0
  }
1211
1212
0
  error_counter.inc();
1213
0
  if (upstream_request.upstreamHost()) {
1214
0
    upstream_request.upstreamHost()->stats().rq_timeout_.inc();
1215
0
  }
1216
1217
0
  upstream_request.resetStream();
1218
1219
0
  updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request,
1220
0
                         absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
1221
1222
  // NOTE(review): a true return presumably means a retry was scheduled and has taken over the
  // request - confirm against maybeRetryReset().
0
  if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::Yes)) {
1223
0
    return;
1224
0
  }
1225
1226
0
  chargeUpstreamAbort(timeout_response_code_, false, upstream_request);
1227
1228
  // Remove this upstream request from the list now that we're done with it.
1229
0
  upstream_request.removeFromList(upstream_requests_);
1230
0
  onUpstreamTimeoutAbort(StreamInfo::CoreResponseFlag::UpstreamRequestTimeout,
1231
0
                         response_code_details);
1232
0
}
1233
1234
0
// Handles `upstream_request` reaching its max stream duration.
//
// The upstream stream is reset and a (non-timeout) retry is attempted. If no retry is scheduled,
// the request is removed from upstream_requests_, the filter is cleaned up, the
// UpstreamMaxStreamDurationReached response flag is set and a local reply is sent downstream.
void Filter::onStreamMaxDurationReached(UpstreamRequest& upstream_request) {
  upstream_request.resetStream();

  if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::No)) {
    return;
  }

  // Hand the owning pointer to the dispatcher instead of dropping it here: discarding the result
  // of removeFromList() would destroy the request immediately, whereas the retry path in
  // maybeRetryReset(), onUpstreamReset() and onUpstreamComplete() all defer deletion.
  // `upstream_request` must not be used past this point.
  callbacks_->dispatcher().deferredDelete(upstream_request.removeFromList(upstream_requests_));
  cleanup();

  callbacks_->streamInfo().setResponseFlag(
      StreamInfo::CoreResponseFlag::UpstreamMaxStreamDurationReached);
  // Grab the const ref to call the const method of StreamInfo.
  const auto& stream_info = callbacks_->streamInfo();
  const bool downstream_decode_complete =
      stream_info.downstreamTiming().has_value() &&
      stream_info.downstreamTiming().value().get().lastDownstreamRxByteReceived().has_value();

  // sendLocalReply may instead reset the stream if downstream_response_started_ is true.
  callbacks_->sendLocalReply(
      Http::Utility::maybeRequestTimeoutCode(downstream_decode_complete),
      "upstream max stream duration reached", modify_headers_, absl::nullopt,
      StreamInfo::ResponseCodeDetails::get().UpstreamMaxStreamDurationReached);
}
1258
1259
void Filter::updateOutlierDetection(Upstream::Outlier::Result result,
1260
                                    UpstreamRequest& upstream_request,
1261
121
                                    absl::optional<uint64_t> code) {
1262
121
  if (upstream_request.upstreamHost()) {
1263
121
    upstream_request.upstreamHost()->outlierDetector().putResult(result, code);
1264
121
  }
1265
121
}
1266
1267
121
// Charges stats for an aborted upstream request.
//
// Before the downstream response has started, `code` is charged through chargeUpstreamCode() and
// a non-5xx `code` additionally counts as a host error. After the downstream response has
// started, stats are only touched when the request still had a deferred gRPC success pending.
void Filter::chargeUpstreamAbort(Http::Code code, bool dropped, UpstreamRequest& upstream_request) {
  if (!downstream_response_started_) {
    Upstream::HostDescriptionConstSharedPtr host = upstream_request.upstreamHost();
    chargeUpstreamCode(code, host, dropped);

    // If we had non-5xx but still have been reset by backend or timeout before
    // starting response, we treat this as an error. We only get non-5xx when
    // timeout_response_code_ is used for code above, where this member can
    // assume values such as 204 (NoContent).
    if (host == nullptr) {
      return;
    }
    if (!Http::CodeUtility::is5xx(enumToInt(code))) {
      host->stats().rq_error_.inc();
    }
    return;
  }

  if (upstream_request.grpcRqSuccessDeferred()) {
    upstream_request.upstreamHost()->stats().rq_error_.inc();
    stats_.rq_reset_after_downstream_response_started_.inc();
  }
}
1286
1287
void Filter::onUpstreamTimeoutAbort(StreamInfo::CoreResponseFlag response_flags,
1288
0
                                    absl::string_view details) {
1289
0
  Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = cluster()->timeoutBudgetStats();
1290
0
  if (tb_stats.has_value()) {
1291
0
    Event::Dispatcher& dispatcher = callbacks_->dispatcher();
1292
0
    std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>(
1293
0
        dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
1294
1295
0
    tb_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue(
1296
0
        FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_));
1297
0
  }
1298
1299
0
  const absl::string_view body =
1300
0
      timeout_response_code_ == Http::Code::GatewayTimeout ? "upstream request timeout" : "";
1301
0
  onUpstreamAbort(timeout_response_code_, response_flags, body, false, details);
1302
0
}
1303
1304
// Terminates the downstream request after an upstream failure.
//
// Sets `response_flags` on the stream info, cleans up the filter, and issues a local reply with
// `code`, `body` and `details`. When `dropped` is set and Envoy headers are not suppressed, the
// x-envoy-overloaded header is added to the reply before modify_headers_ runs.
void Filter::onUpstreamAbort(Http::Code code, StreamInfo::CoreResponseFlag response_flags,
                             absl::string_view body, bool dropped, absl::string_view details) {
  // If we have not yet sent anything downstream, send a response with an appropriate status code.
  // Otherwise just reset the ongoing response.
  callbacks_->streamInfo().setResponseFlag(response_flags);
  // This will destroy any created retry timers.
  cleanup();

  const auto decorate_reply_headers = [this, dropped](Http::ResponseHeaderMap& headers) {
    const bool mark_overloaded = dropped && !config_->suppress_envoy_headers_;
    if (mark_overloaded) {
      headers.addReference(Http::Headers::get().EnvoyOverloaded,
                           Http::Headers::get().EnvoyOverloadedValues.True);
    }
    modify_headers_(headers);
  };

  // sendLocalReply may instead reset the stream if downstream_response_started_ is true.
  callbacks_->sendLocalReply(code, body, decorate_reply_headers, absl::nullopt, details);
}
1323
1324
bool Filter::maybeRetryReset(Http::StreamResetReason reset_reason,
1325
121
                             UpstreamRequest& upstream_request, TimeoutRetry is_timeout_retry) {
1326
  // We don't retry if we already started the response, don't have a retry policy defined,
1327
  // or if we've already retried this upstream request (currently only possible if a per
1328
  // try timeout occurred and hedge_on_per_try_timeout is enabled).
1329
121
  if (downstream_response_started_ || !retry_state_ || upstream_request.retried()) {
1330
121
    return false;
1331
121
  }
1332
0
  RetryState::Http3Used was_using_http3 = RetryState::Http3Used::Unknown;
1333
0
  if (upstream_request.hadUpstream()) {
1334
0
    was_using_http3 = (upstream_request.streamInfo().protocol().has_value() &&
1335
0
                       upstream_request.streamInfo().protocol().value() == Http::Protocol::Http3)
1336
0
                          ? RetryState::Http3Used::Yes
1337
0
                          : RetryState::Http3Used::No;
1338
0
  }
1339
1340
  // If the current request in this router has sent data to the upstream, we consider the request
1341
  // started.
1342
0
  upstream_request_started_ |= upstream_request.streamInfo()
1343
0
                                   .upstreamInfo()
1344
0
                                   ->upstreamTiming()
1345
0
                                   .first_upstream_tx_byte_sent_.has_value();
1346
1347
0
  const RetryStatus retry_status = retry_state_->shouldRetryReset(
1348
0
      reset_reason, was_using_http3,
1349
0
      [this, can_send_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_,
1350
0
       can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
1351
0
       is_timeout_retry](bool disable_http3) -> void {
1352
        // This retry might be because of ConnectionFailure of 0-RTT handshake. In this case, though
1353
        // the original request is retried with the same can_send_early_data setting, it will not be
1354
        // sent as early data by the underlying connection pool grid.
1355
0
        doRetry(can_send_early_data, disable_http3 ? false : can_use_http3, is_timeout_retry);
1356
0
      },
1357
0
      upstream_request_started_);
1358
0
  if (retry_status == RetryStatus::Yes) {
1359
0
    runRetryOptionsPredicates(upstream_request);
1360
0
    pending_retries_++;
1361
1362
0
    if (upstream_request.upstreamHost()) {
1363
0
      upstream_request.upstreamHost()->stats().rq_error_.inc();
1364
0
    }
1365
1366
0
    auto request_ptr = upstream_request.removeFromList(upstream_requests_);
1367
0
    callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1368
0
    return true;
1369
0
  } else if (retry_status == RetryStatus::NoOverflow) {
1370
0
    callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
1371
0
  } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
1372
0
    callbacks_->streamInfo().setResponseFlag(
1373
0
        StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
1374
0
  }
1375
1376
0
  return false;
1377
0
}
1378
1379
void Filter::onUpstreamReset(Http::StreamResetReason reset_reason,
1380
                             absl::string_view transport_failure_reason,
1381
121
                             UpstreamRequest& upstream_request) {
1382
121
  ENVOY_STREAM_LOG(debug, "upstream reset: reset reason: {}, transport failure reason: {}",
1383
121
                   *callbacks_, Http::Utility::resetReasonToString(reset_reason),
1384
121
                   transport_failure_reason);
1385
1386
121
  const bool dropped = reset_reason == Http::StreamResetReason::Overflow;
1387
1388
  // Ignore upstream reset caused by a resource overflow.
1389
  // Currently, circuit breakers can only produce this reset reason.
1390
  // It means that this reason is cluster-wise, not upstream-related.
1391
  // Therefore removing an upstream in the case of an overloaded cluster
1392
  // would make the situation even worse.
1393
  // https://github.com/envoyproxy/envoy/issues/25487
1394
121
  if (!dropped) {
1395
    // TODO: The reset may also come from upstream over the wire. In this case it should be
1396
    // treated as external origin error and distinguished from local origin error.
1397
    // This matters only when running OutlierDetection with split_external_local_origin_errors
1398
    // config param set to true.
1399
121
    updateOutlierDetection(Upstream::Outlier::Result::LocalOriginConnectFailed, upstream_request,
1400
121
                           absl::nullopt);
1401
121
  }
1402
1403
121
  if (maybeRetryReset(reset_reason, upstream_request, TimeoutRetry::No)) {
1404
0
    return;
1405
0
  }
1406
1407
121
  const Http::Code error_code = (reset_reason == Http::StreamResetReason::ProtocolError)
1408
121
                                    ? Http::Code::BadGateway
1409
121
                                    : Http::Code::ServiceUnavailable;
1410
121
  chargeUpstreamAbort(error_code, dropped, upstream_request);
1411
121
  auto request_ptr = upstream_request.removeFromList(upstream_requests_);
1412
121
  callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1413
1414
  // If there are other in-flight requests that might see an upstream response,
1415
  // don't return anything downstream.
1416
121
  if (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0) {
1417
0
    return;
1418
0
  }
1419
1420
121
  const StreamInfo::CoreResponseFlag response_flags = streamResetReasonToResponseFlag(reset_reason);
1421
1422
121
  const std::string body =
1423
121
      absl::StrCat("upstream connect error or disconnect/reset before headers. ",
1424
121
                   (is_retry_ ? "retried and the latest " : ""),
1425
121
                   "reset reason: ", Http::Utility::resetReasonToString(reset_reason),
1426
121
                   !transport_failure_reason.empty() ? ", transport failure reason: " : "",
1427
121
                   transport_failure_reason);
1428
121
  const std::string& basic_details =
1429
121
      downstream_response_started_ ? StreamInfo::ResponseCodeDetails::get().LateUpstreamReset
1430
121
                                   : StreamInfo::ResponseCodeDetails::get().EarlyUpstreamReset;
1431
121
  const std::string details = StringUtil::replaceAllEmptySpace(absl::StrCat(
1432
121
      basic_details, "{", Http::Utility::resetReasonToString(reset_reason),
1433
121
      transport_failure_reason.empty() ? "" : absl::StrCat("|", transport_failure_reason), "}"));
1434
121
  onUpstreamAbort(error_code, response_flags, body, dropped, details);
1435
121
}
1436
1437
void Filter::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
1438
1.77k
                                    bool pool_success) {
1439
1.77k
  if (retry_state_ && host) {
1440
5
    retry_state_->onHostAttempted(host);
1441
5
  }
1442
1443
1.77k
  if (!pool_success) {
1444
0
    return;
1445
0
  }
1446
1447
1.77k
  if (request_vcluster_) {
1448
    // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady
1449
    // callback. Hence, the upstream request increases the virtual cluster's upstream_rq_total_ stat
1450
    // here.
1451
5
    request_vcluster_->stats().upstream_rq_total_.inc();
1452
5
  }
1453
1.77k
  if (route_stats_context_.has_value()) {
1454
    // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady
1455
    // callback. Hence, the upstream request increases the route level upstream_rq_total_ stat
1456
    // here.
1457
0
    route_stats_context_->stats().upstream_rq_total_.inc();
1458
0
  }
1459
1.77k
}
1460
1461
// Maps an HTTP stream reset reason to the StreamInfo response flag recorded for it.
// Several reset reasons share a flag; every enumerator handled here returns directly from the
// switch.
StreamInfo::CoreResponseFlag
1462
242
Filter::streamResetReasonToResponseFlag(Http::StreamResetReason reset_reason) {
1463
242
  switch (reset_reason) {
1464
0
  case Http::StreamResetReason::LocalConnectionFailure:
1465
0
  case Http::StreamResetReason::RemoteConnectionFailure:
1466
0
  case Http::StreamResetReason::ConnectionTimeout:
1467
0
    return StreamInfo::CoreResponseFlag::UpstreamConnectionFailure;
1468
40
  case Http::StreamResetReason::ConnectionTermination:
1469
40
    return StreamInfo::CoreResponseFlag::UpstreamConnectionTermination;
1470
0
  case Http::StreamResetReason::LocalReset:
1471
0
  case Http::StreamResetReason::LocalRefusedStreamReset:
1472
0
  case Http::StreamResetReason::Http1PrematureUpstreamHalfClose:
1473
0
    return StreamInfo::CoreResponseFlag::LocalReset;
1474
0
  case Http::StreamResetReason::Overflow:
1475
0
    return StreamInfo::CoreResponseFlag::UpstreamOverflow;
1476
0
  case Http::StreamResetReason::RemoteReset:
1477
54
  case Http::StreamResetReason::RemoteRefusedStreamReset:
1478
54
  case Http::StreamResetReason::ConnectError:
1479
54
    return StreamInfo::CoreResponseFlag::UpstreamRemoteReset;
1480
148
  case Http::StreamResetReason::ProtocolError:
1481
148
    return StreamInfo::CoreResponseFlag::UpstreamProtocolError;
1482
0
  case Http::StreamResetReason::OverloadManager:
1483
0
    return StreamInfo::CoreResponseFlag::OverloadManager;
1484
242
  }
1485
1486
0
  // Only reached when `reset_reason` matches none of the cases above.
  PANIC_DUE_TO_CORRUPT_ENUM;
1487
0
}
1488
1489
void Filter::handleNon5xxResponseHeaders(absl::optional<Grpc::Status::GrpcStatus> grpc_status,
1490
                                         UpstreamRequest& upstream_request, bool end_stream,
1491
1.63k
                                         uint64_t grpc_to_http_status) {
1492
  // We need to defer gRPC success until after we have processed grpc-status in
1493
  // the trailers.
1494
1.63k
  if (grpc_request_) {
1495
829
    if (end_stream) {
1496
100
      if (grpc_status && !Http::CodeUtility::is5xx(grpc_to_http_status)) {
1497
100
        upstream_request.upstreamHost()->stats().rq_success_.inc();
1498
100
      } else {
1499
0
        upstream_request.upstreamHost()->stats().rq_error_.inc();
1500
0
      }
1501
729
    } else {
1502
729
      upstream_request.grpcRqSuccessDeferred(true);
1503
729
    }
1504
829
  } else {
1505
805
    upstream_request.upstreamHost()->stats().rq_success_.inc();
1506
805
  }
1507
1.63k
}
1508
1509
void Filter::onUpstream1xxHeaders(Http::ResponseHeaderMapPtr&& headers,
1510
1
                                  UpstreamRequest& upstream_request) {
1511
1
  const uint64_t response_code = Http::Utility::getResponseStatus(*headers);
1512
1
  chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false);
1513
1
  ENVOY_STREAM_LOG(debug, "upstream 1xx ({}).", *callbacks_, response_code);
1514
1515
1
  downstream_response_started_ = true;
1516
1
  final_upstream_request_ = &upstream_request;
1517
1
  resetOtherUpstreams(upstream_request);
1518
1519
  // Don't send retries after 100-Continue has been sent on. Arguably we could attempt to do a
1520
  // retry, assume the next upstream would also send an 100-Continue and swallow the second one
1521
  // but it's sketchy (as the subsequent upstream might not send a 100-Continue) and not worth
1522
  // the complexity until someone asks for it.
1523
1
  retry_state_.reset();
1524
1525
1
  callbacks_->encode1xxHeaders(std::move(headers));
1526
1
}
1527
1528
3.06k
void Filter::resetAll() {
1529
3.85k
  while (!upstream_requests_.empty()) {
1530
793
    auto request_ptr = upstream_requests_.back()->removeFromList(upstream_requests_);
1531
793
    request_ptr->resetStream();
1532
793
    callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1533
793
  }
1534
3.06k
}
1535
1536
1.63k
// Resets every upstream request except `upstream_request`, leaving it as the only entry in
// upstream_requests_.
void Filter::resetOtherUpstreams(UpstreamRequest& upstream_request) {
  // Empty the list, holding on to the request we were given and resetting all others; the kept
  // request is re-inserted once the list has been drained.
  UpstreamRequestPtr survivor;
  while (!upstream_requests_.empty()) {
    UpstreamRequestPtr popped = upstream_requests_.back()->removeFromList(upstream_requests_);
    if (popped.get() == &upstream_request) {
      survivor = std::move(popped);
      continue;
    }
    popped->resetStream();
    // TODO: per-host stat for hedge abandoned.
    // TODO: cluster stat for hedge abandoned.
  }

  ASSERT(survivor);
  // Now put the final request back on this list.
  LinkedList::moveIntoList(std::move(survivor), upstream_requests_);
}
1556
1557
void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers,
1558
1.63k
                               UpstreamRequest& upstream_request, bool end_stream) {
1559
1.63k
  ENVOY_STREAM_LOG(debug, "upstream headers complete: end_stream={}", *callbacks_, end_stream);
1560
1561
1.63k
  modify_headers_(*headers);
1562
  // When grpc-status appears in response headers, convert grpc-status to HTTP status code
1563
  // for outlier detection. This does not currently change any stats or logging and does not
1564
  // handle the case when an error grpc-status is sent as a trailer.
1565
1.63k
  absl::optional<Grpc::Status::GrpcStatus> grpc_status;
1566
1.63k
  uint64_t grpc_to_http_status = 0;
1567
1.63k
  if (grpc_request_) {
1568
829
    grpc_status = Grpc::Common::getGrpcStatus(*headers);
1569
829
    if (grpc_status.has_value()) {
1570
100
      grpc_to_http_status = Grpc::Utility::grpcToHttpStatus(grpc_status.value());
1571
100
    }
1572
829
  }
1573
1574
1.63k
  maybeProcessOrcaLoadReport(*headers, upstream_request);
1575
1576
1.63k
  if (grpc_status.has_value()) {
1577
100
    upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(grpc_to_http_status);
1578
1.53k
  } else {
1579
1.53k
    upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(response_code);
1580
1.53k
  }
1581
1582
1.63k
  if (headers->EnvoyImmediateHealthCheckFail() != nullptr) {
1583
0
    upstream_request.upstreamHost()->healthChecker().setUnhealthy(
1584
0
        Upstream::HealthCheckHostMonitor::UnhealthyType::ImmediateHealthCheckFail);
1585
0
  }
1586
1587
1.63k
  bool could_not_retry = false;
1588
1589
  // Check if this upstream request was already retried, for instance after
1590
  // hitting a per try timeout. Don't retry it if we already have.
1591
1.63k
  if (retry_state_) {
1592
0
    if (upstream_request.retried()) {
1593
      // We already retried this request (presumably for a per try timeout) so
1594
      // we definitely won't retry it again. Check if we would have retried it
1595
      // if we could.
1596
0
      bool retry_as_early_data; // Not going to be used as we are not retrying.
1597
0
      could_not_retry = retry_state_->wouldRetryFromHeaders(*headers, *downstream_headers_,
1598
0
                                                            retry_as_early_data) !=
1599
0
                        RetryState::RetryDecision::NoRetry;
1600
0
    } else {
1601
0
      const RetryStatus retry_status = retry_state_->shouldRetryHeaders(
1602
0
          *headers, *downstream_headers_,
1603
0
          [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
1604
0
           had_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_](
1605
0
              bool disable_early_data) -> void {
1606
0
            doRetry((disable_early_data ? false : had_early_data), can_use_http3, TimeoutRetry::No);
1607
0
          });
1608
0
      if (retry_status == RetryStatus::Yes) {
1609
0
        runRetryOptionsPredicates(upstream_request);
1610
0
        pending_retries_++;
1611
0
        upstream_request.upstreamHost()->stats().rq_error_.inc();
1612
0
        Http::CodeStats& code_stats = httpContext().codeStats();
1613
0
        code_stats.chargeBasicResponseStat(cluster_->statsScope(), stats_.stat_names_.retry_,
1614
0
                                           static_cast<Http::Code>(response_code),
1615
0
                                           exclude_http_code_stats_);
1616
1617
0
        if (!end_stream || !upstream_request.encodeComplete()) {
1618
0
          upstream_request.resetStream();
1619
0
        }
1620
0
        auto request_ptr = upstream_request.removeFromList(upstream_requests_);
1621
0
        callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1622
0
        return;
1623
0
      } else if (retry_status == RetryStatus::NoOverflow) {
1624
0
        callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
1625
0
        could_not_retry = true;
1626
0
      } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
1627
0
        callbacks_->streamInfo().setResponseFlag(
1628
0
            StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
1629
0
        could_not_retry = true;
1630
0
      }
1631
0
    }
1632
0
  }
1633
1634
1.63k
  if (route_entry_->internalRedirectPolicy().enabled() &&
1635
1.63k
      route_entry_->internalRedirectPolicy().shouldRedirectForResponseCode(
1636
0
          static_cast<Http::Code>(response_code)) &&
1637
1.63k
      setupRedirect(*headers)) {
1638
0
    return;
1639
    // If the redirect could not be handled, fail open and let it pass to the
1640
    // next downstream.
1641
0
  }
1642
1643
  // Check if we got a "bad" response, but there are still upstream requests in
1644
  // flight awaiting headers or scheduled retries. If so, exit to give them a
1645
  // chance to return before returning a response downstream.
1646
1.63k
  if (could_not_retry && (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0)) {
1647
0
    upstream_request.upstreamHost()->stats().rq_error_.inc();
1648
1649
    // Reset the stream because there are other in-flight requests that we'll
1650
    // wait around for and we're not interested in consuming any body/trailers.
1651
0
    auto request_ptr = upstream_request.removeFromList(upstream_requests_);
1652
0
    request_ptr->resetStream();
1653
0
    callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1654
0
    return;
1655
0
  }
1656
1657
  // Make sure any retry timers are destroyed since we may not call cleanup() if end_stream is
1658
  // false.
1659
1.63k
  if (retry_state_) {
1660
0
    retry_state_.reset();
1661
0
  }
1662
1663
  // Only send upstream service time if we received the complete request and this is not a
1664
  // premature response.
1665
1.63k
  if (DateUtil::timePointValid(downstream_request_complete_time_)) {
1666
803
    Event::Dispatcher& dispatcher = callbacks_->dispatcher();
1667
803
    MonotonicTime response_received_time = dispatcher.timeSource().monotonicTime();
1668
803
    std::chrono::milliseconds ms = std::chrono::duration_cast<std::chrono::milliseconds>(
1669
803
        response_received_time - downstream_request_complete_time_);
1670
803
    if (!config_->suppress_envoy_headers_) {
1671
803
      headers->setEnvoyUpstreamServiceTime(ms.count());
1672
803
    }
1673
803
  }
1674
1675
1.63k
  upstream_request.upstreamCanary(
1676
1.63k
      (headers->EnvoyUpstreamCanary() && headers->EnvoyUpstreamCanary()->value() == "true") ||
1677
1.63k
      upstream_request.upstreamHost()->canary());
1678
1.63k
  chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false);
1679
1.63k
  if (!Http::CodeUtility::is5xx(response_code)) {
1680
1.63k
    handleNon5xxResponseHeaders(grpc_status, upstream_request, end_stream, grpc_to_http_status);
1681
1.63k
  }
1682
1683
  // Append routing cookies
1684
1.63k
  for (const auto& header_value : downstream_set_cookies_) {
1685
0
    headers->addReferenceKey(Http::Headers::get().SetCookie, header_value);
1686
0
  }
1687
1688
1.63k
  callbacks_->streamInfo().setResponseCodeDetails(
1689
1.63k
      StreamInfo::ResponseCodeDetails::get().ViaUpstream);
1690
1691
1.63k
  callbacks_->streamInfo().setResponseCode(response_code);
1692
1693
  // TODO(zuercher): If access to response_headers_to_add (at any level) is ever needed outside
1694
  // Router::Filter we'll need to find a better location for this work. One possibility is to
1695
  // provide finalizeResponseHeaders functions on the Router::Config and VirtualHost interfaces.
1696
1.63k
  route_entry_->finalizeResponseHeaders(*headers, callbacks_->streamInfo());
1697
1698
1.63k
  downstream_response_started_ = true;
1699
1.63k
  final_upstream_request_ = &upstream_request;
1700
  // Make sure that for request hedging, we end up with the correct final upstream info.
1701
1.63k
  callbacks_->streamInfo().setUpstreamInfo(final_upstream_request_->streamInfo().upstreamInfo());
1702
1.63k
  resetOtherUpstreams(upstream_request);
1703
1.63k
  if (end_stream) {
1704
117
    onUpstreamComplete(upstream_request);
1705
117
  }
1706
1707
1.63k
  callbacks_->encodeHeaders(std::move(headers), end_stream,
1708
1.63k
                            StreamInfo::ResponseCodeDetails::get().ViaUpstream);
1709
1.63k
}
1710
1711
void Filter::onUpstreamData(Buffer::Instance& data, UpstreamRequest& upstream_request,
1712
3.11k
                            bool end_stream) {
1713
  // This should be true because when we saw headers we either reset the stream
1714
  // (hence wouldn't have made it to onUpstreamData) or all other in-flight
1715
  // streams.
1716
3.11k
  ASSERT(upstream_requests_.size() == 1);
1717
3.11k
  if (end_stream) {
1718
    // gRPC request termination without trailers is an error.
1719
784
    if (upstream_request.grpcRqSuccessDeferred()) {
1720
0
      upstream_request.upstreamHost()->stats().rq_error_.inc();
1721
0
    }
1722
784
    onUpstreamComplete(upstream_request);
1723
784
  }
1724
1725
3.11k
  callbacks_->encodeData(data, end_stream);
1726
3.11k
}
1727
1728
void Filter::onUpstreamTrailers(Http::ResponseTrailerMapPtr&& trailers,
1729
1
                                UpstreamRequest& upstream_request) {
1730
  // This should be true because when we saw headers we either reset the stream
1731
  // (hence wouldn't have made it to onUpstreamTrailers) or all other in-flight
1732
  // streams.
1733
1
  ASSERT(upstream_requests_.size() == 1);
1734
1735
1
  if (upstream_request.grpcRqSuccessDeferred()) {
1736
1
    absl::optional<Grpc::Status::GrpcStatus> grpc_status = Grpc::Common::getGrpcStatus(*trailers);
1737
1
    if (grpc_status &&
1738
1
        !Http::CodeUtility::is5xx(Grpc::Utility::grpcToHttpStatus(grpc_status.value()))) {
1739
1
      upstream_request.upstreamHost()->stats().rq_success_.inc();
1740
1
    } else {
1741
0
      upstream_request.upstreamHost()->stats().rq_error_.inc();
1742
0
    }
1743
1
  }
1744
1745
1
  maybeProcessOrcaLoadReport(*trailers, upstream_request);
1746
1747
1
  onUpstreamComplete(upstream_request);
1748
1749
1
  callbacks_->encodeTrailers(std::move(trailers));
1750
1
}
1751
1752
293
// Forwards a metadata map received from upstream to the downstream encoder, transferring
// ownership of `metadata_map`.
void Filter::onUpstreamMetadata(Http::MetadataMapPtr&& metadata_map) {
1753
293
  callbacks_->encodeMetadata(std::move(metadata_map));
1754
293
}
1755
1756
902
// Finishes processing of `upstream_request` once its response is complete.
//
// If the downstream request has not ended yet, either returns early (when multiplexed upstream
// half close is allowed) or resets the upstream stream. Afterwards records timeout budget and
// response timing stats, queues the request for deferred deletion and cleans up the filter.
void Filter::onUpstreamComplete(UpstreamRequest& upstream_request) {
  if (!downstream_end_stream_) {
    if (allow_multiplexed_upstream_half_close_) {
      // Continue request if downstream is not done yet.
      return;
    }
    upstream_request.resetStream();
  }

  const MonotonicTime now = callbacks_->dispatcher().timeSource().monotonicTime();
  const std::chrono::milliseconds response_time =
      std::chrono::duration_cast<std::chrono::milliseconds>(now -
                                                            downstream_request_complete_time_);

  Upstream::ClusterTimeoutBudgetStatsOptRef budget_stats = cluster()->timeoutBudgetStats();
  if (budget_stats.has_value()) {
    budget_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue(
        FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_));
  }

  const bool charge_timing = config_->emit_dynamic_stats_ &&
                             !callbacks_->streamInfo().healthCheck() &&
                             DateUtil::timePointValid(downstream_request_complete_time_);
  if (charge_timing) {
    upstream_request.upstreamHost()->outlierDetector().putResponseTime(response_time);
    const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_);

    Http::CodeStats& code_stats = httpContext().codeStats();

    // Timing charged without a stat prefix, attributed to the virtual host, virtual cluster and
    // route.
    Http::CodeStats::ResponseTimingInfo primary_timing{
        config_->scope_,
        cluster_->statsScope(),
        config_->empty_stat_name_,
        response_time,
        upstream_request.upstreamCanary(),
        internal_request,
        route_->virtualHost().statName(),
        request_vcluster_ ? request_vcluster_->statName() : config_->empty_stat_name_,
        route_stats_context_.has_value() ? route_stats_context_->statName()
                                         : config_->empty_stat_name_,
        config_->zone_name_,
        upstreamZone(upstream_request.upstreamHost())};
    code_stats.chargeResponseTiming(primary_timing);

    if (alt_stat_prefix_ != nullptr) {
      // Same timing charged under the alternate stat prefix, with the virtual host, virtual
      // cluster and route names left empty.
      Http::CodeStats::ResponseTimingInfo alt_timing{config_->scope_,
                                                     cluster_->statsScope(),
                                                     alt_stat_prefix_->statName(),
                                                     response_time,
                                                     upstream_request.upstreamCanary(),
                                                     internal_request,
                                                     config_->empty_stat_name_,
                                                     config_->empty_stat_name_,
                                                     config_->empty_stat_name_,
                                                     config_->zone_name_,
                                                     upstreamZone(upstream_request.upstreamHost())};
      code_stats.chargeResponseTiming(alt_timing);
    }
  }

  // Defer deletion as this is generally called under the stack of the upstream
  // request, and immediate deletion is dangerous.
  callbacks_->dispatcher().deferredDelete(upstream_request.removeFromList(upstream_requests_));
  cleanup();
}
1818
1819
0
// Attempts to convert an upstream redirect response into an internal redirect: the downstream
// request headers are rewritten for the redirect target and the stream is recreated.
//
// @param headers the upstream response headers; the redirect target is read from the Location
//        header and the status code is passed on to the header conversion.
// @return true if the headers were converted and the stream was recreated, false otherwise. The
//         cluster's upstream_internal_redirect_succeeded_total or
//         upstream_internal_redirect_failed_total counter is incremented to match.
bool Filter::setupRedirect(const Http::ResponseHeaderMap& headers) {
  ENVOY_STREAM_LOG(debug, "attempting internal redirect", *callbacks_);
  const Http::HeaderEntry* location = headers.Location();

  const uint64_t status_code = Http::Utility::getResponseStatus(headers);

  // Redirects are not supported for streaming requests yet. The checks are ordered so that the
  // headers are only rewritten, and the stream only recreated, once the cheaper preconditions
  // hold.
  if (downstream_end_stream_ && (!request_buffer_overflowed_ || !callbacks_->decodingBuffer()) &&
      location != nullptr &&
      convertRequestHeadersForInternalRedirect(*downstream_headers_, headers, *location,
                                               status_code) &&
      callbacks_->recreateStream(&headers)) {
    ENVOY_STREAM_LOG(debug, "Internal redirect succeeded", *callbacks_);
    cluster_->trafficStats()->upstream_internal_redirect_succeeded_total_.inc();
    return true;
  }
  // convertRequestHeadersForInternalRedirect logs failure reasons but log
  // details for other failure modes here.
  if (!downstream_end_stream_) {
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: request incomplete", *callbacks_);
  } else if (request_buffer_overflowed_ && callbacks_->decodingBuffer()) {
    // Mirror the gate above: an overflow only blocks the redirect while a decoding buffer is
    // present, so only then is it the reason for the failure.
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: request body overflow", *callbacks_);
  } else if (location == nullptr) {
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: missing location header", *callbacks_);
  }

  cluster_->trafficStats()->upstream_internal_redirect_failed_total_.inc();
  return false;
}
1848
1849
bool Filter::convertRequestHeadersForInternalRedirect(
1850
    Http::RequestHeaderMap& downstream_headers, const Http::ResponseHeaderMap& upstream_headers,
1851
0
    const Http::HeaderEntry& internal_redirect, uint64_t status_code) {
1852
0
  if (!downstream_headers.Path()) {
1853
0
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: no path in downstream_headers", *callbacks_);
1854
0
    return false;
1855
0
  }
1856
1857
0
  absl::string_view redirect_url = internal_redirect.value().getStringView();
1858
  // Make sure the redirect response contains a URL to redirect to.
1859
0
  if (redirect_url.empty()) {
1860
0
    stats_.passthrough_internal_redirect_bad_location_.inc();
1861
0
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: empty location", *callbacks_);
1862
0
    return false;
1863
0
  }
1864
0
  Http::Utility::Url absolute_url;
1865
0
  if (!absolute_url.initialize(redirect_url, false)) {
1866
0
    stats_.passthrough_internal_redirect_bad_location_.inc();
1867
0
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: invalid location {}", *callbacks_,
1868
0
                     redirect_url);
1869
0
    return false;
1870
0
  }
1871
1872
0
  const auto& policy = route_entry_->internalRedirectPolicy();
1873
  // Don't change the scheme from the original request
1874
0
  const bool scheme_is_http = schemeIsHttp(downstream_headers, callbacks_->connection());
1875
0
  const bool target_is_http = Http::Utility::schemeIsHttp(absolute_url.scheme());
1876
0
  if (!policy.isCrossSchemeRedirectAllowed() && scheme_is_http != target_is_http) {
1877
0
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: incorrect scheme for {}", *callbacks_,
1878
0
                     redirect_url);
1879
0
    stats_.passthrough_internal_redirect_unsafe_scheme_.inc();
1880
0
    return false;
1881
0
  }
1882
1883
0
  const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();
1884
  // Make sure that performing the redirect won't result in exceeding the configured number of
1885
  // redirects allowed for this route.
1886
0
  StreamInfo::UInt32Accessor* num_internal_redirect{};
1887
1888
0
  if (num_internal_redirect = filter_state->getDataMutable<StreamInfo::UInt32Accessor>(
1889
0
          NumInternalRedirectsFilterStateName);
1890
0
      num_internal_redirect == nullptr) {
1891
0
    auto state = std::make_shared<StreamInfo::UInt32AccessorImpl>(0);
1892
0
    num_internal_redirect = state.get();
1893
1894
0
    filter_state->setData(NumInternalRedirectsFilterStateName, std::move(state),
1895
0
                          StreamInfo::FilterState::StateType::Mutable,
1896
0
                          StreamInfo::FilterState::LifeSpan::Request);
1897
0
  }
1898
1899
0
  if (num_internal_redirect->value() >= policy.maxInternalRedirects()) {
1900
0
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: redirect limits exceeded.", *callbacks_);
1901
0
    stats_.passthrough_internal_redirect_too_many_redirects_.inc();
1902
0
    return false;
1903
0
  }
1904
  // Copy the old values, so they can be restored if the redirect fails.
1905
0
  const bool scheme_is_set = (downstream_headers.Scheme() != nullptr);
1906
1907
0
  std::unique_ptr<Http::RequestHeaderMapImpl> saved_headers = Http::RequestHeaderMapImpl::create();
1908
0
  Http::RequestHeaderMapImpl::copyFrom(*saved_headers, downstream_headers);
1909
1910
0
  for (const Http::LowerCaseString& header :
1911
0
       route_entry_->internalRedirectPolicy().responseHeadersToCopy()) {
1912
0
    Http::HeaderMap::GetResult result = upstream_headers.get(header);
1913
0
    Http::HeaderMap::GetResult downstream_result = downstream_headers.get(header);
1914
0
    if (result.empty()) {
1915
      // Clear headers if present, else do nothing:
1916
0
      if (downstream_result.empty()) {
1917
0
        continue;
1918
0
      }
1919
0
      downstream_headers.remove(header);
1920
0
    } else {
1921
      // The header exists in the response, copy into the downstream headers
1922
0
      if (!downstream_result.empty()) {
1923
0
        downstream_headers.remove(header);
1924
0
      }
1925
0
      for (size_t idx = 0; idx < result.size(); idx++) {
1926
0
        downstream_headers.addCopy(header, result[idx]->value().getStringView());
1927
0
      }
1928
0
    }
1929
0
  }
1930
1931
0
  Cleanup restore_original_headers(
1932
0
      [&downstream_headers, scheme_is_set, scheme_is_http, &saved_headers]() {
1933
0
        downstream_headers.clear();
1934
0
        if (scheme_is_set) {
1935
0
          downstream_headers.setScheme(scheme_is_http ? Http::Headers::get().SchemeValues.Http
1936
0
                                                      : Http::Headers::get().SchemeValues.Https);
1937
0
        }
1938
1939
0
        Http::RequestHeaderMapImpl::copyFrom(downstream_headers, *saved_headers);
1940
0
      });
1941
1942
  // Replace the original host, scheme and path.
1943
0
  downstream_headers.setScheme(absolute_url.scheme());
1944
0
  downstream_headers.setHost(absolute_url.hostAndPort());
1945
1946
0
  auto path_and_query = absolute_url.pathAndQueryParams();
1947
0
  if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http_reject_path_with_fragment")) {
1948
    // Envoy treats internal redirect as a new request and will reject it if URI path
1949
    // contains #fragment. However the Location header is allowed to have #fragment in URI path. To
1950
    // prevent Envoy from rejecting internal redirect, strip the #fragment from Location URI if it
1951
    // is present.
1952
0
    auto fragment_pos = path_and_query.find('#');
1953
0
    path_and_query = path_and_query.substr(0, fragment_pos);
1954
0
  }
1955
0
  downstream_headers.setPath(path_and_query);
1956
1957
  // Only clear the route cache if there are downstream callbacks. There aren't, for example,
1958
  // for async connections.
1959
0
  if (callbacks_->downstreamCallbacks()) {
1960
0
    callbacks_->downstreamCallbacks()->clearRouteCache();
1961
0
  }
1962
0
  const auto route = callbacks_->route();
1963
  // Don't allow a redirect to a non existing route.
1964
0
  if (!route) {
1965
0
    stats_.passthrough_internal_redirect_no_route_.inc();
1966
0
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: no route found", *callbacks_);
1967
0
    return false;
1968
0
  }
1969
1970
0
  const auto& route_name = route->routeName();
1971
0
  for (const auto& predicate : policy.predicates()) {
1972
0
    if (!predicate->acceptTargetRoute(*filter_state, route_name, !scheme_is_http,
1973
0
                                      !target_is_http)) {
1974
0
      stats_.passthrough_internal_redirect_predicate_.inc();
1975
0
      ENVOY_STREAM_LOG(trace,
1976
0
                       "Internal redirect failed: rejecting redirect targeting {}, by {} predicate",
1977
0
                       *callbacks_, route_name, predicate->name());
1978
0
      return false;
1979
0
    }
1980
0
  }
1981
1982
  // See https://tools.ietf.org/html/rfc7231#section-6.4.4.
1983
0
  if (status_code == enumToInt(Http::Code::SeeOther) &&
1984
0
      downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Get &&
1985
0
      downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Head) {
1986
0
    downstream_headers.setMethod(Http::Headers::get().MethodValues.Get);
1987
0
    downstream_headers.remove(Http::Headers::get().ContentLength);
1988
0
    callbacks_->modifyDecodingBuffer([](Buffer::Instance& data) { data.drain(data.length()); });
1989
0
  }
1990
1991
0
  num_internal_redirect->increment();
1992
0
  restore_original_headers.cancel();
1993
  // Preserve the original request URL for the second pass.
1994
0
  downstream_headers.setEnvoyOriginalUrl(
1995
0
      absl::StrCat(scheme_is_http ? Http::Headers::get().SchemeValues.Http
1996
0
                                  : Http::Headers::get().SchemeValues.Https,
1997
0
                   "://", saved_headers->getHostValue(), saved_headers->getPathValue()));
1998
0
  return true;
1999
0
}
2000
2001
0
// Runs each retry-options predicate configured on the route's retry policy and adopts any
// replacement upstream socket options a predicate returns.
//
// @param retriable_request the upstream request whose stream info is handed to the predicates.
void Filter::runRetryOptionsPredicates(UpstreamRequest& retriable_request) {
  for (const auto& predicate : route_entry_->retryPolicy().retryOptionsPredicates()) {
    // The parameters are rebuilt on every iteration, i.e. after any assignment to
    // upstream_options_ made for the previous predicate.
    const Upstream::RetryOptionsPredicate::UpdateOptionsParameters params{
        retriable_request.streamInfo(), upstreamSocketOptions()};
    auto update = predicate->updateOptions(params);
    if (!update.new_upstream_socket_options_.has_value()) {
      continue;
    }
    upstream_options_ = update.new_upstream_socket_options_.value();
  }
}
2011
2012
0
void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry) {
2013
0
  ENVOY_STREAM_LOG(debug, "performing retry", *callbacks_);
2014
2015
0
  is_retry_ = true;
2016
0
  attempt_count_++;
2017
0
  callbacks_->streamInfo().setAttemptCount(attempt_count_);
2018
0
  ASSERT(pending_retries_ > 0);
2019
0
  pending_retries_--;
2020
2021
  // Clusters can technically get removed by CDS during a retry. Make sure it still exists.
2022
0
  const auto cluster = config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
2023
0
  std::unique_ptr<GenericConnPool> generic_conn_pool;
2024
0
  if (cluster != nullptr) {
2025
0
    cluster_ = cluster->info();
2026
0
    generic_conn_pool = createConnPool(*cluster);
2027
0
  }
2028
2029
0
  if (!generic_conn_pool) {
2030
0
    sendNoHealthyUpstreamResponse();
2031
0
    cleanup();
2032
0
    return;
2033
0
  }
2034
0
  UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(
2035
0
      *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3,
2036
0
      allow_multiplexed_upstream_half_close_ /*enable_half_close*/);
2037
2038
0
  if (include_attempt_count_in_request_) {
2039
0
    downstream_headers_->setEnvoyAttemptCount(attempt_count_);
2040
0
  }
2041
2042
0
  if (include_timeout_retry_header_in_request_) {
2043
0
    downstream_headers_->setEnvoyIsTimeoutRetry(is_timeout_retry == TimeoutRetry::Yes ? "true"
2044
0
                                                                                      : "false");
2045
0
  }
2046
2047
  // The request timeouts only account for time elapsed since the downstream request completed
2048
  // which might not have happened yet (in which case zero time has elapsed.)
2049
0
  std::chrono::milliseconds elapsed_time = std::chrono::milliseconds::zero();
2050
2051
0
  if (DateUtil::timePointValid(downstream_request_complete_time_)) {
2052
0
    Event::Dispatcher& dispatcher = callbacks_->dispatcher();
2053
0
    elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(
2054
0
        dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
2055
0
  }
2056
2057
0
  FilterUtility::setTimeoutHeaders(elapsed_time.count(), timeout_, *route_entry_,
2058
0
                                   *downstream_headers_, !config_->suppress_envoy_headers_,
2059
0
                                   grpc_request_, hedging_params_.hedge_on_per_try_timeout_);
2060
2061
0
  UpstreamRequest* upstream_request_tmp = upstream_request.get();
2062
0
  LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
2063
0
  upstream_requests_.front()->acceptHeadersFromRouter(
2064
0
      !callbacks_->decodingBuffer() && !downstream_trailers_ && downstream_end_stream_);
2065
  // It's possible we got immediately reset which means the upstream request we just
2066
  // added to the front of the list might have been removed, so we need to check to make
2067
  // sure we don't send data on the wrong request.
2068
0
  if (!upstream_requests_.empty() && (upstream_requests_.front().get() == upstream_request_tmp)) {
2069
0
    if (callbacks_->decodingBuffer()) {
2070
      // If we are doing a retry we need to make a copy.
2071
0
      Buffer::OwnedImpl copy(*callbacks_->decodingBuffer());
2072
0
      upstream_requests_.front()->acceptDataFromRouter(copy, !downstream_trailers_ &&
2073
0
                                                                 downstream_end_stream_);
2074
0
    }
2075
2076
0
    if (downstream_trailers_) {
2077
0
      upstream_requests_.front()->acceptTrailersFromRouter(*downstream_trailers_);
2078
0
    }
2079
0
  }
2080
0
}
2081
2082
121
uint32_t Filter::numRequestsAwaitingHeaders() {
2083
121
  return std::count_if(upstream_requests_.begin(), upstream_requests_.end(),
2084
121
                       [](const auto& req) -> bool { return req->awaitingHeaders(); });
2085
121
}
2086
2087
bool Filter::checkDropOverload(Upstream::ThreadLocalCluster& cluster,
2088
1.81k
                               std::function<void(Http::ResponseHeaderMap&)>& modify_headers) {
2089
1.81k
  if (cluster.dropOverload().value()) {
2090
0
    ENVOY_STREAM_LOG(debug, "Router filter: cluster DROP_OVERLOAD configuration: {}", *callbacks_,
2091
0
                     cluster.dropOverload().value());
2092
0
    if (config_->random_.bernoulli(cluster.dropOverload())) {
2093
0
      ENVOY_STREAM_LOG(debug, "The request is dropped by DROP_OVERLOAD", *callbacks_);
2094
0
      callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::DropOverLoad);
2095
0
      chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true);
2096
0
      callbacks_->sendLocalReply(
2097
0
          Http::Code::ServiceUnavailable, "drop overload",
2098
0
          [modify_headers, this](Http::ResponseHeaderMap& headers) {
2099
0
            if (!config_->suppress_envoy_headers_) {
2100
0
              headers.addReference(Http::Headers::get().EnvoyDropOverload,
2101
0
                                   Http::Headers::get().EnvoyDropOverloadValues.True);
2102
0
            }
2103
0
            modify_headers(headers);
2104
0
          },
2105
0
          absl::nullopt, StreamInfo::ResponseCodeDetails::get().DropOverload);
2106
2107
0
      cluster.info()->loadReportStats().upstream_rq_drop_overload_.inc();
2108
0
      return true;
2109
0
    }
2110
0
  }
2111
1.81k
  return false;
2112
1.81k
}
2113
2114
// Parses an ORCA load report out of response headers or trailers and forwards it to the
// host's load metric stats and/or the registered ORCA callbacks. At most one report is
// processed per filter instance: once a report has been parsed successfully, later calls
// return immediately.
//
// @param headers_or_trailers the response header map or trailer map to parse.
// @param upstream_request the upstream request whose host receives the load metrics.
void Filter::maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or_trailers,
                                        UpstreamRequest& upstream_request) {
  // A report already taken from the headers must not be processed again from the trailers.
  if (orca_load_report_received_) {
    return;
  }

  // Parsing is skipped when there is no consumer: neither load reporting (which needs a host
  // and configured LRS metric names) nor ORCA callbacks.
  auto upstream_host = upstream_request.upstreamHost();
  const bool report_to_lrs =
      (upstream_host != nullptr) && cluster_->lrsReportMetricNames().has_value();
  if (!(report_to_lrs || orca_load_report_callbacks_.has_value())) {
    return;
  }

  absl::StatusOr<xds::data::orca::v3::OrcaLoadReport> parsed_report =
      Envoy::Orca::parseOrcaLoadReportHeaders(headers_or_trailers);
  if (!parsed_report.ok()) {
    ENVOY_STREAM_LOG(trace, "Headers don't have orca load report: {}", *callbacks_,
                     parsed_report.status().message());
    return;
  }

  orca_load_report_received_ = true;

  if (report_to_lrs) {
    ENVOY_STREAM_LOG(trace, "Adding ORCA load report {} to load metrics", *callbacks_,
                     parsed_report->DebugString());
    Envoy::Orca::addOrcaLoadReportToLoadMetricStats(cluster_->lrsReportMetricNames().value(),
                                                    parsed_report.value(),
                                                    upstream_host->loadMetricStats());
  }

  if (orca_load_report_callbacks_.has_value()) {
    const absl::Status callback_status =
        orca_load_report_callbacks_->onOrcaLoadReport(*parsed_report);
    // A failing callback is logged and otherwise ignored.
    if (!callback_status.ok()) {
      ENVOY_STREAM_LOG(error, "Failed to invoke OrcaLoadReportCallbacks: {}", *callbacks_,
                       callback_status.message());
    }
  }
}
2155
2156
// Builds the retry state for a request by delegating to RetryStateImpl::create with the same
// arguments.
//
// @return the created retry state; may be null (the result of RetryStateImpl::create is
//         null-checked before use and returned as is).
RetryStatePtr
ProdFilter::createRetryState(const RetryPolicy& policy, Http::RequestHeaderMap& request_headers,
                             const Upstream::ClusterInfo& cluster, const VirtualCluster* vcluster,
                             RouteStatsContextOptRef route_stats_context,
                             Server::Configuration::CommonFactoryContext& context,
                             Event::Dispatcher& dispatcher, Upstream::ResourcePriority priority) {
  std::unique_ptr<RetryStateImpl> state =
      RetryStateImpl::create(policy, request_headers, cluster, vcluster, route_stats_context,
                             context, dispatcher, priority);

  const bool configured_only_for_http3 =
      state != nullptr && state->isAutomaticallyConfiguredForHttp3();
  if (configured_only_for_http3) {
    // Since doing retry will make Envoy to buffer the request body, if upstream using HTTP/3 is the
    // only reason for doing retry, set the retry shadow buffer limit to 0 so that we don't retry or
    // buffer safe requests with body which is not common.
    setRetryShadowBufferLimit(0);
  }
  return state;
}
2173
2174
} // namespace Router
2175
} // namespace Envoy