1
#pragma once
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7
#include <string>
8

            
9
#include "envoy/common/random_generator.h"
10
#include "envoy/extensions/filters/http/router/v3/router.pb.h"
11
#include "envoy/http/codes.h"
12
#include "envoy/http/filter.h"
13
#include "envoy/http/filter_factory.h"
14
#include "envoy/http/hash_policy.h"
15
#include "envoy/local_info/local_info.h"
16
#include "envoy/router/router_filter_interface.h"
17
#include "envoy/router/shadow_writer.h"
18
#include "envoy/runtime/runtime.h"
19
#include "envoy/server/factory_context.h"
20
#include "envoy/stats/scope.h"
21
#include "envoy/stats/stats_macros.h"
22
#include "envoy/stream_info/stream_info.h"
23
#include "envoy/upstream/cluster_manager.h"
24

            
25
#include "source/common/common/hash.h"
26
#include "source/common/common/hex.h"
27
#include "source/common/common/logger.h"
28
#include "source/common/config/well_known_names.h"
29
#include "source/common/http/filter_chain_helper.h"
30
#include "source/common/http/sidestream_watermark.h"
31
#include "source/common/http/utility.h"
32
#include "source/common/router/context_impl.h"
33
#include "source/common/router/metadatamatchcriteria_impl.h"
34
#include "source/common/router/upstream_request.h"
35
#include "source/common/stats/symbol_table.h"
36
#include "source/common/upstream/load_balancer_context_base.h"
37
#include "source/common/upstream/upstream_factory_context_impl.h"
38

            
39
#include "absl/types/optional.h"
40

            
41
namespace Envoy {
42
namespace Router {
43

            
44
/**
45
 * Struct definition for all router filter stats. @see stats_macros.h
46
 */
47
MAKE_STATS_STRUCT(FilterStats, StatNames, ALL_ROUTER_STATS);
48

            
49
/**
50
 * Router filter utilities split out for ease of testing.
51
 */
52
class FilterUtility {
53
public:
54
  struct HedgingParams {
55
    bool hedge_on_per_try_timeout_ : 1;
56
  };
57

            
58
  class StrictHeaderChecker {
59
  public:
60
    struct HeaderCheckResult {
61
      bool valid_ = true;
62
      const Http::HeaderEntry* entry_;
63
    };
64

            
65
    /**
66
     * Determine whether a given header's value passes the strict validation
67
     * defined for that header.
68
     * @param headers supplies the headers from which to get the target header.
69
     * @param target_header is the header to be validated.
70
     * @return HeaderCheckResult containing the entry for @param target_header
71
     *         and valid_ set to FALSE if @param target_header is set to an
72
     *         invalid value. If @param target_header doesn't appear in
73
     *         @param headers, return a result with valid_ set to TRUE.
74
     */
75
    static const HeaderCheckResult checkHeader(Http::RequestHeaderMap& headers,
76
                                               const Http::LowerCaseString& target_header);
77

            
78
    using ParseRetryFlagsFunc = std::function<std::pair<uint32_t, bool>(absl::string_view)>;
79

            
80
  private:
81
    static HeaderCheckResult hasValidRetryFields(const Http::HeaderEntry* header_entry,
82
19
                                                 const ParseRetryFlagsFunc& parse_fn) {
83
19
      HeaderCheckResult r;
84
19
      if (header_entry) {
85
14
        const auto flags_and_validity = parse_fn(header_entry->value().getStringView());
86
14
        r.valid_ = flags_and_validity.second;
87
14
        r.entry_ = header_entry;
88
14
      }
89
19
      return r;
90
19
    }
91

            
92
53
    static HeaderCheckResult isInteger(const Http::HeaderEntry* header_entry) {
93
53
      HeaderCheckResult r;
94
53
      if (header_entry) {
95
27
        uint64_t out;
96
27
        r.valid_ = absl::SimpleAtoi(header_entry->value().getStringView(), &out);
97
27
        r.entry_ = header_entry;
98
27
      }
99
53
      return r;
100
53
    }
101
  };
102

            
103
  /**
104
   * Returns response_time / timeout, as a  percentage as [0, 100]. Returns 0
105
   * if there is no timeout.
106
   * @param response_time supplies the response time thus far.
107
   * @param timeout supplies  the timeout to get the percentage of.
108
   * @return the percentage of timeout [0, 100] for stats use.
109
   */
110
  static uint64_t percentageOfTimeout(const std::chrono::milliseconds response_time,
111
                                      const std::chrono::milliseconds timeout);
112

            
113
  /**
114
   * Set the :scheme header using the best information available. In order this is
115
   * - whether the upstream connection is using TLS if use_upstream is true
116
   * - existing scheme header if valid
117
   * - x-forwarded-proto header if valid
118
   * - whether the downstream connection is using TLS
119
   */
120
  static void setUpstreamScheme(Http::RequestHeaderMap& headers, bool downstream_ssl,
121
                                bool upstream_ssl, bool use_upstream);
122

            
123
  /**
124
   * Determine whether a request should be shadowed.
125
   * @param policy supplies the route's shadow policy.
126
   * @param runtime supplies the runtime to lookup the shadow key in.
127
   * @param stable_random supplies the random number to use when determining whether shadowing
128
   *        should take place.
129
   * @return TRUE if shadowing should take place.
130
   */
131
  static bool shouldShadow(const ShadowPolicy& policy, Runtime::Loader& runtime,
132
                           uint64_t stable_random);
133

            
134
  /**
135
   * Determine the final timeout to use based on the route as well as the request headers.
136
   * @param route supplies the request route.
137
   * @param request_headers supplies the request headers.
138
   * @param insert_envoy_expected_request_timeout_ms insert
139
   *        x-envoy-expected-request-timeout-ms?
140
   * @param grpc_request tells if the request is a gRPC request.
141
   * @return TimeoutData for both the global and per try timeouts.
142
   */
143
  static TimeoutData finalTimeout(const RouteEntry& route, Http::RequestHeaderMap& request_headers,
144
                                  bool insert_envoy_expected_request_timeout_ms, bool grpc_request,
145
                                  bool per_try_timeout_hedging_enabled,
146
                                  bool respect_expected_rq_timeout);
147

            
148
  /**
149
   * Set the x-envoy-expected-request-timeout-ms and grpc-timeout headers if needed.
150
   * @param elapsed_time time elapsed since completion of the downstream request
151
   * @param timeout final TimeoutData to use for the request
152
   * @param request_headers the request headers to modify
153
   * @param insert_envoy_expected_request_timeout_ms insert
154
   *        x-envoy-expected-request-timeout-ms?
155
   * @param grpc_request tells if the request is a gRPC request.
156
   * @param per_try_timeout_headging_enabled is request hedging enabled?
157
   */
158
  static void setTimeoutHeaders(uint64_t elapsed_time, const TimeoutData& timeout,
159
                                const RouteEntry& route, Http::RequestHeaderMap& request_headers,
160
                                bool insert_envoy_expected_request_timeout_ms, bool grpc_request,
161
                                bool per_try_timeout_hedging_enabled);
162

            
163
  /**
164
   * Try to parse a header entry that may have a timeout field
165
   *
166
   * @param header_timeout_entry header entry which may contain a timeout value.
167
   * @return result timeout value from header. It will return nullopt if parse failed.
168
   */
169
  static absl::optional<std::chrono::milliseconds>
170
  tryParseHeaderTimeout(const Http::HeaderEntry& header_timeout_entry);
171

            
172
  /**
173
   * Try to set global timeout.
174
   *
175
   * @param header_timeout_entry header entry which may contain a timeout value.
176
   * @param timeout timeout data to set from header timeout entry.
177
   */
178
  static void trySetGlobalTimeout(const Http::HeaderEntry& header_timeout_entry,
179
                                  TimeoutData& timeout);
180

            
181
  /**
182
   * Determine the final hedging settings after applying randomized behavior.
183
   * @param route supplies the request route.
184
   * @param request_headers supplies the request headers.
185
   * @return HedgingParams the final parameters to use for request hedging.
186
   */
187
  static HedgingParams finalHedgingParams(const RouteEntry& route,
188
                                          Http::RequestHeaderMap& request_headers);
189
};
190

            
191
/**
192
 * Configuration for the router filter.
193
 */
194
class FilterConfig : public Http::FilterChainFactory {
195
public:
196
  FilterConfig(Server::Configuration::CommonFactoryContext& factory_context,
197
               Stats::StatName stat_prefix, Stats::Scope& scope, Upstream::ClusterManager& cm,
198
               Runtime::Loader& runtime, Random::RandomGenerator& random,
199
               ShadowWriterPtr&& shadow_writer, bool emit_dynamic_stats, bool start_child_span,
200
               bool suppress_envoy_headers, bool respect_expected_rq_timeout,
201
               bool suppress_grpc_request_failure_code_stats,
202
               bool flush_upstream_log_on_upstream_stream, bool reject_connect_request_early_data,
203
               const Protobuf::RepeatedPtrField<std::string>& strict_check_headers,
204
               TimeSource& time_source, Http::Context& http_context,
205
               Router::Context& router_context)
206
13093
      : factory_context_(factory_context), router_context_(router_context), scope_(scope), cm_(cm),
207
13093
        runtime_(runtime), default_stats_(router_context_.statNames(), scope_, stat_prefix),
208
13093
        async_stats_(router_context_.statNames(), scope, http_context.asyncClientStatPrefix()),
209
13093
        random_(random), emit_dynamic_stats_(emit_dynamic_stats),
210
13093
        start_child_span_(start_child_span), suppress_envoy_headers_(suppress_envoy_headers),
211
13093
        respect_expected_rq_timeout_(respect_expected_rq_timeout),
212
13093
        suppress_grpc_request_failure_code_stats_(suppress_grpc_request_failure_code_stats),
213
13093
        flush_upstream_log_on_upstream_stream_(flush_upstream_log_on_upstream_stream),
214
13093
        reject_connect_request_early_data_(reject_connect_request_early_data),
215
13093
        http_context_(http_context), zone_name_(factory_context.localInfo().zoneStatName()),
216
13093
        shadow_writer_(std::move(shadow_writer)), time_source_(time_source) {
217
13093
    if (!strict_check_headers.empty()) {
218
31
      strict_check_headers_ = std::make_unique<HeaderVector>();
219
132
      for (const auto& header : strict_check_headers) {
220
132
        strict_check_headers_->emplace_back(Http::LowerCaseString(header));
221
132
      }
222
31
    }
223
13093
  }
224

            
225
  static absl::StatusOr<std::unique_ptr<FilterConfig>>
226
  create(Stats::StatName stat_prefix, Server::Configuration::FactoryContext& context,
227
         ShadowWriterPtr&& shadow_writer,
228
         const envoy::extensions::filters::http::router::v3::Router& config);
229

            
230
protected:
231
  FilterConfig(Stats::StatName stat_prefix, Server::Configuration::FactoryContext& context,
232
               ShadowWriterPtr&& shadow_writer,
233
               const envoy::extensions::filters::http::router::v3::Router& config,
234
               absl::Status& creation_status);
235

            
236
public:
237
46110
  bool createFilterChain(Http::FilterChainFactoryCallbacks& callbacks) const override {
238
    // Currently there is no default filter chain, so only_create_if_configured true doesn't make
239
    // sense.
240
46110
    if (upstream_http_filter_factories_.empty()) {
241
46071
      return false;
242
46071
    }
243
39
    Http::FilterChainUtility::createFilterChainForFactories(callbacks,
244
39
                                                            upstream_http_filter_factories_);
245
39
    return true;
246
46110
  }
247

            
248
  bool createUpgradeFilterChain(absl::string_view, const UpgradeMap*,
249
1
                                Http::FilterChainFactoryCallbacks&) const override {
250
    // Upgrade filter chains not yet supported for upstream HTTP filters.
251
1
    return false;
252
1
  }
253

            
254
  using HeaderVector = std::vector<Http::LowerCaseString>;
255
  using HeaderVectorPtr = std::unique_ptr<HeaderVector>;
256

            
257
93
  ShadowWriter& shadowWriter() { return *shadow_writer_; }
258
2
  TimeSource& timeSource() { return time_source_; }
259

            
260
  Server::Configuration::CommonFactoryContext& factory_context_;
261
  Router::Context& router_context_;
262
  Stats::Scope& scope_;
263
  Upstream::ClusterManager& cm_;
264
  Runtime::Loader& runtime_;
265
  FilterStats default_stats_;
266
  FilterStats async_stats_;
267
  Random::RandomGenerator& random_;
268
  const bool emit_dynamic_stats_;
269
  const bool start_child_span_;
270
  const bool suppress_envoy_headers_;
271
  const bool respect_expected_rq_timeout_;
272
  const bool suppress_grpc_request_failure_code_stats_;
273
  // TODO(xyu-stripe): Make this a bitset to keep cluster memory footprint down.
274
  HeaderVectorPtr strict_check_headers_;
275
  const bool flush_upstream_log_on_upstream_stream_;
276
  const bool reject_connect_request_early_data_;
277
  absl::optional<std::chrono::milliseconds> upstream_log_flush_interval_;
278
  std::list<AccessLog::InstanceSharedPtr> upstream_logs_;
279
  Http::Context& http_context_;
280
  Stats::StatName zone_name_;
281
  Stats::StatName empty_stat_name_;
282
  std::unique_ptr<Server::Configuration::UpstreamFactoryContext> upstream_ctx_;
283
  Http::FilterChainUtility::FilterFactoriesList upstream_http_filter_factories_;
284

            
285
private:
286
  ShadowWriterPtr shadow_writer_;
287
  TimeSource& time_source_;
288
};
289

            
290
using FilterConfigSharedPtr = std::shared_ptr<FilterConfig>;
291

            
292
class UpstreamRequest;
293
using UpstreamRequestPtr = std::unique_ptr<UpstreamRequest>;
294

            
295
/**
296
 * Service routing filter.
297
 */
298
class Filter : Logger::Loggable<Logger::Id::router>,
299
               public Http::StreamDecoderFilter,
300
               public Upstream::LoadBalancerContextBase,
301
               public RouterFilterInterface {
302
public:
303
  Filter(const FilterConfigSharedPtr& config, FilterStats& stats)
304
95367
      : config_(config), stats_(stats),
305
95367
        allow_multiplexed_upstream_half_close_(Runtime::runtimeFeatureEnabled(
306
95367
            "envoy.reloadable_features.allow_multiplexed_upstream_half_close")),
307
95367
        reject_early_connect_data_enabled_(config->reject_connect_request_early_data_) {}
308

            
309
  ~Filter() override;
310

            
311
  static StreamInfo::CoreResponseFlag
312
  streamResetReasonToResponseFlag(Http::StreamResetReason reset_reason);
313

            
314
  // Http::StreamFilterBase
315
  void onDestroy() override;
316

            
317
7606
  Http::LocalErrorStatus onLocalReply(const LocalReplyData&) override {
    // A local reply (e.g. a timeout) can be generated while host selection is still
    // outstanding. In that case cancel the selection and drop the handle.
    if (awaitingHost()) {
      host_selection_cancelable_->cancel();
      host_selection_cancelable_.reset();
    }

    // Tear down every upstream request that is still tracked by this filter.
    resetAll();

    return Http::LocalErrorStatus::Continue;
  }
329

            
330
  // Http::StreamDecoderFilter
331
  Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
332
                                          bool end_stream) override;
333

            
334
  bool continueDecodeHeaders(Upstream::ThreadLocalCluster* cluster, Http::RequestHeaderMap& headers,
335
                             bool end_stream, Upstream::HostConstSharedPtr&& host,
336
                             absl::string_view host_selection_details = {});
337

            
338
  Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override;
339
  Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override;
340
  Http::FilterMetadataStatus decodeMetadata(Http::MetadataMap& metadata_map) override;
341
  void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override;
342

            
343
  // Upstream::LoadBalancerContext
344
1344
  absl::optional<uint64_t> computeHashKey() override {
    // Hashing needs both a route entry and the downstream request headers.
    if (route_entry_ == nullptr || downstream_headers_ == nullptr) {
      return {};
    }

    // The cluster-level hash policy is the most specific one and wins when present;
    // otherwise the route-level policy is used.
    const Http::HashPolicy* policy = nullptr;
    if (cluster_ != nullptr) {
      policy = cluster_->httpProtocolOptions().hashPolicy();
    }
    if (policy == nullptr) {
      policy = route_entry_->hashPolicy();
    }
    if (policy == nullptr) {
      return {};
    }

    // Invoked by the policy when it needs a cookie to be generated for the downstream client.
    const auto add_cookie = [this](absl::string_view key, absl::string_view path,
                                   std::chrono::seconds max_age,
                                   absl::Span<const Http::CookieAttribute> attributes)
        -> std::string { return addDownstreamSetCookie(key, path, max_age, attributes); };

    return policy->generateHash(*downstream_headers_, callbacks_->streamInfo(), add_cookie);
  }
365
4077
  const Router::MetadataMatchCriteria* metadataMatchCriteria() override {
    if (route_entry_ == nullptr) {
      return nullptr;
    }

    // A previous call already produced merged criteria. route_entry_ should not change after
    // the first call, so the cached result can be handed back as-is.
    if (metadata_match_ != nullptr) {
      return metadata_match_.get();
    }

    // Finds the ENVOY_LB entry in a stream info's dynamic filter metadata, if there is one.
    const auto find_lb_metadata = [](const StreamInfo::StreamInfo& info) {
      OptRef<const Protobuf::Struct> found;
      const auto& filter_metadata = info.dynamicMetadata().filter_metadata();
      const auto entry = filter_metadata.find(Envoy::Config::MetadataFilters::get().ENVOY_LB);
      if (entry != filter_metadata.end()) {
        found = makeOptRef<const Protobuf::Struct>(entry->second);
      }
      return found;
    };

    OptRef<const Protobuf::Struct> connection_lb_metadata;
    if (const auto* conn = downstreamConnection(); conn != nullptr) {
      connection_lb_metadata = find_lb_metadata(conn->streamInfo());
    }
    OptRef<const Protobuf::Struct> request_lb_metadata =
        find_lb_metadata(callbacks_->streamInfo());

    // Precedence is request metadata > connection metadata > route criteria, so start from the
    // route's criteria and layer the others on top in increasing order of precedence.
    const auto* route_criteria = route_entry_->metadataMatchCriteria();
    const auto* effective = route_criteria;
    Router::MetadataMatchCriteriaConstPtr merged;

    // Layers `overrides` on top of the current criteria, or builds fresh criteria from it when
    // there is nothing to merge into yet.
    const auto overlay = [&effective, &merged](const Protobuf::Struct& overrides) {
      if (effective != nullptr) {
        merged = effective->mergeMatchCriteria(overrides);
      } else {
        merged = std::make_unique<Router::MetadataMatchCriteriaImpl>(overrides);
      }
      effective = merged.get();
    };

    if (connection_lb_metadata) {
      overlay(*connection_lb_metadata);
    }
    if (request_lb_metadata) {
      overlay(*request_lb_metadata);
    }

    // Nothing was layered on top: the route's own criteria (possibly null) applies unchanged.
    if (merged == nullptr) {
      return route_criteria;
    }

    // Keep ownership of the merged criteria so later calls can reuse it.
    metadata_match_ = std::move(merged);
    return metadata_match_.get();
  }
425
142115
  const Network::Connection* downstreamConnection() const override {
426
142115
    return callbacks_->connection().ptr();
427
142115
  }
428
627
  StreamInfo::StreamInfo* requestStreamInfo() const override { return &callbacks_->streamInfo(); }
429
362
  const Http::RequestHeaderMap* downstreamHeaders() const override { return downstream_headers_; }
430

            
431
46965
  bool shouldSelectAnotherHost(const Upstream::Host& host) override {
    // Host rejection only applies to retries, where the RetryState decides whether the
    // candidate host should be avoided. The initial attempt accepts whatever was picked.
    if (is_retry_) {
      ASSERT(retry_state_);
      return retry_state_->shouldSelectAnotherHost(host);
    }
    return false;
  }
441

            
442
  // Returns the priority load to use for host selection. The original load is returned
  // untouched for the initial attempt; on retries the RetryState computes the load.
  const Upstream::HealthyAndDegradedLoad& determinePriorityLoad(
      const Upstream::PrioritySet& priority_set,
      const Upstream::HealthyAndDegradedLoad& original_priority_load,
      const Upstream::RetryPriority::PriorityMappingFunc& priority_mapping_func) override {
    // We only modify the priority load on retries.
    if (!is_retry_) {
      return original_priority_load;
    }
    // Same precondition as shouldSelectAnotherHost(): a retry requires a retry state.
    ASSERT(retry_state_);
    return retry_state_->priorityLoadForRetry(requestStreamInfo(), priority_set,
                                              original_priority_load, priority_mapping_func);
  }
453

            
454
46981
  uint32_t hostSelectionRetryCount() const override {
455
46981
    if (!is_retry_) {
456
46781
      return 1;
457
46781
    }
458
200
    return retry_state_->hostSelectionMaxAttempts();
459
46981
  }
460

            
461
46711
  Network::Socket::OptionsSharedPtr upstreamSocketOptions() const override {
    // Options held by this filter win; otherwise defer to the decoder filter callbacks.
    if (upstream_options_ == nullptr) {
      return callbacks_->getUpstreamSocketOptions();
    }
    return upstream_options_;
  }
465

            
466
98977
  Network::TransportSocketOptionsConstSharedPtr upstreamTransportSocketOptions() const override {
467
98977
    return transport_socket_options_;
468
98977
  }
469

            
470
47287
  absl::optional<OverrideHost> overrideHostToSelect() const override {
    // The override host from the filter callbacks is only offered on the initial attempt;
    // retries never report an override.
    if (!is_retry_) {
      return callbacks_->upstreamOverrideHost();
    }
    return absl::nullopt;
  }
476
1
  void setHeadersModifier(std::function<void(Http::ResponseHeaderMap&)> modifier) override {
477
1
    modify_headers_from_upstream_lb_ = std::move(modifier);
478
1
  }
479

            
480
  void onAsyncHostSelection(Upstream::HostConstSharedPtr&& host, std::string&& details) override;
481

            
482
  /**
483
   * Set a computed cookie to be sent with the downstream headers.
484
   * @param key supplies the size of the cookie
485
   * @param max_age the lifetime of the cookie
486
   * @param  path the path of the cookie, or ""
487
   * @return std::string the value of the new cookie
488
   */
489
  std::string addDownstreamSetCookie(absl::string_view key, absl::string_view path,
490
                                     std::chrono::seconds max_age,
491
454
                                     absl::Span<const Http::CookieAttribute> attributes) {
492
    // The cookie value should be the same per connection so that if multiple
493
    // streams race on the same path, they all receive the same cookie.
494
    // Since the downstream port is part of the hashed value, multiple HTTP1
495
    // connections can receive different cookies if they race on requests.
496
454
    std::string value;
497
454
    const Network::Connection* conn = downstreamConnection();
498
    // Need to check for null conn if this is ever used by Http::AsyncClient in the future.
499
454
    value = conn->connectionInfoProvider().remoteAddress()->asString() +
500
454
            conn->connectionInfoProvider().localAddress()->asString();
501

            
502
454
    const std::string cookie_value = Hex::uint64ToHex(HashUtil::xxHash64(value));
503
454
    downstream_set_cookies_.emplace_back(
504
454
        Http::Utility::makeSetCookieValue(key, cookie_value, path, max_age, true, attributes));
505
454
    return cookie_value;
506
454
  }
507

            
508
  // RouterFilterInterface
509
  void onUpstream1xxHeaders(Http::ResponseHeaderMapPtr&& headers,
510
                            UpstreamRequest& upstream_request) override;
511
  void onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers,
512
                         UpstreamRequest& upstream_request, bool end_stream) override;
513
  void onUpstreamData(Buffer::Instance& data, UpstreamRequest& upstream_request,
514
                      bool end_stream) override;
515
  void onUpstreamTrailers(Http::ResponseTrailerMapPtr&& trailers,
516
                          UpstreamRequest& upstream_request) override;
517
  void onUpstreamMetadata(Http::MetadataMapPtr&& metadata_map) override;
518
  void onUpstreamReset(Http::StreamResetReason reset_reason, absl::string_view transport_failure,
519
                       UpstreamRequest& upstream_request) override;
520
  void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
521
                              bool pool_success) override;
522
  void onPerTryTimeout(UpstreamRequest& upstream_request) override;
523
  void onPerTryIdleTimeout(UpstreamRequest& upstream_request) override;
524
  void onStreamMaxDurationReached(UpstreamRequest& upstream_request) override;
525
  void setupRouteTimeoutForWebsocketUpgrade() override;
526
  void disableRouteTimeoutForWebsocketUpgrade() override;
527
2277503
  Http::StreamDecoderFilterCallbacks* callbacks() override { return callbacks_; }
528
646756
  Upstream::ClusterInfoConstSharedPtr cluster() override { return cluster_; }
529
230012
  FilterConfig& config() override { return *config_; }
530
81016
  TimeoutData timeout() override { return timeout_; }
531
46268
  absl::optional<std::chrono::milliseconds> dynamicMaxStreamDuration() const override {
532
46268
    return dynamic_max_stream_duration_;
533
46268
  }
534
405100
  Http::RequestHeaderMap* downstreamHeaders() override { return downstream_headers_; }
535
431519
  Http::RequestTrailerMap* downstreamTrailers() override { return downstream_trailers_; }
536
19619
  bool downstreamResponseStarted() const override { return downstream_response_started_; }
537
46254
  bool downstreamEndStream() const override { return downstream_end_stream_; }
538
21
  uint32_t attemptCount() const override { return attempt_count_; }
539
3
  const std::list<UpstreamRequestPtr>& upstreamRequests() const { return upstream_requests_; }
540

            
541
1
  TimeSource& timeSource() { return config_->timeSource(); }
542
1
  const Route* route() const { return route_.get(); }
543
7
  const FilterStats& stats() { return stats_; }
544
70484
  bool awaitingHost() { return host_selection_cancelable_ != nullptr; }
545

            
546
protected:
547
671
  void setRequestBodyBufferLimit(uint64_t buffer_limit) {
548
671
    request_body_buffer_limit_ = buffer_limit;
549
671
  }
550

            
551
  uint64_t calculateEffectiveBufferLimit() const;
552

            
553
private:
554
  friend class UpstreamRequest;
555

            
556
  enum class TimeoutRetry { Yes, No };
557

            
558
  void onPerTryTimeoutCommon(UpstreamRequest& upstream_request, Stats::Counter& error_counter,
559
                             const std::string& response_code_details);
560
  Stats::StatName upstreamZone(Upstream::HostDescriptionOptConstRef upstream_host);
561
  void chargeUpstreamCode(uint64_t response_status_code,
562
                          const Http::ResponseHeaderMap& response_headers,
563
                          Upstream::HostDescriptionOptConstRef upstream_host, bool dropped);
564
  void chargeUpstreamCode(Http::Code code, Upstream::HostDescriptionOptConstRef upstream_host,
565
                          bool dropped);
566
  void chargeUpstreamAbort(Http::Code code, bool dropped, UpstreamRequest& upstream_request);
567
  void cleanup();
568
  virtual RetryStatePtr
569
  createRetryState(const RetryPolicy& policy, Http::RequestHeaderMap& request_headers,
570
                   const Upstream::ClusterInfo& cluster, const VirtualCluster* vcluster,
571
                   RouteStatsContextOptRef route_stats_context,
572
                   Server::Configuration::CommonFactoryContext& context,
573
                   Event::Dispatcher& dispatcher, Upstream::ResourcePriority priority) PURE;
574

            
575
  std::unique_ptr<GenericConnPool>
576
  createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
577
                 Upstream::HostConstSharedPtr host);
578
  UpstreamRequestPtr createUpstreamRequest();
579
  absl::optional<absl::string_view> getShadowCluster(const ShadowPolicy& shadow_policy,
580
                                                     const Http::HeaderMap& headers) const;
581
  void applyShadowPolicyHeaders(const ShadowPolicy& shadow_policy,
582
                                Http::RequestHeaderMap& headers) const;
583
  bool maybeRetryReset(Http::StreamResetReason reset_reason, UpstreamRequest& upstream_request,
584
                       TimeoutRetry is_timeout_retry);
585
  uint32_t numRequestsAwaitingHeaders();
586
  void onGlobalTimeout();
587
  void onRequestComplete();
588
  void onResponseTimeout();
589
  // Handle an upstream request aborted due to a local timeout.
590
  void onSoftPerTryTimeout();
591
  void onSoftPerTryTimeout(UpstreamRequest& upstream_request);
592
  void onUpstreamTimeoutAbort(StreamInfo::CoreResponseFlag response_flag,
593
                              absl::string_view details);
594
  // Handle an "aborted" upstream request, meaning we didn't see response
595
  // headers (e.g. due to a reset). Handles recording stats and responding
596
  // downstream if appropriate.
597
  void onUpstreamAbort(Http::Code code, StreamInfo::CoreResponseFlag response_flag,
598
                       absl::string_view body, bool dropped, absl::string_view details);
599
  void onUpstreamComplete(UpstreamRequest& upstream_request);
600
  // Reset all in-flight upstream requests.
601
  void resetAll();
602
  // Reset all in-flight upstream requests that do NOT match the passed argument. This is used
603
  // if a "good" response comes back and we return downstream, so there is no point in waiting
604
  // for the remaining upstream requests to return.
605
  void resetOtherUpstreams(UpstreamRequest& upstream_request);
606
  void sendNoHealthyUpstreamResponse(absl::string_view details);
607
  bool setupRedirect(const Http::ResponseHeaderMap& headers);
608
  bool convertRequestHeadersForInternalRedirect(Http::RequestHeaderMap& downstream_headers,
609
                                                const Http::ResponseHeaderMap& upstream_headers,
610
                                                const Http::HeaderEntry& internal_redirect,
611
                                                uint64_t status_code);
612
  // Reports an outlier-detection result for the given upstream request. `code` is optional, so
  // callers may omit a status code. NOTE(review): presumably the result is attributed to the
  // host that served `upstream_request`; confirm against the definition.
  void updateOutlierDetection(Upstream::Outlier::Result result, UpstreamRequest& upstream_request,
613
                              absl::optional<uint64_t> code);
614
  // Starts a retry attempt. The flags state whether early data may be sent, whether HTTP/3 may
  // be used, and whether this attempt is classified as a timeout retry.
  void doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry);
615
  // Second stage of a retry; takes the same three flags plus the selected host (consumed via
  // rvalue reference), the thread-local cluster and a host-selection details string.
  // NOTE(review): presumably invoked once host selection finishes, possibly asynchronously (see
  // on_host_selected_ / host_selection_cancelable_); confirm against the definition.
  void continueDoRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry,
616
                       Upstream::HostConstSharedPtr&& host, Upstream::ThreadLocalCluster& cluster,
617
                       absl::string_view host_selection_details);
618

            
619
  // NOTE(review): judging by the name, evaluates the configured retry-options predicates
  // against a request that is eligible for retry; which options they may alter is not visible
  // in this header.
  void runRetryOptionsPredicates(UpstreamRequest& retriable_request);
620
  // Returns the effective retry policy to use for this request.
621
  // Cluster-level retry policy takes precedence over route-level retry policy.
  // The result is a non-owning pointer. NOTE(review): whether it can be null (no policy at
  // either level) is not visible here; callers should confirm before dereferencing.
622
  const Router::RetryPolicy* getEffectiveRetryPolicy() const;
623
  // Called immediately after a non-5xx header is received from upstream; performs stats
624
  // accounting and handles the difference between gRPC and non-gRPC requests.
  // `grpc_status` is optional and carries the gRPC status when one is available;
  // `grpc_to_http_status` is the HTTP status associated with that gRPC status.
  // NOTE(review): `end_stream` presumably indicates a headers-only response; confirm.
625
  void handleNon5xxResponseHeaders(absl::optional<Grpc::Status::GrpcStatus> grpc_status,
626
                                   UpstreamRequest& upstream_request, bool end_stream,
627
                                   uint64_t grpc_to_http_status);
628
77196
  // Accessor for the HTTP context held by the shared filter configuration (config_).
  Http::Context& httpContext() {
    Http::Context& context = config_->http_context_;
    return context;
  }
629
  // NOTE(review): judging by the name, checks the cluster's drop-overload setting; the bool
  // presumably reports that the request was dropped (and answered locally). Confirm.
  bool checkDropOverload(Upstream::ThreadLocalCluster& cluster);
630
  // Process Orca Load Report if necessary (e.g. cluster has lrsReportMetricNames).
  // Accepts either response headers or trailers. NOTE(review): presumably guarded by
  // orca_load_report_received_ so a report is handled at most once per request; confirm.
631
  void maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or_trailers,
632
                                  UpstreamRequest& upstream_request);
633
  // NOTE(review): judging by the name, reports whether request data arrived on a CONNECT
  // request before the upstream response was established (see
  // reject_early_connect_data_enabled_); confirm against the definition.
  bool isEarlyConnectData();
634

            
635
  // Per-request routing state. Raw pointers below are value-initialized to null via `{}`.
  // Retry state for this request (produced by createRetryState(), see ProdFilter).
  RetryStatePtr retry_state_;
636
  // Shared filter configuration; the pointer itself is const for the filter's lifetime.
  const FilterConfigSharedPtr config_;
637
  // Decoder filter callbacks; null until set. NOTE(review): presumably non-owning.
  Http::StreamDecoderFilterCallbacks* callbacks_{};
638
  // Selected route, its route entry, and the cluster info for the request.
  RouteConstSharedPtr route_;
639
  const RouteEntry* route_entry_{};
640
  Upstream::ClusterInfoConstSharedPtr cluster_;
641
  // Owned storage for an alternate stat prefix; empty when no such prefix is in use.
  std::unique_ptr<Stats::StatNameDynamicStorage> alt_stat_prefix_;
642
  const VirtualCluster* request_vcluster_{};
643
  RouteStatsContextOptRef route_stats_context_;
644
  // Callback receiving the chosen host (by rvalue) and a details string.
  // NOTE(review): presumably used to resume processing after asynchronous host selection.
  std::function<void(Upstream::HostConstSharedPtr&& host, absl::string_view details)>
645
      on_host_selected_;
646
  // Handle for an asynchronous host selection. NOTE(review): presumably kept so a pending
  // selection can be cancelled; confirm.
  std::unique_ptr<Upstream::AsyncHostSelectionHandle> host_selection_cancelable_;
647
  // Timer and timeout values for the request.
  Event::TimerPtr response_timeout_;
648
  TimeoutData timeout_;
649
  // Upstream requests owned by this filter.
  std::list<UpstreamRequestPtr> upstream_requests_;
650
  // Router filter stats (struct generated by MAKE_STATS_STRUCT).
  FilterStats stats_;
651
  // Tracks which upstream request "wins" and will have the corresponding
652
  // response forwarded downstream. Null until a winner is chosen.
653
  UpstreamRequest* final_upstream_request_ = nullptr;
654
  // Downstream request headers and trailers; null until set. NOTE(review): presumably
  // non-owning views of maps owned by the filter manager; confirm.
  Http::RequestHeaderMap* downstream_headers_{};
655
  Http::RequestTrailerMap* downstream_trailers_{};
656
  // NOTE(review): presumably the time at which the downstream request finished arriving.
  MonotonicTime downstream_request_complete_time_;
657
  MetadataMatchCriteriaConstPtr metadata_match_;
658
  // Callbacks that mutate response headers.
  std::function<void(Http::ResponseHeaderMap&)> modify_headers_;
659
  std::function<void(Http::ResponseHeaderMap&)> modify_headers_from_upstream_lb_;
660
  // Shadow policies active for this request, held by reference (not owned).
  std::vector<std::reference_wrapper<const ShadowPolicy>> active_shadow_policies_;
661
  // Owned header/trailer maps for shadow requests.
  std::unique_ptr<Http::RequestHeaderMap> shadow_headers_;
662
  std::unique_ptr<Http::RequestTrailerMap> shadow_trailers_;
663
  // The stream lifetime configured by request header. Empty when no such header applies.
664
  absl::optional<std::chrono::milliseconds> dynamic_max_stream_duration_;
665
  // List of cookies to add to upstream headers.
666
  std::vector<std::string> downstream_set_cookies_;
667

            
668
  // Transport socket options and socket options. NOTE(review): presumably applied to upstream
  // connections created for this request; confirm.
  Network::TransportSocketOptionsConstSharedPtr transport_socket_options_;
669
  Network::Socket::OptionsSharedPtr upstream_options_;
670
  // Set of ongoing shadow streams which have not yet received end stream.
  // Entries are raw, non-owning pointers.
671
  absl::flat_hash_set<Http::AsyncClient::OngoingRequest*> shadow_streams_;
672

            
673
  // Keep small members (bools and enums) at the end of class, to reduce alignment overhead.
674
  // Defaults to the largest uint64_t value.
  // NOTE(review): std::numeric_limits needs <limits>, which the include block at the top of this
  // header does not list directly; it is presumably pulled in transitively.
  uint64_t request_body_buffer_limit_{std::numeric_limits<uint64_t>::max()};
675
  // Both counters start at zero.
  uint32_t attempt_count_{0};
676
  uint32_t pending_retries_{0};
677
  // Defaults to Http::Code::GatewayTimeout.
  Http::Code timeout_response_code_ = Http::Code::GatewayTimeout;
678
  // HedgingParams holds a single 1-bit flag (hedge_on_per_try_timeout_) and has no in-class
  // initializer here. NOTE(review): confirm it is assigned before it is first read.
  FilterUtility::HedgingParams hedging_params_;
679
  Http::StreamFilterSidestreamWatermarkCallbacks watermark_callbacks_;
  // One-bit flags, all defaulting to false. In-class initializers on bit-fields require C++20.
680
  bool grpc_request_ : 1 = false;
681
  bool exclude_http_code_stats_ : 1 = false;
682
  bool downstream_response_started_ : 1 = false;
683
  bool downstream_end_stream_ : 1 = false;
684
  bool is_retry_ : 1 = false;
685
  bool include_attempt_count_in_request_ : 1 = false;
686
  bool include_timeout_retry_header_in_request_ : 1 = false;
687
  bool request_buffer_overflowed_ : 1 = false;
688
  // Const: can only be set during construction. NOTE(review): the constructor presumably
  // overrides the `false` default from configuration; confirm.
  const bool allow_multiplexed_upstream_half_close_ : 1 = false;
689
  bool upstream_request_started_ : 1 = false;
690
  // Indicate that ORCA report is received to process it only once in either response headers or
691
  // trailers.
692
  bool orca_load_report_received_ : 1 = false;
693
  // Cached runtime flag value for reject_early_connect_data to avoid evaluating it on every data
694
  // chunk.
695
  bool reject_early_connect_data_enabled_ : 1 = false;
696
};
697

            
698
/**
 * Concrete Filter subclass. It inherits every Filter constructor and supplies the
 * createRetryState() override, which the base class declares as a virtual hook.
 * NOTE(review): presumably this is the production implementation, with the hook kept virtual
 * so tests can substitute their own RetryState; confirm against the definition.
 */
class ProdFilter : public Filter {
699
public:
700
  // Inherit all of Filter's constructors unchanged.
  using Filter::Filter;
701

            
702
private:
703
  // Filter
  // Builds the retry state for a request from the retry policy, request headers, cluster,
  // optional virtual cluster and route stats context, factory context, dispatcher and priority.
704
  RetryStatePtr createRetryState(const RetryPolicy& policy, Http::RequestHeaderMap& request_headers,
705
                                 const Upstream::ClusterInfo& cluster,
706
                                 const VirtualCluster* vcluster,
707
                                 RouteStatsContextOptRef route_stats_context,
708
                                 Server::Configuration::CommonFactoryContext& context,
709
                                 Event::Dispatcher& dispatcher,
710
                                 Upstream::ResourcePriority priority) override;
711
};
712

            
713
} // namespace Router
714
} // namespace Envoy