Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/router/router.cc
Line| Count|Source
   1|      |#include "source/common/router/router.h"
   2|      |
   3|      |#include <algorithm>
   4|      |#include <chrono>
   5|      |#include <cstdint>
   6|      |#include <functional>
   7|      |#include <memory>
   8|      |#include <string>
   9|      |
  10|      |#include "envoy/event/dispatcher.h"
  11|      |#include "envoy/event/timer.h"
  12|      |#include "envoy/grpc/status.h"
  13|      |#include "envoy/http/conn_pool.h"
  14|      |#include "envoy/runtime/runtime.h"
  15|      |#include "envoy/upstream/cluster_manager.h"
  16|      |#include "envoy/upstream/health_check_host_monitor.h"
  17|      |#include "envoy/upstream/upstream.h"
  18|      |
  19|      |#include "source/common/common/assert.h"
  20|      |#include "source/common/common/cleanup.h"
  21|      |#include "source/common/common/empty_string.h"
  22|      |#include "source/common/common/enum_to_int.h"
  23|      |#include "source/common/common/scope_tracker.h"
  24|      |#include "source/common/common/utility.h"
  25|      |#include "source/common/config/utility.h"
  26|      |#include "source/common/grpc/common.h"
  27|      |#include "source/common/http/codes.h"
  28|      |#include "source/common/http/header_map_impl.h"
  29|      |#include "source/common/http/headers.h"
  30|      |#include "source/common/http/message_impl.h"
  31|      |#include "source/common/http/utility.h"
  32|      |#include "source/common/network/application_protocol.h"
  33|      |#include "source/common/network/socket_option_factory.h"
  34|      |#include "source/common/network/transport_socket_options_impl.h"
  35|      |#include "source/common/network/upstream_server_name.h"
  36|      |#include "source/common/network/upstream_socket_options_filter_state.h"
  37|      |#include "source/common/network/upstream_subject_alt_names.h"
  38|      |#include "source/common/router/config_impl.h"
  39|      |#include "source/common/router/debug_config.h"
  40|      |#include "source/common/router/retry_state_impl.h"
  41|      |#include "source/common/runtime/runtime_features.h"
  42|      |#include "source/common/stream_info/uint32_accessor_impl.h"
  43|      |#include "source/common/tracing/http_tracer_impl.h"
  44|      |
  45|      |namespace Envoy {
  46|      |namespace Router {
  47|      |namespace {
  48|      |constexpr char NumInternalRedirectsFilterStateName[] = "num_internal_redirects";
  49|      |
  50|     0|uint32_t getLength(const Buffer::Instance* instance) { return instance ? instance->length() : 0; }
  51|      |
  52|      |bool schemeIsHttp(const Http::RequestHeaderMap& downstream_headers,
  53|     0|                  OptRef<const Network::Connection> connection) {
  54|     0|  if (Http::Utility::schemeIsHttp(downstream_headers.getSchemeValue())) {
  55|     0|    return true;
  56|     0|  }
  57|     0|  if (connection.has_value() && !connection->ssl()) {
  58|     0|    return true;
  59|     0|  }
  60|     0|  return false;
  61|     0|}
  62|      |
  63|      |constexpr uint64_t TimeoutPrecisionFactor = 100;
  64|      |
  65|      |} // namespace
  66|      |
  67|      |FilterConfig::FilterConfig(Stats::StatName stat_prefix,
  68|      |                           Server::Configuration::FactoryContext& context,
  69|      |                           ShadowWriterPtr&& shadow_writer,
  70|      |                           const envoy::extensions::filters::http::router::v3::Router& config)
  71|      |    : FilterConfig(stat_prefix, context.localInfo(), context.scope(), context.clusterManager(),
  72|      |                   context.runtime(), context.api().randomGenerator(), std::move(shadow_writer),
  73|      |                   PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, dynamic_stats, true),
  74|      |                   config.start_child_span(), config.suppress_envoy_headers(),
  75|      |                   config.respect_expected_rq_timeout(),
  76|      |                   config.suppress_grpc_request_failure_code_stats(),
  77|      |                   config.has_upstream_log_options()
  78|      |                       ? config.upstream_log_options().flush_upstream_log_on_upstream_stream()
  79|      |                       : false,
  80|      |                   config.strict_check_headers(), context.api().timeSource(), context.httpContext(),
  81| 2.91k|                   context.routerContext()) {
  82| 2.91k|  for (const auto& upstream_log : config.upstream_log()) {
  83|   125|    upstream_logs_.push_back(AccessLog::AccessLogFactory::fromProto(upstream_log, context));
  84|   125|  }
  85|      |
  86| 2.91k|  if (config.has_upstream_log_options() &&
  87| 2.91k|      config.upstream_log_options().has_upstream_log_flush_interval()) {
  88|     0|    upstream_log_flush_interval_ = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
  89|     0|        config.upstream_log_options().upstream_log_flush_interval()));
  90|     0|  }
  91|      |
  92| 2.91k|  if (config.upstream_http_filters_size() > 0) {
  93|    63|    auto& server_factory_ctx = context.getServerFactoryContext();
  94|    63|    const Http::FilterChainUtility::FiltersList& upstream_http_filters =
  95|    63|        config.upstream_http_filters();
  96|    63|    std::shared_ptr<Http::UpstreamFilterConfigProviderManager> filter_config_provider_manager =
  97|    63|        Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager(
  98|    63|            server_factory_ctx);
  99|    63|    std::string prefix = context.scope().symbolTable().toString(context.scope().prefix());
 100|    63|    upstream_ctx_ = std::make_unique<Upstream::UpstreamFactoryContextImpl>(
 101|    63|        server_factory_ctx, context.initManager(), context.scope());
 102|    63|    Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext,
 103|    63|                            Server::Configuration::UpstreamHttpFilterConfigFactory>
 104|    63|        helper(*filter_config_provider_manager, server_factory_ctx, context.clusterManager(),
 105|    63|               *upstream_ctx_, prefix);
 106|    63|    THROW_IF_NOT_OK(helper.processFilters(upstream_http_filters, "router upstream http",
 107|    63|                                          "router upstream http", upstream_http_filter_factories_));
 108|    59|  }
 109| 2.91k|}
 110|      |
 111|      |// Express percentage as [0, TimeoutPrecisionFactor] because stats do not accept floating point
 112|      |// values, and getting multiple significant figures on the histogram would be nice.
 113|      |uint64_t FilterUtility::percentageOfTimeout(const std::chrono::milliseconds response_time,
 114|     6|                                            const std::chrono::milliseconds timeout) {
 115|      |  // Timeouts of 0 are considered infinite. Any portion of an infinite timeout used is still
 116|      |  // none of it.
 117|     6|  if (timeout.count() == 0) {
 118|     6|    return 0;
 119|     6|  }
 120|      |
 121|     0|  return static_cast<uint64_t>(response_time.count() * TimeoutPrecisionFactor / timeout.count());
 122|     6|}
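
To make the scaling concrete: with TimeoutPrecisionFactor = 100, a 150 ms response against a 600 ms timeout reports 25, and any response against an infinite (0 ms) timeout reports 0. A standalone sketch of the same arithmetic (plain C++, not part of router.cc):

#include <cassert>
#include <chrono>
#include <cstdint>

// Mirrors FilterUtility::percentageOfTimeout: scales elapsed/timeout into
// [0, 100] with integer math, treating a 0 timeout as infinite.
uint64_t percentageOfTimeout(std::chrono::milliseconds response_time,
                             std::chrono::milliseconds timeout) {
  if (timeout.count() == 0) {
    return 0; // Any portion of an infinite timeout used is still 0%.
  }
  return static_cast<uint64_t>(response_time.count() * 100 / timeout.count());
}

int main() {
  assert(percentageOfTimeout(std::chrono::milliseconds(150),
                             std::chrono::milliseconds(600)) == 25);
  assert(percentageOfTimeout(std::chrono::milliseconds(150),
                             std::chrono::milliseconds(0)) == 0);
}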
 123|      |
 124| 24.7k|void FilterUtility::setUpstreamScheme(Http::RequestHeaderMap& headers, bool downstream_secure) {
 125| 24.7k|  if (Http::Utility::schemeIsValid(headers.getSchemeValue())) {
 126| 1.28k|    return;
 127| 1.28k|  }
 128|      |  // After all the changes in https://github.com/envoyproxy/envoy/issues/14587
 129|      |  // this path should only occur if a buggy filter has removed the :scheme
 130|      |  // header. In that case best-effort set from X-Forwarded-Proto.
 131| 23.5k|  absl::string_view xfp = headers.getForwardedProtoValue();
 132| 23.5k|  if (Http::Utility::schemeIsValid(xfp)) {
 133|     0|    headers.setScheme(xfp);
 134|     0|    return;
 135|     0|  }
 136|      |
 137| 23.5k|  if (downstream_secure) {
 138|     0|    headers.setReferenceScheme(Http::Headers::get().SchemeValues.Https);
 139| 23.5k|  } else {
 140| 23.5k|    headers.setReferenceScheme(Http::Headers::get().SchemeValues.Http);
 141| 23.5k|  }
 142| 23.5k|}
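
The precedence here is: keep an already-valid :scheme, else adopt a valid x-forwarded-proto, else infer from whether the downstream connection used TLS. A standalone restatement with plain strings (a hypothetical helper, not the Envoy API; a "valid" scheme in this context is effectively http or https):

#include <string>

// Sketch of the fallback order implemented above.
std::string chooseUpstreamScheme(const std::string& scheme, const std::string& xfp,
                                 bool downstream_secure) {
  auto valid = [](const std::string& s) { return s == "http" || s == "https"; };
  if (valid(scheme)) {
    return scheme; // Normal path: no filter has removed :scheme.
  }
  if (valid(xfp)) {
    return xfp; // Best-effort recovery from X-Forwarded-Proto.
  }
  return downstream_secure ? "https" : "http";
}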
 143|      |
 144|      |bool FilterUtility::shouldShadow(const ShadowPolicy& policy, Runtime::Loader& runtime,
 145|     0|                                 uint64_t stable_random) {
 146|      |
 147|      |  // The policy's default value is set correctly regardless of whether there is a runtime key
 148|      |  // or not, thus this call is sufficient for all cases (100% if no runtime set, otherwise
 149|      |  // using the default value within the runtime fractional percent setting).
 150|     0|  return runtime.snapshot().featureEnabled(policy.runtimeKey(), policy.defaultValue(),
 151|     0|                                           stable_random);
 152|     0|}
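
Because featureEnabled() is given an explicit stable_random (the stream id at the call site), the shadow decision is deterministic per stream rather than re-rolled on every evaluation. A sketch of the assumed semantics (not the actual Runtime::Snapshot implementation):

#include <cstdint>

// Assumed semantics: enabled when the stable random value falls inside the
// configured fraction, e.g. numerator=100, denominator=10000 shadows ~1%,
// and a given stream id always gets the same answer.
bool fractionEnabled(uint64_t stable_random, uint64_t numerator, uint64_t denominator) {
  return stable_random % denominator < numerator;
}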
 153|      |
 154|      |TimeoutData FilterUtility::finalTimeout(const RouteEntry& route,
 155|      |                                        Http::RequestHeaderMap& request_headers,
 156|      |                                        bool insert_envoy_expected_request_timeout_ms,
 157|      |                                        bool grpc_request, bool per_try_timeout_hedging_enabled,
 158| 2.40k|                                        bool respect_expected_rq_timeout) {
 159|      |  // See if there is a user supplied timeout in a request header. If there is we take that.
 160|      |  // Otherwise if the request is gRPC and a maximum gRPC timeout is configured we use the timeout
 161|      |  // in the gRPC headers (or infinity when gRPC headers have no timeout), but cap that timeout to
 162|      |  // the configured maximum gRPC timeout (which may also be infinity, represented by a 0 value),
 163|      |  // or the default from the route config otherwise.
 164| 2.40k|  TimeoutData timeout;
 165| 2.40k|  if (!route.usingNewTimeouts()) {
 166| 2.40k|    if (grpc_request && route.maxGrpcTimeout()) {
 167|     0|      const std::chrono::milliseconds max_grpc_timeout = route.maxGrpcTimeout().value();
 168|     0|      auto header_timeout = Grpc::Common::getGrpcTimeout(request_headers);
 169|     0|      std::chrono::milliseconds grpc_timeout =
 170|     0|          header_timeout ? header_timeout.value() : std::chrono::milliseconds(0);
 171|     0|      if (route.grpcTimeoutOffset()) {
 172|      |        // We only apply the offset if it won't result in grpc_timeout hitting 0 or below, as
 173|      |        // setting it to 0 means infinity and a negative timeout makes no sense.
 174|     0|        const auto offset = *route.grpcTimeoutOffset();
 175|     0|        if (offset < grpc_timeout) {
 176|     0|          grpc_timeout -= offset;
 177|     0|        }
 178|     0|      }
 179|      |
 180|      |      // Cap gRPC timeout to the configured maximum considering that 0 means infinity.
 181|     0|      if (max_grpc_timeout != std::chrono::milliseconds(0) &&
 182|     0|          (grpc_timeout == std::chrono::milliseconds(0) || grpc_timeout > max_grpc_timeout)) {
 183|     0|        grpc_timeout = max_grpc_timeout;
 184|     0|      }
 185|     0|      timeout.global_timeout_ = grpc_timeout;
 186| 2.40k|    } else {
 187| 2.40k|      timeout.global_timeout_ = route.timeout();
 188| 2.40k|    }
 189| 2.40k|  }
 190| 2.40k|  timeout.per_try_timeout_ = route.retryPolicy().perTryTimeout();
 191| 2.40k|  timeout.per_try_idle_timeout_ = route.retryPolicy().perTryIdleTimeout();
 192|      |
 193| 2.40k|  uint64_t header_timeout;
 194|      |
 195| 2.40k|  if (respect_expected_rq_timeout) {
 196|      |    // Check if there is timeout set by egress Envoy.
 197|      |    // If present, use that value as route timeout and don't override
 198|      |    // *x-envoy-expected-rq-timeout-ms* header. At this point *x-envoy-upstream-rq-timeout-ms*
 199|      |    // header should have been sanitized by egress Envoy.
 200|     0|    const Http::HeaderEntry* header_expected_timeout_entry =
 201|     0|        request_headers.EnvoyExpectedRequestTimeoutMs();
 202|     0|    if (header_expected_timeout_entry) {
 203|     0|      trySetGlobalTimeout(*header_expected_timeout_entry, timeout);
 204|     0|    } else {
 205|     0|      const Http::HeaderEntry* header_timeout_entry =
 206|     0|          request_headers.EnvoyUpstreamRequestTimeoutMs();
 207|      |
 208|     0|      if (header_timeout_entry) {
 209|     0|        trySetGlobalTimeout(*header_timeout_entry, timeout);
 210|     0|        request_headers.removeEnvoyUpstreamRequestTimeoutMs();
 211|     0|      }
 212|     0|    }
 213| 2.40k|  } else {
 214| 2.40k|    const Http::HeaderEntry* header_timeout_entry = request_headers.EnvoyUpstreamRequestTimeoutMs();
 215|      |
 216| 2.40k|    if (header_timeout_entry) {
 217|     0|      trySetGlobalTimeout(*header_timeout_entry, timeout);
 218|     0|      request_headers.removeEnvoyUpstreamRequestTimeoutMs();
 219|     0|    }
 220| 2.40k|  }
 221|      |
 222|      |  // See if there is a per try/retry timeout. If it's >= global we just ignore it.
 223| 2.40k|  const absl::string_view per_try_timeout_entry =
 224| 2.40k|      request_headers.getEnvoyUpstreamRequestPerTryTimeoutMsValue();
 225| 2.40k|  if (!per_try_timeout_entry.empty()) {
 226|     0|    if (absl::SimpleAtoi(per_try_timeout_entry, &header_timeout)) {
 227|     0|      timeout.per_try_timeout_ = std::chrono::milliseconds(header_timeout);
 228|     0|    }
 229|     0|    request_headers.removeEnvoyUpstreamRequestPerTryTimeoutMs();
 230|     0|  }
 231|      |
 232| 2.40k|  if (timeout.per_try_timeout_ >= timeout.global_timeout_ && timeout.global_timeout_.count() != 0) {
 233|     0|    timeout.per_try_timeout_ = std::chrono::milliseconds(0);
 234|     0|  }
 235|      |
 236| 2.40k|  setTimeoutHeaders(0, timeout, route, request_headers, insert_envoy_expected_request_timeout_ms,
 237| 2.40k|                    grpc_request, per_try_timeout_hedging_enabled);
 238|      |
 239| 2.40k|  return timeout;
 240| 2.40k|}
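
A worked example of the gRPC branch, with purely illustrative values: a grpc-timeout header of 5000 ms, a grpc_timeout_offset of 1000 ms, and a max_grpc_timeout of 3000 ms produce a 3000 ms global timeout (the offset drops 5000 to 4000, then the cap applies). The same arithmetic as a standalone sketch:

#include <cassert>
#include <chrono>

using std::chrono::milliseconds;

int main() {
  milliseconds grpc_timeout(5000);   // from the grpc-timeout request header
  const milliseconds offset(1000);   // route.grpcTimeoutOffset()
  const milliseconds max_grpc(3000); // route.maxGrpcTimeout(); 0 means infinity

  if (offset < grpc_timeout) {
    grpc_timeout -= offset; // 4000 ms; skipped if it would hit 0 or below
  }
  if (max_grpc != milliseconds(0) &&
      (grpc_timeout == milliseconds(0) || grpc_timeout > max_grpc)) {
    grpc_timeout = max_grpc; // capped to 3000 ms
  }
  assert(grpc_timeout == milliseconds(3000));
}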
 241|      |
 242|      |void FilterUtility::setTimeoutHeaders(uint64_t elapsed_time, const TimeoutData& timeout,
 243|      |                                      const RouteEntry& route,
 244|      |                                      Http::RequestHeaderMap& request_headers,
 245|      |                                      bool insert_envoy_expected_request_timeout_ms,
 246| 2.40k|                                      bool grpc_request, bool per_try_timeout_hedging_enabled) {
 247|      |
 248| 2.40k|  const uint64_t global_timeout = timeout.global_timeout_.count();
 249|      |
 250|      |  // See if there is any timeout to write in the expected timeout header.
 251| 2.40k|  uint64_t expected_timeout = timeout.per_try_timeout_.count();
 252|      |
 253|      |  // Use the global timeout if no per try timeout was specified or if we're
 254|      |  // doing hedging when there are per try timeouts. Either of these scenarios
 255|      |  // mean that the upstream server can use the full global timeout.
 256| 2.40k|  if (per_try_timeout_hedging_enabled || expected_timeout == 0) {
 257| 2.40k|    expected_timeout = global_timeout;
 258| 2.40k|  }
 259|      |
 260|      |  // If the expected timeout is 0 set no timeout, as Envoy treats 0 as infinite timeout.
 261| 2.40k|  if (expected_timeout > 0) {
 262|      |
 263| 1.28k|    if (global_timeout > 0) {
 264| 1.28k|      if (elapsed_time >= global_timeout) {
 265|      |        // We are out of time, but 0 would be an infinite timeout. So instead we send a 1ms timeout
 266|      |        // and assume the timers armed by onRequestComplete() will fire very soon.
 267|     0|        expected_timeout = 1;
 268| 1.28k|      } else {
 269| 1.28k|        expected_timeout = std::min(expected_timeout, global_timeout - elapsed_time);
 270| 1.28k|      }
 271| 1.28k|    }
 272|      |
 273| 1.28k|    if (insert_envoy_expected_request_timeout_ms) {
 274| 1.27k|      request_headers.setEnvoyExpectedRequestTimeoutMs(expected_timeout);
 275| 1.27k|    }
 276|      |
 277|      |    // If we've configured max_grpc_timeout, override the grpc-timeout header with
 278|      |    // the expected timeout. This ensures that the optional per try timeout is reflected
 279|      |    // in grpc-timeout, ensuring that the upstream gRPC server is aware of the actual timeout.
 280| 1.28k|    if (grpc_request && !route.usingNewTimeouts() && route.maxGrpcTimeout()) {
 281|     0|      Grpc::Common::toGrpcTimeout(std::chrono::milliseconds(expected_timeout), request_headers);
 282|     0|    }
 283| 1.28k|  }
 284| 2.40k|}
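
The expected-timeout arithmetic condenses to a few integer rules; the following standalone sketch replays them with illustrative numbers (global 1000 ms, per-try 400 ms):

#include <algorithm>
#include <cassert>
#include <cstdint>

// Recap of setTimeoutHeaders(): pick the per-try timeout unless hedging is on
// or no per-try timeout exists, then clamp by the time already elapsed.
uint64_t expectedTimeout(uint64_t global, uint64_t per_try, uint64_t elapsed,
                         bool hedging) {
  uint64_t expected = per_try;
  if (hedging || expected == 0) {
    expected = global; // hedged tries may use the full global timeout
  }
  if (expected > 0 && global > 0) {
    expected = elapsed >= global ? 1 // out of time; 0 would mean infinite
                                 : std::min(expected, global - elapsed);
  }
  return expected;
}

int main() {
  assert(expectedTimeout(1000, 400, 0, false) == 400);
  assert(expectedTimeout(1000, 400, 0, true) == 1000);  // hedging
  assert(expectedTimeout(1000, 400, 900, true) == 100); // clamped by elapsed
  assert(expectedTimeout(1000, 400, 1200, true) == 1);  // already out of time
}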
 285|      |
 286|      |absl::optional<std::chrono::milliseconds>
 287|     0|FilterUtility::tryParseHeaderTimeout(const Http::HeaderEntry& header_timeout_entry) {
 288|     0|  uint64_t header_timeout;
 289|     0|  if (absl::SimpleAtoi(header_timeout_entry.value().getStringView(), &header_timeout)) {
 290|     0|    return std::chrono::milliseconds(header_timeout);
 291|     0|  }
 292|     0|  return absl::nullopt;
 293|     0|}
 294|      |
 295|      |void FilterUtility::trySetGlobalTimeout(const Http::HeaderEntry& header_timeout_entry,
 296|     0|                                        TimeoutData& timeout) {
 297|     0|  const auto timeout_ms = tryParseHeaderTimeout(header_timeout_entry);
 298|     0|  if (timeout_ms.has_value()) {
 299|     0|    timeout.global_timeout_ = timeout_ms.value();
 300|     0|  }
 301|     0|}
 302|      |
 303|      |FilterUtility::HedgingParams
 304|      |FilterUtility::finalHedgingParams(const RouteEntry& route,
 305| 2.40k|                                  Http::RequestHeaderMap& request_headers) {
 306| 2.40k|  HedgingParams hedging_params;
 307| 2.40k|  hedging_params.hedge_on_per_try_timeout_ = route.hedgePolicy().hedgeOnPerTryTimeout();
 308|      |
 309| 2.40k|  const Http::HeaderEntry* hedge_on_per_try_timeout_entry =
 310| 2.40k|      request_headers.EnvoyHedgeOnPerTryTimeout();
 311| 2.40k|  if (hedge_on_per_try_timeout_entry) {
 312|     0|    if (hedge_on_per_try_timeout_entry->value() == "true") {
 313|     0|      hedging_params.hedge_on_per_try_timeout_ = true;
 314|     0|    }
 315|     0|    if (hedge_on_per_try_timeout_entry->value() == "false") {
 316|     0|      hedging_params.hedge_on_per_try_timeout_ = false;
 317|     0|    }
 318|      |
 319|     0|    request_headers.removeEnvoyHedgeOnPerTryTimeout();
 320|     0|  }
 321|      |
 322| 2.40k|  return hedging_params;
 323| 2.40k|}
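
The route's hedge policy can thus be overridden per request: the consulted header (x-envoy-hedge-on-per-try-timeout, stripped before forwarding) flips the flag only for the exact values "true" and "false". A minimal restatement of that override logic (sketch, not the Envoy API):

#include <string>

bool hedgeOnPerTryTimeout(bool route_default, const std::string* header_value) {
  bool result = route_default;
  if (header_value != nullptr) {
    if (*header_value == "true") {
      result = true;
    }
    if (*header_value == "false") {
      result = false; // any other value leaves the route policy untouched
    }
  }
  return result;
}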
 324|      |
 325| 2.88k|Filter::~Filter() {
 326|      |  // Upstream resources should already have been cleaned.
 327| 2.88k|  ASSERT(upstream_requests_.empty());
 328| 2.88k|  ASSERT(!retry_state_);
 329|      |
 330|      |  // Unregister from shadow stream notifications and cancel active streams.
 331| 2.88k|  for (auto* shadow_stream : shadow_streams_) {
 332|     0|    shadow_stream->removeDestructorCallback();
 333|     0|    shadow_stream->removeWatermarkCallbacks();
 334|     0|    shadow_stream->cancel();
 335|     0|  }
 336| 2.88k|}
 337|      |
 338|      |const FilterUtility::StrictHeaderChecker::HeaderCheckResult
 339|      |FilterUtility::StrictHeaderChecker::checkHeader(Http::RequestHeaderMap& headers,
 340|     0|                                                const Http::LowerCaseString& target_header) {
 341|     0|  if (target_header == Http::Headers::get().EnvoyUpstreamRequestTimeoutMs) {
 342|     0|    return isInteger(headers.EnvoyUpstreamRequestTimeoutMs());
 343|     0|  } else if (target_header == Http::Headers::get().EnvoyUpstreamRequestPerTryTimeoutMs) {
 344|     0|    return isInteger(headers.EnvoyUpstreamRequestPerTryTimeoutMs());
 345|     0|  } else if (target_header == Http::Headers::get().EnvoyMaxRetries) {
 346|     0|    return isInteger(headers.EnvoyMaxRetries());
 347|     0|  } else if (target_header == Http::Headers::get().EnvoyRetryOn) {
 348|     0|    return hasValidRetryFields(headers.EnvoyRetryOn(), &Router::RetryStateImpl::parseRetryOn);
 349|     0|  } else if (target_header == Http::Headers::get().EnvoyRetryGrpcOn) {
 350|     0|    return hasValidRetryFields(headers.EnvoyRetryGrpcOn(),
 351|     0|                               &Router::RetryStateImpl::parseRetryGrpcOn);
 352|     0|  }
 353|      |  // Should only validate headers for which we have implemented a validator.
 354|     0|  PANIC("unexpectedly reached");
 355|     0|}
 356|      |
 357| 3.36k|Stats::StatName Filter::upstreamZone(Upstream::HostDescriptionConstSharedPtr upstream_host) {
 358| 3.36k|  return upstream_host ? upstream_host->localityZoneStatName() : config_.empty_stat_name_;
 359| 3.36k|}
 360|      |
 361|      |void Filter::chargeUpstreamCode(uint64_t response_status_code,
 362|      |                                const Http::ResponseHeaderMap& response_headers,
 363|      |                                Upstream::HostDescriptionConstSharedPtr upstream_host,
 364| 2.27k|                                bool dropped) {
 365|      |  // Passing the response_status_code explicitly is an optimization to avoid
 366|      |  // multiple calls to slow Http::Utility::getResponseStatus.
 367| 2.27k|  ASSERT(response_status_code == Http::Utility::getResponseStatus(response_headers));
 368| 2.27k|  if (config_.emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck()) {
 369| 2.27k|    const Http::HeaderEntry* upstream_canary_header = response_headers.EnvoyUpstreamCanary();
 370| 2.27k|    const bool is_canary = (upstream_canary_header && upstream_canary_header->value() == "true") ||
 371| 2.27k|                           (upstream_host ? upstream_host->canary() : false);
 372| 2.27k|    const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_);
 373|      |
 374| 2.27k|    Stats::StatName upstream_zone = upstreamZone(upstream_host);
 375| 2.27k|    Http::CodeStats::ResponseStatInfo info{
 376| 2.27k|        config_.scope_,
 377| 2.27k|        cluster_->statsScope(),
 378| 2.27k|        config_.empty_stat_name_,
 379| 2.27k|        response_status_code,
 380| 2.27k|        internal_request,
 381| 2.27k|        route_entry_->virtualHost().statName(),
 382| 2.27k|        request_vcluster_ ? request_vcluster_->statName() : config_.empty_stat_name_,
 383| 2.27k|        route_stats_context_.has_value() ? route_stats_context_->statName()
 384| 2.27k|                                         : config_.empty_stat_name_,
 385| 2.27k|        config_.zone_name_,
 386| 2.27k|        upstream_zone,
 387| 2.27k|        is_canary};
 388|      |
 389| 2.27k|    Http::CodeStats& code_stats = httpContext().codeStats();
 390| 2.27k|    code_stats.chargeResponseStat(info, exclude_http_code_stats_);
 391|      |
 392| 2.27k|    if (alt_stat_prefix_ != nullptr) {
 393|     0|      Http::CodeStats::ResponseStatInfo alt_info{config_.scope_,
 394|     0|                                                 cluster_->statsScope(),
 395|     0|                                                 alt_stat_prefix_->statName(),
 396|     0|                                                 response_status_code,
 397|     0|                                                 internal_request,
 398|     0|                                                 config_.empty_stat_name_,
 399|     0|                                                 config_.empty_stat_name_,
 400|     0|                                                 config_.empty_stat_name_,
 401|     0|                                                 config_.zone_name_,
 402|     0|                                                 upstream_zone,
 403|     0|                                                 is_canary};
 404|     0|      code_stats.chargeResponseStat(alt_info, exclude_http_code_stats_);
 405|     0|    }
 406|      |
 407| 2.27k|    if (dropped) {
 408|     0|      cluster_->loadReportStats().upstream_rq_dropped_.inc();
 409|     0|    }
 410| 2.27k|    if (upstream_host && Http::CodeUtility::is5xx(response_status_code)) {
 411|    58|      upstream_host->stats().rq_error_.inc();
 412|    58|    }
 413| 2.27k|  }
 414| 2.27k|}
 415|      |
 416|      |void Filter::chargeUpstreamCode(Http::Code code,
 417|      |                                Upstream::HostDescriptionConstSharedPtr upstream_host,
 418|    58|                                bool dropped) {
 419|    58|  const uint64_t response_status_code = enumToInt(code);
 420|    58|  const auto fake_response_headers = Http::createHeaderMap<Http::ResponseHeaderMapImpl>(
 421|    58|      {{Http::Headers::get().Status, std::to_string(response_status_code)}});
 422|    58|  chargeUpstreamCode(response_status_code, *fake_response_headers, upstream_host, dropped);
 423|    58|}
 424|      |
 425| 2.58k|Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
 426| 2.58k|  downstream_headers_ = &headers;
 427|      |
 428|      |  // Extract debug configuration from filter state. This is used further along to determine whether
 429|      |  // we should append cluster and host headers to the response, and whether to forward the request
 430|      |  // upstream.
 431| 2.58k|  const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();
 432| 2.58k|  const DebugConfig* debug_config = filter_state->getDataReadOnly<DebugConfig>(DebugConfig::key());
 433|      |
 434|      |  // TODO: Maybe add a filter API for this.
 435| 2.58k|  grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);
 436| 2.58k|  exclude_http_code_stats_ = grpc_request_ && config_.suppress_grpc_request_failure_code_stats_;
 437|      |
 438|      |  // Only increment rq total stat if we actually decode headers here. This does not count requests
 439|      |  // that get handled by earlier filters.
 440| 2.58k|  stats_.rq_total_.inc();
 441|      |
 442|      |  // Initialize the `modify_headers` function as a no-op (so we don't have to remember to check it
 443|      |  // against nullptr before calling it), and feed it behavior later if/when we have cluster info
 444|      |  // headers to append.
 445| 2.58k|  std::function<void(Http::ResponseHeaderMap&)> modify_headers = [](Http::ResponseHeaderMap&) {};
 446|      |
 447|      |  // Determine if there is a route entry or a direct response for the request.
 448| 2.58k|  route_ = callbacks_->route();
 449| 2.58k|  if (!route_) {
 450|     0|    stats_.no_route_.inc();
 451|     0|    ENVOY_STREAM_LOG(debug, "no route match for URL '{}'", *callbacks_, headers.getPathValue());
 452|      |
 453|     0|    callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound);
 454|     0|    callbacks_->sendLocalReply(Http::Code::NotFound, "", modify_headers, absl::nullopt,
 455|     0|                               StreamInfo::ResponseCodeDetails::get().RouteNotFound);
 456|     0|    return Http::FilterHeadersStatus::StopIteration;
 457|     0|  }
 458|      |
 459|      |  // Determine if there is a direct response for the request.
 460| 2.58k|  const auto* direct_response = route_->directResponseEntry();
 461| 2.58k|  if (direct_response != nullptr) {
 462|   171|    stats_.rq_direct_response_.inc();
 463|   171|    direct_response->rewritePathHeader(headers, !config_.suppress_envoy_headers_);
 464|   171|    callbacks_->sendLocalReply(
 465|   171|        direct_response->responseCode(), direct_response->responseBody(),
 466|   171|        [this, direct_response,
 467|   171|         &request_headers = headers](Http::ResponseHeaderMap& response_headers) -> void {
 468|   171|          std::string new_uri;
 469|   171|          if (request_headers.Path()) {
 470|   171|            new_uri = direct_response->newUri(request_headers);
 471|   171|          }
 472|      |          // See https://tools.ietf.org/html/rfc7231#section-7.1.2.
 473|   171|          const auto add_location =
 474|   171|              direct_response->responseCode() == Http::Code::Created ||
 475|   171|              Http::CodeUtility::is3xx(enumToInt(direct_response->responseCode()));
 476|   171|          if (!new_uri.empty() && add_location) {
 477|     1|            response_headers.addReferenceKey(Http::Headers::get().Location, new_uri);
 478|     1|          }
 479|   171|          direct_response->finalizeResponseHeaders(response_headers, callbacks_->streamInfo());
 480|   171|        },
 481|   171|        absl::nullopt, StreamInfo::ResponseCodeDetails::get().DirectResponse);
 482|   171|    return Http::FilterHeadersStatus::StopIteration;
 483|   171|  }
 484|      |
 485|      |  // A route entry matches for the request.
 486| 2.41k|  route_entry_ = route_->routeEntry();
 487|      |  // If there's a route specific limit and it's smaller than general downstream
 488|      |  // limits, apply the new cap.
 489| 2.41k|  retry_shadow_buffer_limit_ =
 490| 2.41k|      std::min(retry_shadow_buffer_limit_, route_entry_->retryShadowBufferLimit());
 491| 2.41k|  if (debug_config && debug_config->append_cluster_) {
 492|      |    // The cluster name will be appended to any local or upstream responses from this point.
 493|     0|    modify_headers = [this, debug_config](Http::ResponseHeaderMap& headers) {
 494|     0|      headers.addCopy(debug_config->cluster_header_.value_or(Http::Headers::get().EnvoyCluster),
 495|     0|                      route_entry_->clusterName());
 496|     0|    };
 497|     0|  }
 498| 2.41k|  Upstream::ThreadLocalCluster* cluster =
 499| 2.41k|      config_.cm_.getThreadLocalCluster(route_entry_->clusterName());
 500| 2.41k|  if (!cluster) {
 501|    18|    stats_.no_cluster_.inc();
 502|    18|    ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, route_entry_->clusterName());
 503|      |
 504|    18|    callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoClusterFound);
 505|    18|    callbacks_->sendLocalReply(route_entry_->clusterNotFoundResponseCode(), "", modify_headers,
 506|    18|                               absl::nullopt,
 507|    18|                               StreamInfo::ResponseCodeDetails::get().ClusterNotFound);
 508|    18|    return Http::FilterHeadersStatus::StopIteration;
 509|    18|  }
 510| 2.40k|  cluster_ = cluster->info();
 511|      |
 512|      |  // Set up stat prefixes, etc.
 513| 2.40k|  request_vcluster_ = route_entry_->virtualCluster(headers);
 514| 2.40k|  if (request_vcluster_ != nullptr) {
 515|     6|    callbacks_->streamInfo().setVirtualClusterName(request_vcluster_->name());
 516|     6|  }
 517| 2.40k|  route_stats_context_ = route_entry_->routeStatsContext();
 518| 2.40k|  ENVOY_STREAM_LOG(debug, "cluster '{}' match for URL '{}'", *callbacks_,
 519| 2.40k|                   route_entry_->clusterName(), headers.getPathValue());
 520|      |
 521| 2.40k|  if (config_.strict_check_headers_ != nullptr) {
 522|     0|    for (const auto& header : *config_.strict_check_headers_) {
 523|     0|      const auto res = FilterUtility::StrictHeaderChecker::checkHeader(headers, header);
 524|     0|      if (!res.valid_) {
 525|     0|        callbacks_->streamInfo().setResponseFlag(
 526|     0|            StreamInfo::ResponseFlag::InvalidEnvoyRequestHeaders);
 527|     0|        const std::string body = fmt::format("invalid header '{}' with value '{}'",
 528|     0|                                             std::string(res.entry_->key().getStringView()),
 529|     0|                                             std::string(res.entry_->value().getStringView()));
 530|     0|        const std::string details =
 531|     0|            absl::StrCat(StreamInfo::ResponseCodeDetails::get().InvalidEnvoyRequestHeaders, "{",
 532|     0|                         StringUtil::replaceAllEmptySpace(res.entry_->key().getStringView()), "}");
 533|     0|        callbacks_->sendLocalReply(Http::Code::BadRequest, body, nullptr, absl::nullopt, details);
 534|     0|        return Http::FilterHeadersStatus::StopIteration;
 535|     0|      }
 536|     0|    }
 537|     0|  }
 538|      |
 539| 2.40k|  const Http::HeaderEntry* request_alt_name = headers.EnvoyUpstreamAltStatName();
 540| 2.40k|  if (request_alt_name) {
 541|     0|    alt_stat_prefix_ = std::make_unique<Stats::StatNameDynamicStorage>(
 542|     0|        request_alt_name->value().getStringView(), config_.scope_.symbolTable());
 543|     0|    headers.removeEnvoyUpstreamAltStatName();
 544|     0|  }
 545|      |
 546|      |  // See if we are supposed to immediately kill some percentage of this cluster's traffic.
 547| 2.40k|  if (cluster_->maintenanceMode()) {
 548|     0|    callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
 549|     0|    chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true);
 550|     0|    callbacks_->sendLocalReply(
 551|     0|        Http::Code::ServiceUnavailable, "maintenance mode",
 552|     0|        [modify_headers, this](Http::ResponseHeaderMap& headers) {
 553|     0|          if (!config_.suppress_envoy_headers_) {
 554|     0|            headers.addReference(Http::Headers::get().EnvoyOverloaded,
 555|     0|                                 Http::Headers::get().EnvoyOverloadedValues.True);
 556|     0|          }
 557|      |          // Note: append_cluster_info does not respect suppress_envoy_headers.
 558|     0|          modify_headers(headers);
 559|     0|        },
 560|     0|        absl::nullopt, StreamInfo::ResponseCodeDetails::get().MaintenanceMode);
 561|     0|    cluster_->trafficStats()->upstream_rq_maintenance_mode_.inc();
 562|     0|    return Http::FilterHeadersStatus::StopIteration;
 563|     0|  }
 564|      |
 565|      |  // Fetch a connection pool for the upstream cluster.
 566| 2.40k|  const auto& upstream_http_protocol_options = cluster_->upstreamHttpProtocolOptions();
 567|      |
 568| 2.40k|  if (upstream_http_protocol_options.has_value() &&
 569| 2.40k|      (upstream_http_protocol_options.value().auto_sni() ||
 570|     0|       upstream_http_protocol_options.value().auto_san_validation())) {
 571|      |    // Default the header to Host/Authority header.
 572|     0|    absl::string_view header_value = headers.getHostValue();
 573|      |
 574|      |    // Check whether `override_auto_sni_header` is specified.
 575|     0|    const auto override_auto_sni_header =
 576|     0|        upstream_http_protocol_options.value().override_auto_sni_header();
 577|     0|    if (!override_auto_sni_header.empty()) {
 578|      |      // Use the header value from `override_auto_sni_header` to set the SNI value.
 579|     0|      const auto overridden_header_value = Http::HeaderUtility::getAllOfHeaderAsString(
 580|     0|          headers, Http::LowerCaseString(override_auto_sni_header));
 581|     0|      if (overridden_header_value.result().has_value() &&
 582|     0|          !overridden_header_value.result().value().empty()) {
 583|     0|        header_value = overridden_header_value.result().value();
 584|     0|      }
 585|     0|    }
 586|     0|    const auto parsed_authority = Http::Utility::parseAuthority(header_value);
 587|     0|    bool should_set_sni = !parsed_authority.is_ip_address_;
 588|      |    // `host_` returns a string_view so doing this should be safe.
 589|     0|    absl::string_view sni_value = parsed_authority.host_;
 590|      |
 591|     0|    if (should_set_sni && upstream_http_protocol_options.value().auto_sni() &&
 592|     0|        !callbacks_->streamInfo().filterState()->hasDataWithName(
 593|     0|            Network::UpstreamServerName::key())) {
 594|     0|      callbacks_->streamInfo().filterState()->setData(
 595|     0|          Network::UpstreamServerName::key(),
 596|     0|          std::make_unique<Network::UpstreamServerName>(sni_value),
 597|     0|          StreamInfo::FilterState::StateType::Mutable);
 598|     0|    }
 599|      |
 600|     0|    if (upstream_http_protocol_options.value().auto_san_validation() &&
 601|     0|        !callbacks_->streamInfo().filterState()->hasDataWithName(
 602|     0|            Network::UpstreamSubjectAltNames::key())) {
 603|     0|      callbacks_->streamInfo().filterState()->setData(
 604|     0|          Network::UpstreamSubjectAltNames::key(),
 605|     0|          std::make_unique<Network::UpstreamSubjectAltNames>(
 606|     0|              std::vector<std::string>{std::string(sni_value)}),
 607|     0|          StreamInfo::FilterState::StateType::Mutable);
 608|     0|    }
 609|     0|  }
 610|      |
 611| 2.40k|  transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState(
 612| 2.40k|      *callbacks_->streamInfo().filterState());
 613|      |
 614| 2.40k|  if (auto downstream_connection = downstreamConnection(); downstream_connection != nullptr) {
 615| 1.28k|    if (auto typed_state = downstream_connection->streamInfo()
 616| 1.28k|                               .filterState()
 617| 1.28k|                               .getDataReadOnly<Network::UpstreamSocketOptionsFilterState>(
 618| 1.28k|                                   Network::UpstreamSocketOptionsFilterState::key());
 619| 1.28k|        typed_state != nullptr) {
 620|     0|      auto downstream_options = typed_state->value();
 621|     0|      if (!upstream_options_) {
 622|     0|        upstream_options_ = std::make_shared<Network::Socket::Options>();
 623|     0|      }
 624|     0|      Network::Socket::appendOptions(upstream_options_, downstream_options);
 625|     0|    }
 626| 1.28k|  }
 627|      |
 628| 2.40k|  if (upstream_options_ && callbacks_->getUpstreamSocketOptions()) {
 629|     0|    Network::Socket::appendOptions(upstream_options_, callbacks_->getUpstreamSocketOptions());
 630|     0|  }
 631|      |
 632| 2.40k|  std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);
 633|      |
 634| 2.40k|  if (!generic_conn_pool) {
 635|     0|    sendNoHealthyUpstreamResponse();
 636|     0|    return Http::FilterHeadersStatus::StopIteration;
 637|     0|  }
 638| 2.40k|  Upstream::HostDescriptionConstSharedPtr host = generic_conn_pool->host();
 639|      |
 640| 2.40k|  if (debug_config && debug_config->append_upstream_host_) {
 641|      |    // The hostname and address will be appended to any local or upstream responses from this point,
 642|      |    // possibly in addition to the cluster name.
 643|     0|    modify_headers = [modify_headers, debug_config, host](Http::ResponseHeaderMap& headers) {
 644|     0|      modify_headers(headers);
 645|     0|      headers.addCopy(
 646|     0|          debug_config->hostname_header_.value_or(Http::Headers::get().EnvoyUpstreamHostname),
 647|     0|          host->hostname());
 648|     0|      headers.addCopy(debug_config->host_address_header_.value_or(
 649|     0|                          Http::Headers::get().EnvoyUpstreamHostAddress),
 650|     0|                      host->address()->asString());
 651|     0|    };
 652|     0|  }
 653|      |
 654|      |  // If we've been instructed not to forward the request upstream, send an empty local response.
 655| 2.40k|  if (debug_config && debug_config->do_not_forward_) {
 656|     0|    modify_headers = [modify_headers, debug_config](Http::ResponseHeaderMap& headers) {
 657|     0|      modify_headers(headers);
 658|     0|      headers.addCopy(
 659|     0|          debug_config->not_forwarded_header_.value_or(Http::Headers::get().EnvoyNotForwarded),
 660|     0|          "true");
 661|     0|    };
 662|     0|    callbacks_->sendLocalReply(Http::Code::NoContent, "", modify_headers, absl::nullopt, "");
 663|     0|    return Http::FilterHeadersStatus::StopIteration;
 664|     0|  }
 665|      |
 666| 2.40k|  hedging_params_ = FilterUtility::finalHedgingParams(*route_entry_, headers);
 667|      |
 668| 2.40k|  timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_.suppress_envoy_headers_,
 669| 2.40k|                                         grpc_request_, hedging_params_.hedge_on_per_try_timeout_,
 670| 2.40k|                                         config_.respect_expected_rq_timeout_);
 671|      |
 672| 2.40k|  const Http::HeaderEntry* header_max_stream_duration_entry =
 673| 2.40k|      headers.EnvoyUpstreamStreamDurationMs();
 674| 2.40k|  if (header_max_stream_duration_entry) {
 675|     0|    dynamic_max_stream_duration_ =
 676|     0|        FilterUtility::tryParseHeaderTimeout(*header_max_stream_duration_entry);
 677|     0|    headers.removeEnvoyUpstreamStreamDurationMs();
 678|     0|  }
 679|      |
 680|      |  // If this header is set with any value, use an alternate response code on timeout
 681| 2.40k|  if (headers.EnvoyUpstreamRequestTimeoutAltResponse()) {
 682|     0|    timeout_response_code_ = Http::Code::NoContent;
 683|     0|    headers.removeEnvoyUpstreamRequestTimeoutAltResponse();
 684|     0|  }
 685|      |
 686| 2.40k|  include_attempt_count_in_request_ = route_entry_->includeAttemptCountInRequest();
 687| 2.40k|  if (include_attempt_count_in_request_) {
 688|     0|    headers.setEnvoyAttemptCount(attempt_count_);
 689|     0|  }
 690|      |
 691|      |  // The router has reached a point where it is going to try to send a request upstream,
 692|      |  // so now modify_headers should attach x-envoy-attempt-count to the downstream response if the
 693|      |  // config flag is true.
 694| 2.40k|  if (route_entry_->includeAttemptCountInResponse()) {
 695|     0|    modify_headers = [modify_headers, this](Http::ResponseHeaderMap& headers) {
 696|     0|      modify_headers(headers);
 697|      |
 698|      |      // This header is added without checking for config_.suppress_envoy_headers_ to mirror what is
 699|      |      // done for upstream requests.
 700|     0|      headers.setEnvoyAttemptCount(attempt_count_);
 701|     0|    };
 702|     0|  }
 703| 2.40k|  callbacks_->streamInfo().setAttemptCount(attempt_count_);
 704|      |
 705| 2.40k|  route_entry_->finalizeRequestHeaders(headers, callbacks_->streamInfo(),
 706| 2.40k|                                       !config_.suppress_envoy_headers_);
 707| 2.40k|  FilterUtility::setUpstreamScheme(
 708| 2.40k|      headers, callbacks_->streamInfo().downstreamAddressProvider().sslConnection() != nullptr);
 709|      |
 710|      |  // Ensure an http transport scheme is selected before continuing with decoding.
 711| 2.40k|  ASSERT(headers.Scheme());
 712|      |
 713| 2.40k|  retry_state_ =
 714| 2.40k|      createRetryState(route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_,
 715| 2.40k|                       route_stats_context_, config_.runtime_, config_.random_,
 716| 2.40k|                       callbacks_->dispatcher(), config_.timeSource(), route_entry_->priority());
 717|      |
 718|      |  // Determine which shadow policies to use. It's possible that we don't do any shadowing due to
 719|      |  // runtime keys. Also the method CONNECT doesn't support shadowing.
 720| 2.40k|  auto method = headers.getMethodValue();
 721| 2.40k|  if (method != Http::Headers::get().MethodValues.Connect) {
 722| 2.40k|    for (const auto& shadow_policy : route_entry_->shadowPolicies()) {
 723|     0|      const auto& policy_ref = *shadow_policy;
 724|     0|      if (FilterUtility::shouldShadow(policy_ref, config_.runtime_, callbacks_->streamId())) {
 725|     0|        active_shadow_policies_.push_back(std::cref(policy_ref));
 726|     0|        shadow_headers_ = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*downstream_headers_);
 727|     0|      }
 728|     0|    }
 729| 2.40k|  }
 730|      |
 731| 2.40k|  ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers);
 732|      |
 733|      |  // Hang onto the modify_headers function for later use in handling upstream responses.
 734| 2.40k|  modify_headers_ = modify_headers;
 735|      |
 736| 2.40k|  const bool can_send_early_data =
 737| 2.40k|      route_entry_->earlyDataPolicy().allowsEarlyDataForRequest(*downstream_headers_);
 738|      |
 739| 2.40k|  include_timeout_retry_header_in_request_ =
 740| 2.40k|      route_entry_->virtualHost().includeIsTimeoutRetryHeader();
 741|      |
 742|      |  // Set initial HTTP/3 use based on the presence of HTTP/1.1 proxy config.
 743|      |  // For retries etc, HTTP/3 usability may transition from true to false, but
 744|      |  // will never transition from false to true.
 745| 2.40k|  bool can_use_http3 =
 746| 2.40k|      !transport_socket_options_ || !transport_socket_options_->http11ProxyInfo().has_value();
 747| 2.40k|  UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(
 748| 2.40k|      *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3);
 749| 2.40k|  LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
 750| 2.40k|  upstream_requests_.front()->acceptHeadersFromRouter(end_stream);
 751| 2.40k|  if (streaming_shadows_) {
 752|      |    // start the shadow streams.
 753|     0|    for (const auto& shadow_policy_wrapper : active_shadow_policies_) {
 754|     0|      const auto& shadow_policy = shadow_policy_wrapper.get();
 755|     0|      const absl::optional<absl::string_view> shadow_cluster_name =
 756|     0|          getShadowCluster(shadow_policy, *downstream_headers_);
 757|     0|      if (!shadow_cluster_name.has_value()) {
 758|     0|        continue;
 759|     0|      }
 760|     0|      auto shadow_headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_);
 761|     0|      auto options =
 762|     0|          Http::AsyncClient::RequestOptions()
 763|     0|              .setTimeout(timeout_.global_timeout_)
 764|     0|              .setParentSpan(callbacks_->activeSpan())
 765|     0|              .setChildSpanName("mirror")
 766|     0|              .setSampled(shadow_policy.traceSampled())
 767|     0|              .setIsShadow(true)
 768|     0|              .setBufferAccount(callbacks_->account())
 769|      |              // A buffer limit of 1 is set in the case that retry_shadow_buffer_limit_ == 0,
 770|      |              // because a buffer limit of zero on async clients is interpreted as no buffer limit.
 771|     0|              .setBufferLimit(1 > retry_shadow_buffer_limit_ ? 1 : retry_shadow_buffer_limit_);
 772|     0|      options.setFilterConfig(config_);
 773|     0|      if (end_stream) {
 774|      |        // This is a header-only request, and can be dispatched immediately to the shadow
 775|      |        // without waiting.
 776|     0|        Http::RequestMessagePtr request(new Http::RequestMessageImpl(
 777|     0|            Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_)));
 778|     0|        config_.shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request),
 779|     0|                                      options);
 780|     0|      } else {
 781|     0|        Http::AsyncClient::OngoingRequest* shadow_stream = config_.shadowWriter().streamingShadow(
 782|     0|            std::string(shadow_cluster_name.value()), std::move(shadow_headers), options);
 783|     0|        if (shadow_stream != nullptr) {
 784|     0|          shadow_streams_.insert(shadow_stream);
 785|     0|          shadow_stream->setDestructorCallback(
 786|     0|              [this, shadow_stream]() { shadow_streams_.erase(shadow_stream); });
 787|     0|          shadow_stream->setWatermarkCallbacks(*callbacks_);
 788|     0|        }
 789|     0|      }
 790|     0|    }
 791|     0|  }
 792| 2.40k|  if (end_stream) {
 793|   857|    onRequestComplete();
 794|   857|  }
 795|      |
 796| 2.40k|  return Http::FilterHeadersStatus::StopIteration;
 797| 2.40k|}
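
One pattern worth calling out in decodeHeaders() is how modify_headers grows: it starts as a no-op and each debug/config feature wraps the previous callback, so every accumulated response mutation runs in order when the callback finally fires. A self-contained illustration of that composition (std::map standing in for a header map; values are illustrative):

#include <cassert>
#include <functional>
#include <map>
#include <string>

int main() {
  using Headers = std::map<std::string, std::string>;
  // Start with a no-op so callers never have to null-check.
  std::function<void(Headers&)> modify_headers = [](Headers&) {};

  // First wrap: append a cluster debug header. Capturing by value snapshots
  // the current callback before reassigning the variable.
  modify_headers = [modify_headers](Headers& h) {
    modify_headers(h);
    h["x-envoy-cluster"] = "cluster_0";
  };
  // Second wrap: append an attempt count.
  modify_headers = [modify_headers](Headers& h) {
    modify_headers(h);
    h["x-envoy-attempt-count"] = "1";
  };

  Headers headers;
  modify_headers(headers); // runs all accumulated mutations in order
  assert(headers.size() == 2);
}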
 798|      |
 799|      |std::unique_ptr<GenericConnPool>
 800| 2.40k|Filter::createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster) {
 801| 2.40k|  GenericConnPoolFactory* factory = nullptr;
 802| 2.40k|  if (cluster_->upstreamConfig().has_value()) {
 803|     0|    factory = Envoy::Config::Utility::getFactory<GenericConnPoolFactory>(
 804|     0|        cluster_->upstreamConfig().ref());
 805|     0|    ENVOY_BUG(factory != nullptr,
 806|     0|              fmt::format("invalid factory type '{}', failing over to default upstream",
 807|     0|                          cluster_->upstreamConfig().ref().DebugString()));
 808|     0|  }
 809| 2.40k|  if (!factory) {
 810| 2.40k|    factory = &config_.router_context_.genericConnPoolFactory();
 811| 2.40k|  }
 812|      |
 813| 2.40k|  using UpstreamProtocol = Envoy::Router::GenericConnPoolFactory::UpstreamProtocol;
 814| 2.40k|  UpstreamProtocol upstream_protocol = UpstreamProtocol::HTTP;
 815| 2.40k|  if (route_entry_->connectConfig().has_value()) {
 816|     0|    auto method = downstream_headers_->getMethodValue();
 817|     0|    if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.enable_connect_udp_support") &&
 818|     0|        Http::HeaderUtility::isConnectUdpRequest(*downstream_headers_)) {
 819|     0|      upstream_protocol = UpstreamProtocol::UDP;
 820|     0|    } else if (method == Http::Headers::get().MethodValues.Connect ||
 821|     0|               (route_entry_->connectConfig()->allow_post() &&
 822|     0|                method == Http::Headers::get().MethodValues.Post)) {
 823|      |      // Allow POST for proxying raw TCP if it is configured.
 824|     0|      upstream_protocol = UpstreamProtocol::TCP;
 825|     0|    }
 826|     0|  }
 827| 2.40k|  return factory->createGenericConnPool(thread_local_cluster, upstream_protocol,
 828| 2.40k|                                        route_entry_->priority(),
 829| 2.40k|                                        callbacks_->streamInfo().protocol(), this);
 830| 2.40k|}
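
The protocol selection only deviates from HTTP when the route has a connect_config. Condensed as a standalone decision function (a sketch, not the Envoy API; CONNECT-UDP detection additionally sits behind the envoy.reloadable_features.enable_connect_udp_support runtime flag):

#include <string>

enum class UpstreamProtocol { HTTP, TCP, UDP };

UpstreamProtocol selectProtocol(const std::string& method, bool is_connect_udp,
                                bool allow_post) {
  if (is_connect_udp) {
    return UpstreamProtocol::UDP; // CONNECT-UDP request
  }
  if (method == "CONNECT" || (allow_post && method == "POST")) {
    return UpstreamProtocol::TCP; // raw TCP proxying; POST only if configured
  }
  return UpstreamProtocol::HTTP;
}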
 831|      |
 832|     0|void Filter::sendNoHealthyUpstreamResponse() {
 833|     0|  callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoHealthyUpstream);
 834|     0|  chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, false);
 835|     0|  callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "no healthy upstream", modify_headers_,
 836|     0|                             absl::nullopt,
 837|     0|                             StreamInfo::ResponseCodeDetails::get().NoHealthyUpstream);
 838|     0|}
 839|      |
 840| 4.66k|Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) {
 841|      |  // upstream_requests_.size() cannot be > 1 because that only happens when a per
 842|      |  // try timeout occurs with hedge_on_per_try_timeout enabled but the per
 843|      |  // try timeout timer is not started until onRequestComplete(). It could be zero
 844|      |  // if the first request attempt has already failed and a retry is waiting for
 845|      |  // a backoff timer.
 846| 4.66k|  ASSERT(upstream_requests_.size() <= 1);
 847|      |
 848| 4.66k|  bool buffering = (retry_state_ && retry_state_->enabled()) ||
 849| 4.66k|                   (!active_shadow_policies_.empty() && !streaming_shadows_) ||
 850| 4.66k|                   (route_entry_ && route_entry_->internalRedirectPolicy().enabled());
 851| 4.66k|  if (buffering &&
 852| 4.66k|      getLength(callbacks_->decodingBuffer()) + data.length() > retry_shadow_buffer_limit_) {
 853|     0|    ENVOY_LOG(debug,
 854|     0|              "The request payload has at least {} bytes data which exceeds buffer limit {}. Give "
 855|     0|              "up on the retry/shadow.",
 856|     0|              getLength(callbacks_->decodingBuffer()) + data.length(), retry_shadow_buffer_limit_);
 857|     0|    cluster_->trafficStats()->retry_or_shadow_abandoned_.inc();
 858|     0|    retry_state_.reset();
 859|     0|    buffering = false;
 860|     0|    active_shadow_policies_.clear();
 861|     0|    request_buffer_overflowed_ = true;
 862|      |
 863|      |    // If we had to abandon buffering and there's no request in progress, abort the request and
 864|      |    // clean up. This happens if the initial upstream request failed, and we are currently waiting
 865|      |    // for a backoff timer before starting the next upstream attempt.
 866|     0|    if (upstream_requests_.empty()) {
 867|     0|      cleanup();
 868|     0|      callbacks_->sendLocalReply(
 869|     0|          Http::Code::InsufficientStorage, "exceeded request buffer limit while retrying upstream",
 870|     0|          modify_headers_, absl::nullopt,
 871|     0|          StreamInfo::ResponseCodeDetails::get().RequestPayloadExceededRetryBufferLimit);
 872|     0|      return Http::FilterDataStatus::StopIterationNoBuffer;
 873|     0|    }
 874|     0|  }
 875|      |
 876|      |  // If we aren't buffering and there is no active request, an abort should have occurred
 877|      |  // already.
 878| 4.66k|  ASSERT(buffering || !upstream_requests_.empty());
 879|      |
 880| 4.66k|  for (auto* shadow_stream : shadow_streams_) {
 881|     0|    if (end_stream) {
 882|     0|      shadow_stream->removeDestructorCallback();
 883|     0|      shadow_stream->removeWatermarkCallbacks();
 884|     0|    }
 885|     0|    Buffer::OwnedImpl copy(data);
 886|     0|    shadow_stream->sendData(copy, end_stream);
 887|     0|  }
 888| 4.66k|  if (end_stream) {
 889| 1.35k|    shadow_streams_.clear();
 890| 1.35k|  }
 891| 4.66k|  if (buffering) {
 892|     0|    if (!upstream_requests_.empty()) {
 893|     0|      Buffer::OwnedImpl copy(data);
 894|     0|      upstream_requests_.front()->acceptDataFromRouter(copy, end_stream);
 895|     0|    }
 896|      |
 897|      |    // If we are potentially going to retry or buffer shadow this request we need to buffer.
 898|      |    // This will not cause the connection manager to 413 because before we hit the
 899|      |    // buffer limit we give up on retries and buffering. We must buffer using addDecodedData()
 900|      |    // so that all buffered data is available by the time we do request complete processing and
 901|      |    // potentially shadow. Additionally, we can't do a copy here because there's a check down
 902|      |    // this stack for whether `data` is the same buffer as already buffered data.
 903|     0|    callbacks_->addDecodedData(data, true);
 904| 4.66k|  } else {
 905| 4.66k|    upstream_requests_.front()->acceptDataFromRouter(data, end_stream);
 906| 4.66k|  }
 907|      |
 908| 4.66k|  if (end_stream) {
 909| 1.35k|    onRequestComplete();
 910| 1.35k|  }
 911|      |
 912| 4.66k|  return Http::FilterDataStatus::StopIterationNoBuffer;
 913| 4.66k|}
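
The buffering decision at the top of decodeData() condenses to: buffer whenever a retry, a non-streaming shadow, or an internal redirect might need to replay the body, unless doing so would exceed retry_shadow_buffer_limit_, in which case the router abandons retries/shadows rather than failing the request. A standalone restatement of that rule:

#include <cstdint>

bool shouldBuffer(bool retry_enabled, bool has_buffered_shadows,
                  bool internal_redirect_enabled, uint64_t buffered_bytes,
                  uint64_t incoming_bytes, uint64_t buffer_limit) {
  const bool wants_replay =
      retry_enabled || has_buffered_shadows || internal_redirect_enabled;
  // Past the limit the router gives up on replay features instead of 413ing.
  return wants_replay && buffered_bytes + incoming_bytes <= buffer_limit;
}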
 914|      |
 915|     0|Http::FilterTrailersStatus Filter::decodeTrailers(Http::RequestTrailerMap& trailers) {
 916|     0|  ENVOY_STREAM_LOG(debug, "router decoding trailers:\n{}", *callbacks_, trailers);
 917|      |
 918|     0|  if (shadow_headers_) {
 919|     0|    shadow_trailers_ = Http::createHeaderMap<Http::RequestTrailerMapImpl>(trailers);
 920|     0|  }
 921|      |
 922|      |  // upstream_requests_.size() cannot be > 1 because that only happens when a per
 923|      |  // try timeout occurs with hedge_on_per_try_timeout enabled but the per
 924|      |  // try timeout timer is not started until onRequestComplete(). It could be zero
 925|      |  // if the first request attempt has already failed and a retry is waiting for
 926|      |  // a backoff timer.
 927|     0|  ASSERT(upstream_requests_.size() <= 1);
 928|     0|  downstream_trailers_ = &trailers;
 929|     0|  if (!upstream_requests_.empty()) {
 930|     0|    upstream_requests_.front()->acceptTrailersFromRouter(trailers);
 931|     0|  }
 932|     0|  for (auto* shadow_stream : shadow_streams_) {
 933|     0|    shadow_stream->removeDestructorCallback();
 934|     0|    shadow_stream->removeWatermarkCallbacks();
 935|     0|    shadow_stream->captureAndSendTrailers(
 936|     0|        Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_));
 937|     0|  }
 938|     0|  shadow_streams_.clear();
 939|      |
 940|     0|  onRequestComplete();
 941|     0|  return Http::FilterTrailersStatus::StopIteration;
 942|     0|}
 943|      |
 944|     6|Http::FilterMetadataStatus Filter::decodeMetadata(Http::MetadataMap& metadata_map) {
 945|     6|  Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map);
 946|     6|  if (!upstream_requests_.empty()) {
 947|      |    // TODO(soya3129): Save metadata for retry, redirect and shadowing case.
 948|     6|    upstream_requests_.front()->acceptMetadataFromRouter(std::move(metadata_map_ptr));
 949|     6|  }
 950|     6|  return Http::FilterMetadataStatus::Continue;
 951|     6|}
 952|      |
 953| 2.88k|void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) {
 954| 2.88k|  callbacks_ = &callbacks;
 955|      |  // As the decoder filter only pushes back via watermarks once data has reached
 956|      |  // it, it can latch the current buffer limit and does not need to update the
 957|      |  // limit if another filter increases it.
 958|      |  //
 959|      |  // The default is "do not limit". If there are configured (non-zero) buffer
 960|      |  // limits, apply them here.
 961| 2.88k|  if (callbacks_->decoderBufferLimit() != 0) {
 962| 1.74k|    retry_shadow_buffer_limit_ = callbacks_->decoderBufferLimit();
 963| 1.74k|  }
 964| 2.88k|}
 965|      |
 966| 5.25k|void Filter::cleanup() {
 967|      |  // All callers of cleanup() should have cleaned out the upstream_requests_
 968|      |  // list as appropriate.
 969| 5.25k|  ASSERT(upstream_requests_.empty());
 970|      |
 971| 5.25k|  retry_state_.reset();
 972| 5.25k|  if (response_timeout_) {
 973| 1.20k|    response_timeout_->disableTimer();
 974| 1.20k|    response_timeout_.reset();
 975| 1.20k|  }
 976| 5.25k|}
 977|      |
 978|      |absl::optional<absl::string_view> Filter::getShadowCluster(const ShadowPolicy& policy,
 979|     0|                                                           const Http::HeaderMap& headers) const {
 980|     0|  if (!policy.cluster().empty()) {
 981|     0|    return policy.cluster();
 982|     0|  } else {
 983|     0|    ASSERT(!policy.clusterHeader().get().empty());
 984|     0|    const auto entry = headers.get(policy.clusterHeader());
 985|     0|    if (!entry.empty() && !entry[0]->value().empty()) {
 986|     0|      return entry[0]->value().getStringView();
 987|     0|    }
 988|     0|    ENVOY_STREAM_LOG(debug, "There is no cluster name in header: {}", *callbacks_,
 989|     0|                     policy.clusterHeader());
 990|     0|    return absl::nullopt;
 991|     0|  }
 992|     0|}
 993|      |
 994| 2.21k|void Filter::maybeDoShadowing() {
 995| 2.21k|  for (const auto& shadow_policy_wrapper : active_shadow_policies_) {
 996|     0|    const auto& shadow_policy = shadow_policy_wrapper.get();
 997|      |
 998|     0|    const absl::optional<absl::string_view> shadow_cluster_name =
 999|     0|        getShadowCluster(shadow_policy, *downstream_headers_);
1000|      |
1001|      |    // The cluster name got from headers is empty.
1002|     0|    if (!shadow_cluster_name.has_value()) {
1003|     0|      continue;
1004|     0|    }
1005|      |
1006|     0|    Http::RequestMessagePtr request(new Http::RequestMessageImpl(
1007|     0|        Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_)));
1008|     0|    if (callbacks_->decodingBuffer()) {
1009|     0|      request->body().add(*callbacks_->decodingBuffer());
1010|     0|    }
1011|     0|    if (shadow_trailers_) {
1012|     0|      request->trailers(Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_));
1013|     0|    }
1014|      |
1015|     0|    auto options = Http::AsyncClient::RequestOptions()
1016|     0|                       .setTimeout(timeout_.global_timeout_)
1017|     0|                       .setParentSpan(callbacks_->activeSpan())
1018|     0|                       .setChildSpanName("mirror")
1019|     0|                       .setSampled(shadow_policy.traceSampled())
1020|     0|                       .setIsShadow(true);
1021|     0|    options.setFilterConfig(config_);
1022|     0|    config_.shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request),
1023|     0|                                  options);
1024|     0|  }
1025| 2.21k|}
1026|      |
1027| 2.21k|void Filter::onRequestComplete() {
1028|      |  // This should be called exactly once, when the downstream request has been received in full.
1029| 2.21k|  ASSERT(!downstream_end_stream_);
1030| 2.21k|  downstream_end_stream_ = true;
1031| 2.21k|  Event::Dispatcher& dispatcher = callbacks_->dispatcher();
1032| 2.21k|  downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime();
1033|      |
1034|      |  // Possible that we got an immediate reset.
1035| 2.21k|  if (!upstream_requests_.empty()) {
1036|      |    // Even if we got an immediate reset, we could still shadow, but that is a riskier change and
1037|      |    // seems unnecessary right now.
1038| 2.21k|    if (!streaming_shadows_) {
1039| 2.21k|      maybeDoShadowing();
1040| 2.21k|    }
1041|      |
1042| 2.21k|    if (timeout_.global_timeout_.count() > 0) {
1043| 1.20k|      response_timeout_ = dispatcher.createTimer([this]() -> void { onResponseTimeout(); });
1044| 1.20k|      response_timeout_->enableTimer(timeout_.global_timeout_);
1045| 1.20k|    }
1046|      |
1047| 2.21k|    for (auto& upstream_request : upstream_requests_) {
1048| 2.21k|      if (upstream_request->createPerTryTimeoutOnRequestComplete()) {
1049| 1.54k|        upstream_request->setupPerTryTimeout();
1050| 1.54k|      }
1051| 2.21k|    }
1052| 2.21k|  }
1053| 2.21k|}
1054|      |
1055| 3.99k|void Filter::onDestroy() {
1056|      |  // Reset any in-flight upstream requests.
1057| 3.99k|  resetAll();
1058| 3.99k|  cleanup();
1059| 3.99k|}
1060|      |
1061|     0|void Filter::onResponseTimeout() {
1062|     0|  ENVOY_STREAM_LOG(debug, "upstream timeout", *callbacks_);
1063|      |
1064|      |  // Reset any upstream requests that are still in flight.
1065|     0|  while (!upstream_requests_.empty()) {
1066|     0|    UpstreamRequestPtr upstream_request =
1067|     0|        upstream_requests_.back()->removeFromList(upstream_requests_);
1068|      |
1069|      |    // We want to record the upstream timeouts and increase the stats counters in all the cases.
1070|      |    // For example, we also want to record the stats in the case of BiDi streaming APIs where we
1071|      |    // might have already seen the headers.
1072|     0|    cluster_->trafficStats()->upstream_rq_timeout_.inc();
1073|     0|    if (request_vcluster_) {
1074|     0|      request_vcluster_->stats().upstream_rq_timeout_.inc();
1075|     0|    }
1076|     0|    if (route_stats_context_.has_value()) {
1077|     0|      route_stats_context_->stats().upstream_rq_timeout_.inc();
1078|     0|    }
1079|      |
1080|     0|    if (upstream_request->upstreamHost()) {
1081|     0|      upstream_request->upstreamHost()->stats().rq_timeout_.inc();
1082|     0|    }
1083|      |
1084|     0|    if (upstream_request->awaitingHeaders()) {
1085|     0|      if (cluster_->timeoutBudgetStats().has_value()) {
1086|      |        // Cancel firing per-try timeout information, because the per-try timeout did not come into
1087|      |        // play when the global timeout was hit.
1088|     0|        upstream_request->recordTimeoutBudget(false);
1089|     0|      }
1090|      |
1091|      |      // If this upstream request already hit a "soft" timeout, then it
1092|      |      // already recorded a timeout into outlier detection. Don't do it again.
1093|     0|      if (!upstream_request->outlierDetectionTimeoutRecorded()) {
1094|     0|        updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, *upstream_request,
1095|     0|                               absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
1096|     0|      }
1097|      |
1098|     0|      chargeUpstreamAbort(timeout_response_code_, false, *upstream_request);
1099|     0|    }
1100|     0|    upstream_request->resetStream();
1101|     0|  }
1102|      |
1103|     0|  onUpstreamTimeoutAbort(StreamInfo::ResponseFlag::UpstreamRequestTimeout,
1104|     0|                         StreamInfo::ResponseCodeDetails::get().ResponseTimeout);
1105|     0|}
1106
1107
// Called when the per try timeout is hit but we didn't reset the request
1108
// (hedge_on_per_try_timeout enabled).
1109
0
void Filter::onSoftPerTryTimeout(UpstreamRequest& upstream_request) {
1110
  // Track this as a timeout for outlier detection purposes even though we didn't
1111
  // cancel the request yet and might get a 2xx later.
1112
0
  updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request,
1113
0
                         absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
1114
0
  upstream_request.outlierDetectionTimeoutRecorded(true);
1115
1116
0
  if (!downstream_response_started_ && retry_state_) {
1117
0
    RetryStatus retry_status = retry_state_->shouldHedgeRetryPerTryTimeout(
1118
0
        [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_]() -> void {
1119
          // Without any knowledge about what's going on in the connection pool, retry the request
1120
          // with the safest settings which is no early data but keep using or not using alt-svc as
1121
          // before. In this way, QUIC won't be falsely marked as broken.
1122
0
          doRetry(/*can_send_early_data*/ false, can_use_http3, TimeoutRetry::Yes);
1123
0
        });
1124
1125
0
    if (retry_status == RetryStatus::Yes) {
1126
0
      runRetryOptionsPredicates(upstream_request);
1127
0
      pending_retries_++;
1128
1129
      // Don't increment upstream_host->stats().rq_error_ here, we'll do that
1130
      // later if 1) we hit global timeout or 2) we get bad response headers
1131
      // back.
1132
0
      upstream_request.retried(true);
1133
1134
      // TODO: cluster stat for hedge attempted.
1135
0
    } else if (retry_status == RetryStatus::NoOverflow) {
1136
0
      callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
1137
0
    } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
1138
0
      callbacks_->streamInfo().setResponseFlag(
1139
0
          StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded);
1140
0
    }
1141
0
  }
1142
0
}
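
A "soft" per-try timeout keeps the slow attempt alive while a hedged retry races it, and whichever attempt produces response headers first becomes the final upstream request. The sketch below reproduces that race with std::async and a one-shot std::promise. This is purely illustrative: Envoy hedges on a single event-loop thread, not with threads as here.

#include <chrono>
#include <future>
#include <iostream>
#include <string>
#include <thread>

// Simulated upstream attempt: returns a response after some delay.
static std::string attempt(const std::string& name, std::chrono::milliseconds delay) {
  std::this_thread::sleep_for(delay);
  return name + ": 200 OK";
}

int main() {
  std::promise<std::string> first_response;
  std::future<std::string> winner = first_response.get_future();

  // The original attempt is slow; the hedged retry is launched without
  // cancelling it, mirroring hedge_on_per_try_timeout.
  auto race = [&first_response](const std::string& name, std::chrono::milliseconds d) {
    try {
      first_response.set_value(attempt(name, d)); // Only the first set_value() wins.
    } catch (const std::future_error&) {
      // Lost the race; the other attempt already produced a response.
    }
  };
  auto original = std::async(std::launch::async, race, std::string("original"),
                             std::chrono::milliseconds(200));
  auto hedged = std::async(std::launch::async, race, std::string("hedged retry"),
                           std::chrono::milliseconds(50));

  std::cout << winner.get() << "\n"; // Prints the hedged retry's response.
  return 0;
}
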
1143
1144  0      void Filter::onPerTryIdleTimeout(UpstreamRequest& upstream_request) {
1145  0        onPerTryTimeoutCommon(upstream_request,
1146  0                              cluster_->trafficStats()->upstream_rq_per_try_idle_timeout_,
1147  0                              StreamInfo::ResponseCodeDetails::get().UpstreamPerTryIdleTimeout);
1148  0      }
1149
1150  0      void Filter::onPerTryTimeout(UpstreamRequest& upstream_request) {
1151  0        onPerTryTimeoutCommon(upstream_request, cluster_->trafficStats()->upstream_rq_per_try_timeout_,
1152  0                              StreamInfo::ResponseCodeDetails::get().UpstreamPerTryTimeout);
1153  0      }
1154
1155         void Filter::onPerTryTimeoutCommon(UpstreamRequest& upstream_request, Stats::Counter& error_counter,
1156  0                                         const std::string& response_code_details) {
1157  0        if (hedging_params_.hedge_on_per_try_timeout_) {
1158  0          onSoftPerTryTimeout(upstream_request);
1159  0          return;
1160  0        }
1161
1162  0        error_counter.inc();
1163  0        if (upstream_request.upstreamHost()) {
1164  0          upstream_request.upstreamHost()->stats().rq_timeout_.inc();
1165  0        }
1166
1167  0        upstream_request.resetStream();
1168
1169  0        updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request,
1170  0                               absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
1171
1172  0        if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::Yes)) {
1173  0          return;
1174  0        }
1175
1176  0        chargeUpstreamAbort(timeout_response_code_, false, upstream_request);
1177
1178           // Remove this upstream request from the list now that we're done with it.
1179  0        upstream_request.removeFromList(upstream_requests_);
1180  0        onUpstreamTimeoutAbort(StreamInfo::ResponseFlag::UpstreamRequestTimeout, response_code_details);
1181  0      }
1182
1183  0      void Filter::onStreamMaxDurationReached(UpstreamRequest& upstream_request) {
1184  0        upstream_request.resetStream();
1185
1186  0        if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::No)) {
1187  0          return;
1188  0        }
1189
1190  0        upstream_request.removeFromList(upstream_requests_);
1191  0        cleanup();
1192
1193  0        callbacks_->streamInfo().setResponseFlag(
1194  0            StreamInfo::ResponseFlag::UpstreamMaxStreamDurationReached);
1195           // Grab a const ref so we call the const method of StreamInfo.
1196  0        const auto& stream_info = callbacks_->streamInfo();
1197  0        const bool downstream_decode_complete =
1198  0            stream_info.downstreamTiming().has_value() &&
1199  0            stream_info.downstreamTiming().value().get().lastDownstreamRxByteReceived().has_value();
1200
1201           // sendLocalReply may instead reset the stream if downstream_response_started_ is true.
1202  0        callbacks_->sendLocalReply(
1203  0            Http::Utility::maybeRequestTimeoutCode(downstream_decode_complete),
1204  0            "upstream max stream duration reached", modify_headers_, absl::nullopt,
1205  0            StreamInfo::ResponseCodeDetails::get().UpstreamMaxStreamDurationReached);
1206  0      }
1207
1208         void Filter::updateOutlierDetection(Upstream::Outlier::Result result,
1209                                             UpstreamRequest& upstream_request,
1210  74                                        absl::optional<uint64_t> code) {
1211  74       if (upstream_request.upstreamHost()) {
1212  74         upstream_request.upstreamHost()->outlierDetector().putResult(result, code);
1213  74       }
1214  74     }
1215
1216  74     void Filter::chargeUpstreamAbort(Http::Code code, bool dropped, UpstreamRequest& upstream_request) {
1217  74       if (downstream_response_started_) {
1218  16         if (upstream_request.grpcRqSuccessDeferred()) {
1219  14           upstream_request.upstreamHost()->stats().rq_error_.inc();
1220  14           stats_.rq_reset_after_downstream_response_started_.inc();
1221  14         }
1222  58       } else {
1223  58         Upstream::HostDescriptionConstSharedPtr upstream_host = upstream_request.upstreamHost();
1224
1225  58         chargeUpstreamCode(code, upstream_host, dropped);
1226             // If we got a non-5xx code but were still reset by the backend, or timed out before the
1227             // response started, we treat this as an error. We only get a non-5xx code here when
1228             // timeout_response_code_ is used for `code` above, where this member can assume values
1229             // such as 204 (NoContent).
1230  58         if (upstream_host != nullptr && !Http::CodeUtility::is5xx(enumToInt(code))) {
1231  0            upstream_host->stats().rq_error_.inc();
1232  0          }
1233  58       }
1234  74     }
1235
1236         void Filter::onUpstreamTimeoutAbort(StreamInfo::ResponseFlag response_flags,
1237  0                                          absl::string_view details) {
1238  0        Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = cluster()->timeoutBudgetStats();
1239  0        if (tb_stats.has_value()) {
1240  0          Event::Dispatcher& dispatcher = callbacks_->dispatcher();
1241  0          std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>(
1242  0              dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
1243
1244  0          tb_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue(
1245  0              FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_));
1246  0        }
1247
1248  0        const absl::string_view body =
1249  0            timeout_response_code_ == Http::Code::GatewayTimeout ? "upstream request timeout" : "";
1250  0        onUpstreamAbort(timeout_response_code_, response_flags, body, false, details);
1251  0      }
1252
1253         void Filter::onUpstreamAbort(Http::Code code, StreamInfo::ResponseFlag response_flags,
1254  74                                  absl::string_view body, bool dropped, absl::string_view details) {
1255           // If we have not yet sent anything downstream, send a response with an appropriate status code.
1256           // Otherwise just reset the ongoing response.
1257  74       callbacks_->streamInfo().setResponseFlag(response_flags);
1258           // This will destroy any created retry timers.
1259  74       cleanup();
1260           // sendLocalReply may instead reset the stream if downstream_response_started_ is true.
1261  74       callbacks_->sendLocalReply(
1262  74           code, body,
1263  74           [dropped, this](Http::ResponseHeaderMap& headers) {
1264  58             if (dropped && !config_.suppress_envoy_headers_) {
1265  0                headers.addReference(Http::Headers::get().EnvoyOverloaded,
1266  0                                     Http::Headers::get().EnvoyOverloadedValues.True);
1267  0              }
1268  58             modify_headers_(headers);
1269  58           },
1270  74           absl::nullopt, details);
1271  74     }
1272
1273         bool Filter::maybeRetryReset(Http::StreamResetReason reset_reason,
1274  74                                  UpstreamRequest& upstream_request, TimeoutRetry is_timeout_retry) {
1275           // We don't retry if we already started the response, don't have a retry policy defined,
1276           // or have already retried this upstream request (currently only possible if a per-try
1277           // timeout occurred and hedge_on_per_try_timeout is enabled).
1278  74       if (downstream_response_started_ || !retry_state_ || upstream_request.retried()) {
1279  74         return false;
1280  74       }
1281  0        RetryState::Http3Used was_using_http3 = RetryState::Http3Used::Unknown;
1282  0        if (upstream_request.hadUpstream()) {
1283  0          was_using_http3 = (upstream_request.streamInfo().protocol().has_value() &&
1284  0                             upstream_request.streamInfo().protocol().value() == Http::Protocol::Http3)
1285  0                                ? RetryState::Http3Used::Yes
1286  0                                : RetryState::Http3Used::No;
1287  0        }
1288  0        const RetryStatus retry_status = retry_state_->shouldRetryReset(
1289  0            reset_reason, was_using_http3,
1290  0            [this, can_send_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_,
1291  0             can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
1292  0             is_timeout_retry](bool disable_http3) -> void {
1293               // This retry might be caused by a ConnectionFailure of the 0-RTT handshake. In that case,
1294               // though the original request is retried with the same can_send_early_data setting, it will
1295               // not be sent as early data by the underlying connection pool grid.
1296  0              doRetry(can_send_early_data, disable_http3 ? false : can_use_http3, is_timeout_retry);
1297  0            });
1298  0        if (retry_status == RetryStatus::Yes) {
1299  0          runRetryOptionsPredicates(upstream_request);
1300  0          pending_retries_++;
1301
1302  0          if (upstream_request.upstreamHost()) {
1303  0            upstream_request.upstreamHost()->stats().rq_error_.inc();
1304  0          }
1305
1306  0          auto request_ptr = upstream_request.removeFromList(upstream_requests_);
1307  0          callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1308  0          return true;
1309  0        } else if (retry_status == RetryStatus::NoOverflow) {
1310  0          callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
1311  0        } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
1312  0          callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded);
1313  0        }
1314
1315  0        return false;
1316  0      }
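
Note the shape of the retry path above: the old upstream request is unlinked from the list and handed to deferredDelete() instead of being destroyed inline, because this code usually runs somewhere inside that request's own call stack. A minimal sketch of the deferred-deletion pattern, with a hypothetical Dispatcher standing in for Envoy's Event::Dispatcher:

#include <iostream>
#include <memory>
#include <vector>

struct UpstreamRequest {
  ~UpstreamRequest() { std::cout << "destroyed after unwind\n"; }
};

// Stand-in dispatcher: parks objects so they are destroyed only after the
// current call stack has fully unwound.
struct Dispatcher {
  std::vector<std::unique_ptr<UpstreamRequest>> deferred;
  void deferredDelete(std::unique_ptr<UpstreamRequest> p) { deferred.push_back(std::move(p)); }
  void clearDeferredDeleteList() { deferred.clear(); } // Run between event-loop iterations.
};

// Imagine this frame was reached from a method of *request itself.
void onReset(Dispatcher& dispatcher, std::unique_ptr<UpstreamRequest> request) {
  // Destroying the request inline here could free an object whose member
  // functions are still on the stack above us, so park it instead.
  dispatcher.deferredDelete(std::move(request));
}

int main() {
  Dispatcher dispatcher;
  onReset(dispatcher, std::make_unique<UpstreamRequest>());
  std::cout << "call stack unwound\n";
  dispatcher.clearDeferredDeleteList(); // The destructor runs only now.
  return 0;
}
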
1317
1318         void Filter::onUpstreamReset(Http::StreamResetReason reset_reason,
1319                                      absl::string_view transport_failure_reason,
1320  74                                  UpstreamRequest& upstream_request) {
1321  74       ENVOY_STREAM_LOG(debug, "upstream reset: reset reason: {}, transport failure reason: {}",
1322  74                        *callbacks_, Http::Utility::resetReasonToString(reset_reason),
1323  74                        transport_failure_reason);
1324
1325  74       const bool dropped = reset_reason == Http::StreamResetReason::Overflow;
1326
1327           // Ignore an upstream reset caused by a resource overflow.
1328           // Currently, only circuit breakers can produce this reset reason,
1329           // which means the reason is cluster-wide rather than specific to one
1330           // upstream host. Penalizing a host through outlier detection when the
1331           // cluster is overloaded would make the situation even worse.
1332           // https://github.com/envoyproxy/envoy/issues/25487
1333  74       if (!dropped) {
1334             // TODO: The reset may also come from upstream over the wire. In that case it should be
1335             // treated as an external origin error and distinguished from a local origin error.
1336             // This matters only when running OutlierDetection with the split_external_local_origin_errors
1337             // config param set to true.
1338  74         updateOutlierDetection(Upstream::Outlier::Result::LocalOriginConnectFailed, upstream_request,
1339  74                                absl::nullopt);
1340  74       }
1341
1342  74       if (maybeRetryReset(reset_reason, upstream_request, TimeoutRetry::No)) {
1343  0          return;
1344  0        }
1345
1346  74       const Http::Code error_code = (reset_reason == Http::StreamResetReason::ProtocolError)
1347  74                                         ? Http::Code::BadGateway
1348  74                                         : Http::Code::ServiceUnavailable;
1349  74       chargeUpstreamAbort(error_code, dropped, upstream_request);
1350  74       auto request_ptr = upstream_request.removeFromList(upstream_requests_);
1351  74       callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1352
1353           // If there are other in-flight requests that might see an upstream response,
1354           // don't return anything downstream.
1355  74       if (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0) {
1356  0          return;
1357  0        }
1358
1359  74       const StreamInfo::ResponseFlag response_flags = streamResetReasonToResponseFlag(reset_reason);
1360
1361  74       const std::string body =
1362  74           absl::StrCat("upstream connect error or disconnect/reset before headers. ",
1363  74                        (is_retry_ ? "retried and the latest " : ""),
1364  74                        "reset reason: ", Http::Utility::resetReasonToString(reset_reason),
1365  74                        !transport_failure_reason.empty() ? ", transport failure reason: " : "",
1366  74                        transport_failure_reason);
1367  74       const std::string& basic_details =
1368  74           downstream_response_started_ ? StreamInfo::ResponseCodeDetails::get().LateUpstreamReset
1369  74                                        : StreamInfo::ResponseCodeDetails::get().EarlyUpstreamReset;
1370  74       const std::string details = StringUtil::replaceAllEmptySpace(absl::StrCat(
1371  74           basic_details, "{", Http::Utility::resetReasonToString(reset_reason),
1372  74           transport_failure_reason.empty() ? "" : absl::StrCat(",", transport_failure_reason), "}"));
1373  74       onUpstreamAbort(error_code, response_flags, body, dropped, details);
1374  74     }
1375
1376         void Filter::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
1377  2.32k                                     bool pool_success) {
1378  2.32k    if (retry_state_ && host) {
1379  6          retry_state_->onHostAttempted(host);
1380  6        }
1381
1382  2.32k    if (!pool_success) {
1383  0          return;
1384  0        }
1385
1386  2.32k    if (request_vcluster_) {
1387             // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady
1388             // callback. Hence, the upstream request increases the virtual cluster's upstream_rq_total_ stat
1389             // here.
1390  6          request_vcluster_->stats().upstream_rq_total_.inc();
1391  6        }
1392  2.32k    if (route_stats_context_.has_value()) {
1393             // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady
1394             // callback. Hence, the upstream request increases the route-level upstream_rq_total_ stat
1395             // here.
1396  0          route_stats_context_->stats().upstream_rq_total_.inc();
1397  0        }
1398  2.32k  }
1399
1400         StreamInfo::ResponseFlag
1401  148    Filter::streamResetReasonToResponseFlag(Http::StreamResetReason reset_reason) {
1402  148      switch (reset_reason) {
1403  0        case Http::StreamResetReason::LocalConnectionFailure:
1404  0        case Http::StreamResetReason::RemoteConnectionFailure:
1405  0        case Http::StreamResetReason::ConnectionTimeout:
1406  0          return StreamInfo::ResponseFlag::UpstreamConnectionFailure;
1407  40       case Http::StreamResetReason::ConnectionTermination:
1408  40         return StreamInfo::ResponseFlag::UpstreamConnectionTermination;
1409  0        case Http::StreamResetReason::LocalReset:
1410  0        case Http::StreamResetReason::LocalRefusedStreamReset:
1411  0          return StreamInfo::ResponseFlag::LocalReset;
1412  0        case Http::StreamResetReason::Overflow:
1413  0          return StreamInfo::ResponseFlag::UpstreamOverflow;
1414  0        case Http::StreamResetReason::RemoteReset:
1415  2        case Http::StreamResetReason::RemoteRefusedStreamReset:
1416  2        case Http::StreamResetReason::ConnectError:
1417  2          return StreamInfo::ResponseFlag::UpstreamRemoteReset;
1418  106      case Http::StreamResetReason::ProtocolError:
1419  106        return StreamInfo::ResponseFlag::UpstreamProtocolError;
1420  0        case Http::StreamResetReason::OverloadManager:
1421  0          return StreamInfo::ResponseFlag::OverloadManager;
1422  148      }
1423
1424  0        PANIC_DUE_TO_CORRUPT_ENUM;
1425  0      }
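
The switch above intentionally has no default case: when -Wswitch style warnings are treated as errors, adding a new StreamResetReason enumerator forces this mapping to be updated, and PANIC_DUE_TO_CORRUPT_ENUM only guards against out-of-range values at runtime. The same idiom on a toy enum, with std::abort() standing in for the panic macro:

#include <cstdlib>
#include <iostream>

enum class ResetReason { LocalReset, RemoteReset, Overflow };

const char* toResponseFlag(ResetReason reason) {
  // No default case: a newly added enumerator without a mapping becomes a
  // compiler diagnostic instead of a silent fallthrough.
  switch (reason) {
  case ResetReason::LocalReset:
    return "LR";
  case ResetReason::RemoteReset:
    return "URX";
  case ResetReason::Overflow:
    return "UO";
  }
  std::abort(); // Unreachable unless the enum value is corrupt.
}

int main() {
  std::cout << toResponseFlag(ResetReason::RemoteReset) << "\n";
  return 0;
}
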
1426
1427         void Filter::handleNon5xxResponseHeaders(absl::optional<Grpc::Status::GrpcStatus> grpc_status,
1428                                                  UpstreamRequest& upstream_request, bool end_stream,
1429  2.22k                                           uint64_t grpc_to_http_status) {
1430           // We need to defer gRPC success until after we have processed grpc-status in
1431           // the trailers.
1432  2.22k    if (grpc_request_) {
1433  1.12k      if (end_stream) {
1434  93           if (grpc_status && !Http::CodeUtility::is5xx(grpc_to_http_status)) {
1435  93             upstream_request.upstreamHost()->stats().rq_success_.inc();
1436  93           } else {
1437  0              upstream_request.upstreamHost()->stats().rq_error_.inc();
1438  0            }
1439  1.02k      } else {
1440  1.02k        upstream_request.grpcRqSuccessDeferred(true);
1441  1.02k      }
1442  1.12k    } else {
1443  1.10k      upstream_request.upstreamHost()->stats().rq_success_.inc();
1444  1.10k    }
1445  2.22k  }
1446
1447         void Filter::onUpstream1xxHeaders(Http::ResponseHeaderMapPtr&& headers,
1448  1                                        UpstreamRequest& upstream_request) {
1449  1        const uint64_t response_code = Http::Utility::getResponseStatus(*headers);
1450  1        chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false);
1451  1        ENVOY_STREAM_LOG(debug, "upstream 1xx ({}).", *callbacks_, response_code);
1452
1453  1        downstream_response_started_ = true;
1454  1        final_upstream_request_ = &upstream_request;
1455  1        resetOtherUpstreams(upstream_request);
1456
1457           // Don't send retries after a 100-Continue has been forwarded on. Arguably we could attempt a
1458           // retry, assume the next upstream would also send a 100-Continue, and swallow the second one,
1459           // but it's sketchy (the subsequent upstream might not send a 100-Continue) and not worth
1460           // the complexity until someone asks for it.
1461  1        retry_state_.reset();
1462
1463  1        callbacks_->encode1xxHeaders(std::move(headers));
1464  1      }
1465
1466  3.99k  void Filter::resetAll() {
1467  5.14k    while (!upstream_requests_.empty()) {
1468  1.14k      auto request_ptr = upstream_requests_.back()->removeFromList(upstream_requests_);
1469  1.14k      request_ptr->resetStream();
1470  1.14k      callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1471  1.14k    }
1472  3.99k  }
1473
1474  2.22k  void Filter::resetOtherUpstreams(UpstreamRequest& upstream_request) {
1475           // Pop each upstream request off the list and reset it if it's not the one
1476           // provided. At the end we'll move that one back into the list.
1477  2.22k    UpstreamRequestPtr final_upstream_request;
1478  4.44k    while (!upstream_requests_.empty()) {
1479  2.22k      UpstreamRequestPtr upstream_request_tmp =
1480  2.22k          upstream_requests_.back()->removeFromList(upstream_requests_);
1481  2.22k      if (upstream_request_tmp.get() != &upstream_request) {
1482  0            upstream_request_tmp->resetStream();
1483               // TODO: per-host stat for hedge abandoned.
1484               // TODO: cluster stat for hedge abandoned.
1485  2.22k      } else {
1486  2.22k        final_upstream_request = std::move(upstream_request_tmp);
1487  2.22k      }
1488  2.22k    }
1489
1490  2.22k    ASSERT(final_upstream_request);
1491           // Now put the final request back on the list.
1492  2.22k    LinkedList::moveIntoList(std::move(final_upstream_request), upstream_requests_);
1493  2.22k  }
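
resetOtherUpstreams() drains the whole list, resets every hedged attempt except the winning one, and then moves the winner back as the sole element. The same pop-all-keep-one shape on a std::list of unique_ptrs (Envoy's list is intrusive, but the mechanics are identical):

#include <iostream>
#include <list>
#include <memory>

struct Request {
  int id;
  void resetStream() { std::cout << "reset attempt " << id << "\n"; }
};

int main() {
  std::list<std::unique_ptr<Request>> requests;
  for (int i = 0; i < 3; ++i) {
    requests.push_back(std::make_unique<Request>(Request{i}));
  }
  Request* winner = requests.back().get(); // The attempt whose response we keep.

  std::unique_ptr<Request> kept;
  while (!requests.empty()) {
    std::unique_ptr<Request> current = std::move(requests.back());
    requests.pop_back();
    if (current.get() != winner) {
      current->resetStream(); // Abandon the losing hedged attempts.
    } else {
      kept = std::move(current);
    }
  }
  requests.push_back(std::move(kept)); // The winner goes back as the only element.
  std::cout << "remaining: " << requests.size() << "\n";
  return 0;
}
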
1494
1495         void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers,
1496  2.22k                                  UpstreamRequest& upstream_request, bool end_stream) {
1497  2.22k    ENVOY_STREAM_LOG(debug, "upstream headers complete: end_stream={}", *callbacks_, end_stream);
1498
1499  2.22k    modify_headers_(*headers);
1500           // When grpc-status appears in the response headers, convert it to an HTTP status code
1501           // for outlier detection. This does not currently change any stats or logging, and does not
1502           // handle the case where an error grpc-status is sent as a trailer.
1503  2.22k    absl::optional<Grpc::Status::GrpcStatus> grpc_status;
1504  2.22k    uint64_t grpc_to_http_status = 0;
1505  2.22k    if (grpc_request_) {
1506  1.12k      grpc_status = Grpc::Common::getGrpcStatus(*headers);
1507  1.12k      if (grpc_status.has_value()) {
1508  93           grpc_to_http_status = Grpc::Utility::grpcToHttpStatus(grpc_status.value());
1509  93         }
1510  1.12k    }
1511
1512  2.22k    if (grpc_status.has_value()) {
1513  93         upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(grpc_to_http_status);
1514  2.12k    } else {
1515  2.12k      upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(response_code);
1516  2.12k    }
1517
1518  2.22k    if (headers->EnvoyImmediateHealthCheckFail() != nullptr) {
1519  0          upstream_request.upstreamHost()->healthChecker().setUnhealthy(
1520  0              Upstream::HealthCheckHostMonitor::UnhealthyType::ImmediateHealthCheckFail);
1521  0        }
1522
1523  2.22k    bool could_not_retry = false;
1524
1525           // Check if this upstream request was already retried, for instance after
1526           // hitting a per-try timeout. Don't retry it if we already have.
1527  2.22k    if (retry_state_) {
1528  0          if (upstream_request.retried()) {
1529               // We already retried this request (presumably for a per-try timeout), so
1530               // we definitely won't retry it again. Check whether we would have retried it
1531               // if we could.
1532  0            bool retry_as_early_data; // Not going to be used as we are not retrying.
1533  0            could_not_retry = retry_state_->wouldRetryFromHeaders(*headers, *downstream_headers_,
1534  0                                                                  retry_as_early_data) !=
1535  0                              RetryState::RetryDecision::NoRetry;
1536  0          } else {
1537  0            const RetryStatus retry_status = retry_state_->shouldRetryHeaders(
1538  0                *headers, *downstream_headers_,
1539  0                [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
1540  0                 had_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_](
1541  0                    bool disable_early_data) -> void {
1542  0                  doRetry((disable_early_data ? false : had_early_data), can_use_http3, TimeoutRetry::No);
1543  0                });
1544  0            if (retry_status == RetryStatus::Yes) {
1545  0              runRetryOptionsPredicates(upstream_request);
1546  0              pending_retries_++;
1547  0              upstream_request.upstreamHost()->stats().rq_error_.inc();
1548  0              Http::CodeStats& code_stats = httpContext().codeStats();
1549  0              code_stats.chargeBasicResponseStat(cluster_->statsScope(), stats_.stat_names_.retry_,
1550  0                                                 static_cast<Http::Code>(response_code),
1551  0                                                 exclude_http_code_stats_);
1552
1553  0              if (!end_stream || !upstream_request.encodeComplete()) {
1554  0                upstream_request.resetStream();
1555  0              }
1556  0              auto request_ptr = upstream_request.removeFromList(upstream_requests_);
1557  0              callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1558  0              return;
1559  0            } else if (retry_status == RetryStatus::NoOverflow) {
1560  0              callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
1561  0              could_not_retry = true;
1562  0            } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
1563  0              callbacks_->streamInfo().setResponseFlag(
1564  0                  StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded);
1565  0              could_not_retry = true;
1566  0            }
1567  0          }
1568  0        }
1569
1570  2.22k    if (route_entry_->internalRedirectPolicy().enabled() &&
1571  2.22k        route_entry_->internalRedirectPolicy().shouldRedirectForResponseCode(
1572  0                static_cast<Http::Code>(response_code)) &&
1573  2.22k        setupRedirect(*headers)) {
1574  0          return;
1575             // If the redirect could not be handled, fail open and let the response
1576             // pass downstream.
1577  0        }
1578
1579           // Check if we got a "bad" response but there are still upstream requests in
1580           // flight awaiting headers or scheduled retries. If so, exit to give them a
1581           // chance to return before sending a response downstream.
1582  2.22k    if (could_not_retry && (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0)) {
1583  0          upstream_request.upstreamHost()->stats().rq_error_.inc();
1584
1585             // Reset the stream because there are other in-flight requests that we'll
1586             // wait around for, and we're not interested in consuming any body/trailers.
1587  0          auto request_ptr = upstream_request.removeFromList(upstream_requests_);
1588  0          request_ptr->resetStream();
1589  0          callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
1590  0          return;
1591  0        }
1592
1593           // Make sure any retry timers are destroyed, since we may not call cleanup() if end_stream is
1594           // false.
1595  2.22k    if (retry_state_) {
1596  0          retry_state_.reset();
1597  0        }
1598
1599           // Only send upstream service time if we received the complete request and this is not a
1600           // premature response.
1601  2.22k    if (DateUtil::timePointValid(downstream_request_complete_time_)) {
1602  1.09k      Event::Dispatcher& dispatcher = callbacks_->dispatcher();
1603  1.09k      MonotonicTime response_received_time = dispatcher.timeSource().monotonicTime();
1604  1.09k      std::chrono::milliseconds ms = std::chrono::duration_cast<std::chrono::milliseconds>(
1605  1.09k          response_received_time - downstream_request_complete_time_);
1606  1.09k      if (!config_.suppress_envoy_headers_) {
1607  1.09k        headers->setEnvoyUpstreamServiceTime(ms.count());
1608  1.09k      }
1609  1.09k    }
1610
1611  2.22k    upstream_request.upstreamCanary(
1612  2.22k        (headers->EnvoyUpstreamCanary() && headers->EnvoyUpstreamCanary()->value() == "true") ||
1613  2.22k        upstream_request.upstreamHost()->canary());
1614  2.22k    chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false);
1615  2.22k    if (!Http::CodeUtility::is5xx(response_code)) {
1616  2.22k      handleNon5xxResponseHeaders(grpc_status, upstream_request, end_stream, grpc_to_http_status);
1617  2.22k    }
1618
1619           // Append routing cookies.
1620  2.22k    for (const auto& header_value : downstream_set_cookies_) {
1621  0          headers->addReferenceKey(Http::Headers::get().SetCookie, header_value);
1622  0        }
1623
1624  2.22k    callbacks_->streamInfo().setResponseCodeDetails(
1625  2.22k        StreamInfo::ResponseCodeDetails::get().ViaUpstream);
1626
1627  2.22k    if (Runtime::runtimeFeatureEnabled(
1628  2.22k            "envoy.reloadable_features.copy_response_code_to_downstream_stream_info")) {
1629  2.22k      callbacks_->streamInfo().setResponseCode(response_code);
1630  2.22k    }
1631
1632           // TODO(zuercher): If access to response_headers_to_add (at any level) is ever needed outside
1633           // Router::Filter we'll need to find a better location for this work. One possibility is to
1634           // provide finalizeResponseHeaders functions on the Router::Config and VirtualHost interfaces.
1635  2.22k    route_entry_->finalizeResponseHeaders(*headers, callbacks_->streamInfo());
1636
1637  2.22k    downstream_response_started_ = true;
1638  2.22k    final_upstream_request_ = &upstream_request;
1639           // Make sure that for request hedging, we end up with the correct final upstream info.
1640  2.22k    callbacks_->streamInfo().setUpstreamInfo(final_upstream_request_->streamInfo().upstreamInfo());
1641  2.22k    resetOtherUpstreams(upstream_request);
1642  2.22k    if (end_stream) {
1643  110        onUpstreamComplete(upstream_request);
1644  110      }
1645
1646  2.22k    callbacks_->encodeHeaders(std::move(headers), end_stream,
1647  2.22k                              StreamInfo::ResponseCodeDetails::get().ViaUpstream);
1648  2.22k  }
1649
1650         void Filter::onUpstreamData(Buffer::Instance& data, UpstreamRequest& upstream_request,
1651  4.28k                              bool end_stream) {
1652           // This should be true because when we saw headers we either reset this stream
1653           // (hence we wouldn't have made it to onUpstreamData) or reset all other
1654           // in-flight streams.
1655  4.28k    ASSERT(upstream_requests_.size() == 1);
1656  4.28k    if (end_stream) {
1657             // gRPC request termination without trailers is an error.
1658  1.06k      if (upstream_request.grpcRqSuccessDeferred()) {
1659  0            upstream_request.upstreamHost()->stats().rq_error_.inc();
1660  0          }
1661  1.06k      onUpstreamComplete(upstream_request);
1662  1.06k    }
1663
1664  4.28k    callbacks_->encodeData(data, end_stream);
1665  4.28k  }
1666
1667         void Filter::onUpstreamTrailers(Http::ResponseTrailerMapPtr&& trailers,
1668  3                                      UpstreamRequest& upstream_request) {
1669           // This should be true because when we saw headers we either reset this stream
1670           // (hence we wouldn't have made it to onUpstreamTrailers) or reset all other
1671           // in-flight streams.
1672  3        ASSERT(upstream_requests_.size() == 1);
1673
1674  3        if (upstream_request.grpcRqSuccessDeferred()) {
1675  3          absl::optional<Grpc::Status::GrpcStatus> grpc_status = Grpc::Common::getGrpcStatus(*trailers);
1676  3          if (grpc_status &&
1677  3              !Http::CodeUtility::is5xx(Grpc::Utility::grpcToHttpStatus(grpc_status.value()))) {
1678  3            upstream_request.upstreamHost()->stats().rq_success_.inc();
1679  3          } else {
1680  0            upstream_request.upstreamHost()->stats().rq_error_.inc();
1681  0          }
1682  3        }
1683
1684  3        onUpstreamComplete(upstream_request);
1685
1686  3        callbacks_->encodeTrailers(std::move(trailers));
1687  3      }
1688
1689  181    void Filter::onUpstreamMetadata(Http::MetadataMapPtr&& metadata_map) {
1690  181      callbacks_->encodeMetadata(std::move(metadata_map));
1691  181    }
1692
1693  1.18k  void Filter::onUpstreamComplete(UpstreamRequest& upstream_request) {
1694  1.18k    if (!downstream_end_stream_) {
1695  99         upstream_request.resetStream();
1696  99       }
1697  1.18k    Event::Dispatcher& dispatcher = callbacks_->dispatcher();
1698  1.18k    std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>(
1699  1.18k        dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
1700
1701  1.18k    Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = cluster()->timeoutBudgetStats();
1702  1.18k    if (tb_stats.has_value()) {
1703  0          tb_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue(
1704  0              FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_));
1705  0        }
1706
1707  1.18k    if (config_.emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck() &&
1708  1.18k        DateUtil::timePointValid(downstream_request_complete_time_)) {
1709  1.08k      upstream_request.upstreamHost()->outlierDetector().putResponseTime(response_time);
1710  1.08k      const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_);
1711
1712  1.08k      Http::CodeStats& code_stats = httpContext().codeStats();
1713  1.08k      Http::CodeStats::ResponseTimingInfo info{
1714  1.08k          config_.scope_,
1715  1.08k          cluster_->statsScope(),
1716  1.08k          config_.empty_stat_name_,
1717  1.08k          response_time,
1718  1.08k          upstream_request.upstreamCanary(),
1719  1.08k          internal_request,
1720  1.08k          route_entry_->virtualHost().statName(),
1721  1.08k          request_vcluster_ ? request_vcluster_->statName() : config_.empty_stat_name_,
1722  1.08k          route_stats_context_.has_value() ? route_stats_context_->statName()
1723  1.08k                                           : config_.empty_stat_name_,
1724  1.08k          config_.zone_name_,
1725  1.08k          upstreamZone(upstream_request.upstreamHost())};
1726
1727  1.08k      code_stats.chargeResponseTiming(info);
1728
1729  1.08k      if (alt_stat_prefix_ != nullptr) {
1730  0            Http::CodeStats::ResponseTimingInfo info{config_.scope_,
1731  0                                                     cluster_->statsScope(),
1732  0                                                     alt_stat_prefix_->statName(),
1733  0                                                     response_time,
1734  0                                                     upstream_request.upstreamCanary(),
1735  0                                                     internal_request,
1736  0                                                     config_.empty_stat_name_,
1737  0                                                     config_.empty_stat_name_,
1738  0                                                     config_.empty_stat_name_,
1739  0                                                     config_.zone_name_,
1740  0                                                     upstreamZone(upstream_request.upstreamHost())};
1741
1742  0            code_stats.chargeResponseTiming(info);
1743  0          }
1744  1.08k    }
1745
1746           // Defer deletion, as this is generally called under the stack of the upstream
1747           // request, and immediate deletion is dangerous.
1748  1.18k    callbacks_->dispatcher().deferredDelete(upstream_request.removeFromList(upstream_requests_));
1749  1.18k    cleanup();
1750  1.18k  }
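
The timeout-budget histogram above records what fraction of the configured global timeout a request actually consumed, so operators can see how close traffic runs to its deadline. A sketch of a percentageOfTimeout-style helper under two assumptions: the precision factor is 100, and an unset timeout records zero. The real FilterUtility implementation may differ in detail.

#include <chrono>
#include <cstdint>
#include <iostream>

// Returns response_time as a percentage of timeout, or 0 if no timeout is set.
uint64_t percentageOfTimeoutSketch(std::chrono::milliseconds response_time,
                                   std::chrono::milliseconds timeout) {
  if (timeout.count() <= 0) {
    return 0; // No budget configured; nothing meaningful to record.
  }
  return static_cast<uint64_t>(response_time.count() * 100 / timeout.count());
}

int main() {
  using std::chrono::milliseconds;
  std::cout << percentageOfTimeoutSketch(milliseconds(150), milliseconds(1000)) << "%\n";  // 15%
  std::cout << percentageOfTimeoutSketch(milliseconds(1200), milliseconds(1000)) << "%\n"; // 120%: over budget
  return 0;
}
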
1751
1752  0      bool Filter::setupRedirect(const Http::ResponseHeaderMap& headers) {
1753  0        ENVOY_STREAM_LOG(debug, "attempting internal redirect", *callbacks_);
1754  0        const Http::HeaderEntry* location = headers.Location();
1755
1756  0        const uint64_t status_code = Http::Utility::getResponseStatus(headers);
1757
1758           // Redirects are not yet supported for streaming requests.
1759  0        if (downstream_end_stream_ && (!request_buffer_overflowed_ || !callbacks_->decodingBuffer()) &&
1760  0            location != nullptr &&
1761  0            convertRequestHeadersForInternalRedirect(*downstream_headers_, *location, status_code) &&
1762  0            callbacks_->recreateStream(&headers)) {
1763  0          ENVOY_STREAM_LOG(debug, "Internal redirect succeeded", *callbacks_);
1764  0          cluster_->trafficStats()->upstream_internal_redirect_succeeded_total_.inc();
1765  0          return true;
1766  0        }
1767           // convertRequestHeadersForInternalRedirect logs its own failure reasons; log
1768           // details for the other failure modes here.
1769  0        if (!downstream_end_stream_) {
1770  0          ENVOY_STREAM_LOG(trace, "Internal redirect failed: request incomplete", *callbacks_);
1771  0        } else if (request_buffer_overflowed_) {
1772  0          ENVOY_STREAM_LOG(trace, "Internal redirect failed: request body overflow", *callbacks_);
1773  0        } else if (location == nullptr) {
1774  0          ENVOY_STREAM_LOG(trace, "Internal redirect failed: missing location header", *callbacks_);
1775  0        }
1776
1777  0        cluster_->trafficStats()->upstream_internal_redirect_failed_total_.inc();
1778  0        return false;
1779  0      }
1780
1781         bool Filter::convertRequestHeadersForInternalRedirect(Http::RequestHeaderMap& downstream_headers,
1782                                                               const Http::HeaderEntry& internal_redirect,
1783  0                                                            uint64_t status_code) {
1784  0        if (!downstream_headers.Path()) {
1785  0          ENVOY_STREAM_LOG(trace, "Internal redirect failed: no path in downstream_headers", *callbacks_);
1786  0          return false;
1787  0        }
1788
1789  0        absl::string_view redirect_url = internal_redirect.value().getStringView();
1790           // Make sure the redirect response contains a URL to redirect to.
1791  0        if (redirect_url.empty()) {
1792  0          stats_.passthrough_internal_redirect_bad_location_.inc();
1793  0          ENVOY_STREAM_LOG(trace, "Internal redirect failed: empty location", *callbacks_);
1794  0          return false;
1795  0        }
1796  0        Http::Utility::Url absolute_url;
1797  0        if (!absolute_url.initialize(redirect_url, false)) {
1798  0          stats_.passthrough_internal_redirect_bad_location_.inc();
1799  0          ENVOY_STREAM_LOG(trace, "Internal redirect failed: invalid location {}", *callbacks_,
1800  0                           redirect_url);
1801  0          return false;
1802  0        }
1803
1804  0        const auto& policy = route_entry_->internalRedirectPolicy();
1805           // Don't change the scheme from the original request.
1806  0        const bool scheme_is_http = schemeIsHttp(downstream_headers, callbacks_->connection());
1807  0        const bool target_is_http = Http::Utility::schemeIsHttp(absolute_url.scheme());
1808  0        if (!policy.isCrossSchemeRedirectAllowed() && scheme_is_http != target_is_http) {
1809  0          ENVOY_STREAM_LOG(trace, "Internal redirect failed: incorrect scheme for {}", *callbacks_,
1810  0                           redirect_url);
1811  0          stats_.passthrough_internal_redirect_unsafe_scheme_.inc();
1812  0          return false;
1813  0        }
1814
1815  0        const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();
1816           // Make sure that performing the redirect won't result in exceeding the configured number of
1817           // redirects allowed for this route.
1818  0        StreamInfo::UInt32Accessor* num_internal_redirect{};
1819
1820  0        if (num_internal_redirect = filter_state->getDataMutable<StreamInfo::UInt32Accessor>(
1821  0                NumInternalRedirectsFilterStateName);
1822  0            num_internal_redirect == nullptr) {
1823  0          auto state = std::make_shared<StreamInfo::UInt32AccessorImpl>(0);
1824  0          num_internal_redirect = state.get();
1825
1826  0          filter_state->setData(NumInternalRedirectsFilterStateName, std::move(state),
1827  0                                StreamInfo::FilterState::StateType::Mutable,
1828  0                                StreamInfo::FilterState::LifeSpan::Request);
1829  0        }
1830
1831  0        if (num_internal_redirect->value() >= policy.maxInternalRedirects()) {
1832  0          ENVOY_STREAM_LOG(trace, "Internal redirect failed: redirect limits exceeded.", *callbacks_);
1833  0          stats_.passthrough_internal_redirect_too_many_redirects_.inc();
1834  0          return false;
1835  0        }
1836           // Copy the old values so they can be restored if the redirect fails.
1837  0        const std::string original_host(downstream_headers.getHostValue());
1838  0        const std::string original_path(downstream_headers.getPathValue());
1839  0        const bool scheme_is_set = (downstream_headers.Scheme() != nullptr);
1840  0        Cleanup restore_original_headers(
1841  0            [&downstream_headers, original_host, original_path, scheme_is_set, scheme_is_http]() {
1842  0              downstream_headers.setHost(original_host);
1843  0              downstream_headers.setPath(original_path);
1844  0              if (scheme_is_set) {
1845  0                downstream_headers.setScheme(scheme_is_http ? Http::Headers::get().SchemeValues.Http
1846  0                                                            : Http::Headers::get().SchemeValues.Https);
1847  0              }
1848  0            });
1849
1850           // Replace the original host, scheme, and path.
1851  0        downstream_headers.setScheme(absolute_url.scheme());
1852  0        downstream_headers.setHost(absolute_url.hostAndPort());
1853
1854  0        auto path_and_query = absolute_url.pathAndQueryParams();
1855  0        if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http_reject_path_with_fragment")) {
1856             // Envoy treats an internal redirect as a new request and will reject it if the URI path
1857             // contains a #fragment. However, the Location header is allowed to have a #fragment in its
1858             // URI path. To prevent Envoy from rejecting the internal redirect, strip the #fragment from
1859             // the Location URI if it is present.
1860  0          auto fragment_pos = path_and_query.find('#');
1861  0          path_and_query = path_and_query.substr(0, fragment_pos);
1862  0        }
1863  0        downstream_headers.setPath(path_and_query);
1864
1865           // Only clear the route cache if there are downstream callbacks. There aren't, for example,
1866           // for async connections.
1867  0        if (callbacks_->downstreamCallbacks()) {
1868  0          callbacks_->downstreamCallbacks()->clearRouteCache();
1869  0        }
1870  0        const auto route = callbacks_->route();
1871           // Don't allow a redirect to a nonexistent route.
1872  0        if (!route) {
1873  0          stats_.passthrough_internal_redirect_no_route_.inc();
1874  0          ENVOY_STREAM_LOG(trace, "Internal redirect failed: no route found", *callbacks_);
1875  0          return false;
1876  0        }
1877
1878  0        const auto& route_name = route->routeName();
1879  0        for (const auto& predicate : policy.predicates()) {
1880  0          if (!predicate->acceptTargetRoute(*filter_state, route_name, !scheme_is_http,
1881  0                                            !target_is_http)) {
1882  0            stats_.passthrough_internal_redirect_predicate_.inc();
1883  0            ENVOY_STREAM_LOG(trace,
1884  0                             "Internal redirect failed: rejecting redirect targeting {}, by {} predicate",
1885  0                             *callbacks_, route_name, predicate->name());
1886  0            return false;
1887  0          }
1888  0        }
1889
1890           // See https://tools.ietf.org/html/rfc7231#section-6.4.4.
1891  0        if (status_code == enumToInt(Http::Code::SeeOther) &&
1892  0            downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Get &&
1893  0            downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Head) {
1894  0          downstream_headers.setMethod(Http::Headers::get().MethodValues.Get);
1895  0          downstream_headers.remove(Http::Headers::get().ContentLength);
1896  0          callbacks_->modifyDecodingBuffer([](Buffer::Instance& data) { data.drain(data.length()); });
1897  0        }
1898
1899  0        num_internal_redirect->increment();
1900  0        restore_original_headers.cancel();
1901           // Preserve the original request URL for the second pass.
1902  0        downstream_headers.setEnvoyOriginalUrl(absl::StrCat(scheme_is_http
1903  0                                                                ? Http::Headers::get().SchemeValues.Http
1904  0                                                                : Http::Headers::get().SchemeValues.Https,
1905  0                                                            "://", original_host, original_path));
1906  0        return true;
1907  0      }
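
convertRequestHeadersForInternalRedirect() mutates the request headers speculatively: the Cleanup object restores the original host, path, and scheme on any later failure return, and cancel() disarms it once the redirect is definitely going ahead. A minimal cancellable scope guard in the same spirit (illustrative, not Envoy's Cleanup class):

#include <functional>
#include <iostream>
#include <string>

// Runs its callback on destruction unless cancel() was called first.
class ScopeGuard {
public:
  explicit ScopeGuard(std::function<void()> cb) : cb_(std::move(cb)) {}
  ~ScopeGuard() {
    if (cb_) {
      cb_();
    }
  }
  void cancel() { cb_ = nullptr; }

private:
  std::function<void()> cb_;
};

int main() {
  std::string host = "original.example.com";
  {
    ScopeGuard restore([&host, original = host] { host = original; });
    host = "redirect-target.example.com"; // Speculative mutation.
    // Any early `return` on a failed check would run ~ScopeGuard and restore host.
    restore.cancel(); // The redirect succeeded, so keep the mutation.
  }
  std::cout << host << "\n"; // Prints redirect-target.example.com.
  return 0;
}
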
1908
1909  0      void Filter::runRetryOptionsPredicates(UpstreamRequest& retriable_request) {
1910  0        for (const auto& options_predicate : route_entry_->retryPolicy().retryOptionsPredicates()) {
1911  0          const Upstream::RetryOptionsPredicate::UpdateOptionsParameters parameters{
1912  0              retriable_request.streamInfo(), upstreamSocketOptions()};
1913  0          auto ret = options_predicate->updateOptions(parameters);
1914  0          if (ret.new_upstream_socket_options_.has_value()) {
1915  0            upstream_options_ = ret.new_upstream_socket_options_.value();
1916  0          }
1917  0        }
1918  0      }
1919
1920  0      void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry) {
1921  0        ENVOY_STREAM_LOG(debug, "performing retry", *callbacks_);
1922
1923  0        is_retry_ = true;
1924  0        attempt_count_++;
1925  0        callbacks_->streamInfo().setAttemptCount(attempt_count_);
1926  0        ASSERT(pending_retries_ > 0);
1927  0        pending_retries_--;
1928
1929           // Clusters can technically get removed by CDS during a retry. Make sure the cluster still exists.
1930  0        const auto cluster = config_.cm_.getThreadLocalCluster(route_entry_->clusterName());
1931  0        std::unique_ptr<GenericConnPool> generic_conn_pool;
1932  0        if (cluster != nullptr) {
1933  0          cluster_ = cluster->info();
1934  0          generic_conn_pool = createConnPool(*cluster);
1935  0        }
1936
1937  0        if (!generic_conn_pool) {
1938  0          sendNoHealthyUpstreamResponse();
1939  0          cleanup();
1940  0          return;
1941  0        }
1942  0        UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(
1943  0            *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3);
1944
1945  0        if (include_attempt_count_in_request_) {
1946  0          downstream_headers_->setEnvoyAttemptCount(attempt_count_);
1947  0        }
1948
1949  0        if (include_timeout_retry_header_in_request_) {
1950  0          downstream_headers_->setEnvoyIsTimeoutRetry(is_timeout_retry == TimeoutRetry::Yes ? "true"
1951  0                                                                                            : "false");
1952  0        }
1953
1954           // The request timeouts only account for time elapsed since the downstream request completed,
1955           // which might not have happened yet (in which case zero time has elapsed).
1956  0        std::chrono::milliseconds elapsed_time = std::chrono::milliseconds::zero();
1957
1958  0        if (DateUtil::timePointValid(downstream_request_complete_time_)) {
1959  0          Event::Dispatcher& dispatcher = callbacks_->dispatcher();
1960  0          elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(
1961  0              dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
1962  0        }
1963
1964  0        FilterUtility::setTimeoutHeaders(elapsed_time.count(), timeout_, *route_entry_,
1965  0                                         *downstream_headers_, !config_.suppress_envoy_headers_,
1966  0                                         grpc_request_, hedging_params_.hedge_on_per_try_timeout_);
1967
1968  0        UpstreamRequest* upstream_request_tmp = upstream_request.get();
1969  0        LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
1970  0        upstream_requests_.front()->acceptHeadersFromRouter(
1971  0            !callbacks_->decodingBuffer() && !downstream_trailers_ && downstream_end_stream_);
1972           // It's possible we got immediately reset, which means the upstream request we just
1973           // added to the front of the list might have been removed, so we need to check to make
1974           // sure we don't send data on the wrong request.
1975  0        if (!upstream_requests_.empty() && (upstream_requests_.front().get() == upstream_request_tmp)) {
1976  0          if (callbacks_->decodingBuffer()) {
1977               // If we are doing a retry, we need to make a copy.
1978  0            Buffer::OwnedImpl copy(*callbacks_->decodingBuffer());
1979  0            upstream_requests_.front()->acceptDataFromRouter(copy, !downstream_trailers_ &&
1980  0                                                                       downstream_end_stream_);
1981  0          }
1982
1983  0          if (downstream_trailers_) {
1984  0            upstream_requests_.front()->acceptTrailersFromRouter(*downstream_trailers_);
1985  0          }
1986  0        }
1987  0      }
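
doRetry() deliberately copies the decoding buffer instead of draining it: the buffered body must survive in case this attempt also fails and yet another retry needs it, so each attempt consumes its own copy. A tiny sketch of that copy-per-attempt rule:

#include <iostream>
#include <string>
#include <utility>

// Stand-in for sending one upstream attempt; it consumes its own copy of the body.
void sendAttempt(int attempt, std::string body) {
  std::cout << "attempt " << attempt << " sends " << body.size() << " bytes\n";
}

int main() {
  const std::string decoding_buffer = "buffered request body"; // Must survive across retries.
  for (int attempt = 1; attempt <= 3; ++attempt) {
    std::string copy = decoding_buffer; // Copy, don't drain, like Buffer::OwnedImpl copy(...).
    sendAttempt(attempt, std::move(copy));
  }
  return 0;
}
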
1988
1989  74     uint32_t Filter::numRequestsAwaitingHeaders() {
1990  74       return std::count_if(upstream_requests_.begin(), upstream_requests_.end(),
1991  74                            [](const auto& req) -> bool { return req->awaitingHeaders(); });
1992  74     }
1993
1994         RetryStatePtr
1995         ProdFilter::createRetryState(const RetryPolicy& policy, Http::RequestHeaderMap& request_headers,
1996                                      const Upstream::ClusterInfo& cluster, const VirtualCluster* vcluster,
1997                                      RouteStatsContextOptRef route_stats_context, Runtime::Loader& runtime,
1998                                      Random::RandomGenerator& random, Event::Dispatcher& dispatcher,
1999  2.39k                               TimeSource& time_source, Upstream::ResourcePriority priority) {
2000  2.39k    std::unique_ptr<RetryStateImpl> retry_state =
2001  2.39k        RetryStateImpl::create(policy, request_headers, cluster, vcluster, route_stats_context,
2002  2.39k                               runtime, random, dispatcher, time_source, priority);
2003  2.39k    if (retry_state != nullptr && retry_state->isAutomaticallyConfiguredForHttp3()) {
2004             // Since retries require Envoy to buffer the request body, if upstream HTTP/3 is the only
2005             // reason retries are enabled, set the retry shadow buffer limit to 0 so that we don't retry
2006             // or buffer safe requests with a body, which is uncommon.
2007  0          setRetryShadowBufferLimit(0);
2008  0        }
2009  2.39k    return retry_state;
2010  2.39k  }
2011
2012         } // namespace Router
2013         } // namespace Envoy