LCOV - code coverage report
Current view: top level - source/common/router - upstream_request.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 279 527 52.9 %
Date: 2024-01-05 06:35:25 Functions: 32 56 57.1 %

          Line data    Source code
       1             : #include "source/common/router/upstream_request.h"
       2             : 
       3             : #include <chrono>
       4             : #include <cstdint>
       5             : #include <functional>
       6             : #include <memory>
       7             : #include <string>
       8             : 
       9             : #include "envoy/event/dispatcher.h"
      10             : #include "envoy/event/timer.h"
      11             : #include "envoy/grpc/status.h"
      12             : #include "envoy/http/conn_pool.h"
      13             : #include "envoy/http/header_map.h"
      14             : #include "envoy/runtime/runtime.h"
      15             : #include "envoy/upstream/cluster_manager.h"
      16             : #include "envoy/upstream/upstream.h"
      17             : 
      18             : #include "source/common/common/assert.h"
      19             : #include "source/common/common/dump_state_utils.h"
      20             : #include "source/common/common/empty_string.h"
      21             : #include "source/common/common/enum_to_int.h"
      22             : #include "source/common/common/scope_tracker.h"
      23             : #include "source/common/common/utility.h"
      24             : #include "source/common/grpc/common.h"
      25             : #include "source/common/http/codes.h"
      26             : #include "source/common/http/header_map_impl.h"
      27             : #include "source/common/http/headers.h"
      28             : #include "source/common/http/message_impl.h"
      29             : #include "source/common/http/utility.h"
      30             : #include "source/common/network/application_protocol.h"
      31             : #include "source/common/network/transport_socket_options_impl.h"
      32             : #include "source/common/network/upstream_server_name.h"
      33             : #include "source/common/network/upstream_subject_alt_names.h"
      34             : #include "source/common/router/config_impl.h"
      35             : #include "source/common/router/debug_config.h"
      36             : #include "source/common/router/router.h"
      37             : #include "source/common/stream_info/uint32_accessor_impl.h"
      38             : #include "source/common/tracing/http_tracer_impl.h"
      39             : #include "source/extensions/common/proxy_protocol/proxy_protocol_header.h"
      40             : 
      41             : namespace Envoy {
      42             : namespace Router {
      43             : 
      44             : // The upstream HTTP filter manager class.
      45             : class UpstreamFilterManager : public Http::FilterManager {
      46             : public:
      47             :   UpstreamFilterManager(Http::FilterManagerCallbacks& filter_manager_callbacks,
      48             :                         Event::Dispatcher& dispatcher, OptRef<const Network::Connection> connection,
      49             :                         uint64_t stream_id, Buffer::BufferMemoryAccountSharedPtr account,
      50             :                         bool proxy_100_continue, uint32_t buffer_limit,
      51             :                         const Http::FilterChainFactory& filter_chain_factory,
      52             :                         UpstreamRequest& request)
      53             :       : FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account,
      54             :                       proxy_100_continue, buffer_limit, filter_chain_factory),
      55         251 :         upstream_request_(request) {}
      56             : 
      57         392 :   StreamInfo::StreamInfo& streamInfo() override {
      58         392 :     return upstream_request_.parent_.callbacks()->streamInfo();
      59         392 :   }
      60           0 :   const StreamInfo::StreamInfo& streamInfo() const override {
      61           0 :     return upstream_request_.parent_.callbacks()->streamInfo();
      62           0 :   }
      63             :   // Send local replies via the downstream HTTP filter manager.
      64             :   // Local replies will not be seen by upstream HTTP filters.
      65             :   void sendLocalReply(Http::Code code, absl::string_view body,
      66             :                       const std::function<void(Http::ResponseHeaderMap& headers)>& modify_headers,
      67             :                       const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
      68           0 :                       absl::string_view details) override {
      69           0 :     state().decoder_filter_chain_aborted_ = true;
      70           0 :     state().encoder_filter_chain_aborted_ = true;
      71           0 :     state().remote_encode_complete_ = true;
      72           0 :     state().local_complete_ = true;
      73             :     // TODO(alyssawilk) this should be done through the router to play well with hedging.
      74           0 :     upstream_request_.parent_.callbacks()->sendLocalReply(code, body, modify_headers, grpc_status,
      75           0 :                                                           details);
      76           0 :   }
      77           0 :   void executeLocalReplyIfPrepared() override {}
      78             :   UpstreamRequest& upstream_request_;
      79             : };
      80             : 
      81             : UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
      82             :                                  std::unique_ptr<GenericConnPool>&& conn_pool,
      83             :                                  bool can_send_early_data, bool can_use_http3)
      84             :     : parent_(parent), conn_pool_(std::move(conn_pool)),
      85             :       stream_info_(parent_.callbacks()->dispatcher().timeSource(), nullptr),
      86             :       start_time_(parent_.callbacks()->dispatcher().timeSource().monotonicTime()),
      87             :       calling_encode_headers_(false), upstream_canary_(false), router_sent_end_stream_(false),
      88             :       encode_trailers_(false), retried_(false), awaiting_headers_(true),
      89             :       outlier_detection_timeout_recorded_(false),
      90             :       create_per_try_timeout_on_request_complete_(false), paused_for_connect_(false),
      91             :       reset_stream_(false),
      92             :       record_timeout_budget_(parent_.cluster()->timeoutBudgetStats().has_value()),
      93             :       cleaned_up_(false), had_upstream_(false),
      94             :       stream_options_({can_send_early_data, can_use_http3}), grpc_rq_success_deferred_(false),
      95             :       upstream_wait_for_response_headers_before_disabling_read_(Runtime::runtimeFeatureEnabled(
      96         251 :           "envoy.reloadable_features.upstream_wait_for_response_headers_before_disabling_read")) {
      97         251 :   if (auto tracing_config = parent_.callbacks()->tracingConfig(); tracing_config.has_value()) {
      98          68 :     if (tracing_config->spawnUpstreamSpan() || parent_.config().start_child_span_) {
      99           0 :       span_ = parent_.callbacks()->activeSpan().spawnChild(
     100           0 :           tracing_config.value().get(),
     101           0 :           absl::StrCat("router ", parent.cluster()->observabilityName(), " egress"),
     102           0 :           parent_.callbacks()->dispatcher().timeSource().systemTime());
     103           0 :       if (parent.attemptCount() != 1) {
     104             :         // This is a retry request, add this metadata to span.
     105           0 :         span_->setTag(Tracing::Tags::get().RetryCount, std::to_string(parent.attemptCount() - 1));
     106           0 :       }
     107           0 :     }
     108          68 :   }
     109             : 
     110             :   // The router checks that the connection pool is non-null before creating the upstream request.
     111         251 :   auto upstream_host = conn_pool_->host();
     112         251 :   Tracing::HttpTraceContext trace_context(*parent_.downstreamHeaders());
     113         251 :   if (span_ != nullptr) {
     114           0 :     span_->injectContext(trace_context, upstream_host);
     115         251 :   } else {
     116             :     // No independent child span for current upstream request then inject the parent span's tracing
     117             :     // context into the request headers.
     118             :     // The injectContext() of the parent span may be called repeatedly when the request is retried.
     119         251 :     parent_.callbacks()->activeSpan().injectContext(trace_context, upstream_host);
     120         251 :   }
     121             : 
     122         251 :   stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
     123         251 :   stream_info_.route_ = parent_.callbacks()->route();
     124         251 :   parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo());
     125             : 
     126         251 :   stream_info_.healthCheck(parent_.callbacks()->streamInfo().healthCheck());
     127         251 :   stream_info_.setIsShadow(parent_.callbacks()->streamInfo().isShadow());
     128         251 :   absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info =
     129         251 :       parent_.callbacks()->streamInfo().upstreamClusterInfo();
     130         251 :   if (cluster_info.has_value()) {
     131         251 :     stream_info_.setUpstreamClusterInfo(*cluster_info);
     132         251 :   }
     133             : 
     134             :   // Set up the upstream HTTP filter manager.
     135         251 :   filter_manager_callbacks_ = std::make_unique<UpstreamRequestFilterManagerCallbacks>(*this);
     136         251 :   filter_manager_ = std::make_unique<UpstreamFilterManager>(
     137         251 :       *filter_manager_callbacks_, parent_.callbacks()->dispatcher(), connection(),
     138         251 :       parent_.callbacks()->streamId(), parent_.callbacks()->account(), true,
     139         251 :       parent_.callbacks()->decoderBufferLimit(), *parent_.cluster(), *this);
     140             :   // Attempt to create custom cluster-specified filter chain
     141         251 :   bool created = parent_.cluster()->createFilterChain(*filter_manager_,
     142         251 :                                                       /*only_create_if_configured=*/true);
     143         251 :   if (!created) {
     144             :     // Attempt to create custom router-specified filter chain.
     145         251 :     created = parent_.config().createFilterChain(*filter_manager_);
     146         251 :   }
     147         251 :   if (!created) {
     148             :     // Neither cluster nor router have a custom filter chain; add the default
     149             :     // cluster filter chain, which only consists of the codec filter.
     150         251 :     created = parent_.cluster()->createFilterChain(*filter_manager_, false);
     151         251 :   }
     152             :   // There will always be a codec filter present, which sets the upstream
     153             :   // interface. Fast-fail any tests that don't set up mocks correctly.
     154         251 :   ASSERT(created && upstream_interface_.has_value());
     155         251 : }
     156             : 
     157         251 : UpstreamRequest::~UpstreamRequest() { cleanUp(); }
     158             : 
     159         502 : void UpstreamRequest::cleanUp() {
     160         502 :   if (cleaned_up_) {
     161         251 :     return;
     162         251 :   }
     163         251 :   cleaned_up_ = true;
     164             : 
     165         251 :   filter_manager_->destroyFilters();
     166             : 
     167         251 :   if (span_ != nullptr) {
     168           0 :     auto tracing_config = parent_.callbacks()->tracingConfig();
     169           0 :     ASSERT(tracing_config.has_value());
     170           0 :     Tracing::HttpTracerUtility::finalizeUpstreamSpan(*span_, stream_info_,
     171           0 :                                                      tracing_config.value().get());
     172           0 :   }
     173             : 
     174         251 :   if (per_try_timeout_ != nullptr) {
     175             :     // Allows for testing.
     176           0 :     per_try_timeout_->disableTimer();
     177           0 :   }
     178             : 
     179         251 :   if (per_try_idle_timeout_ != nullptr) {
     180             :     // Allows for testing.
     181           0 :     per_try_idle_timeout_->disableTimer();
     182           0 :   }
     183             : 
     184         251 :   if (max_stream_duration_timer_ != nullptr) {
     185           0 :     max_stream_duration_timer_->disableTimer();
     186           0 :   }
     187             : 
     188         251 :   if (upstream_log_flush_timer_ != nullptr) {
     189           0 :     upstream_log_flush_timer_->disableTimer();
     190           0 :   }
     191             : 
     192         251 :   clearRequestEncoder();
     193             : 
     194             :   // If desired, fire the per-try histogram when the UpstreamRequest
     195             :   // completes.
     196         251 :   if (record_timeout_budget_) {
     197           0 :     Event::Dispatcher& dispatcher = parent_.callbacks()->dispatcher();
     198           0 :     const MonotonicTime end_time = dispatcher.timeSource().monotonicTime();
     199           0 :     const std::chrono::milliseconds response_time =
     200           0 :         std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time_);
     201           0 :     Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = parent_.cluster()->timeoutBudgetStats();
     202           0 :     tb_stats->get().upstream_rq_timeout_budget_per_try_percent_used_.recordValue(
     203           0 :         FilterUtility::percentageOfTimeout(response_time, parent_.timeout().per_try_timeout_));
     204           0 :   }
     205             : 
     206             :   // Ditto for request/response size histograms.
     207         251 :   Upstream::ClusterRequestResponseSizeStatsOptRef req_resp_stats_opt =
     208         251 :       parent_.cluster()->requestResponseSizeStats();
     209         251 :   if (req_resp_stats_opt.has_value() && parent_.downstreamHeaders()) {
     210           0 :     auto& req_resp_stats = req_resp_stats_opt->get();
     211           0 :     req_resp_stats.upstream_rq_headers_size_.recordValue(parent_.downstreamHeaders()->byteSize());
     212           0 :     req_resp_stats.upstream_rq_body_size_.recordValue(stream_info_.bytesSent());
     213             : 
     214           0 :     if (response_headers_size_.has_value()) {
     215           0 :       req_resp_stats.upstream_rs_headers_size_.recordValue(response_headers_size_.value());
     216           0 :       req_resp_stats.upstream_rs_body_size_.recordValue(stream_info_.bytesReceived());
     217           0 :     }
     218           0 :   }
     219             : 
     220         251 :   stream_info_.onRequestComplete();
     221         251 :   upstreamLog(AccessLog::AccessLogType::UpstreamEnd);
     222             : 
     223         251 :   while (downstream_data_disabled_ != 0) {
     224           0 :     parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark();
     225           0 :     parent_.cluster()->trafficStats()->upstream_flow_control_drained_total_.inc();
     226           0 :     --downstream_data_disabled_;
     227           0 :   }
     228             :   // The upstream HTTP filter chain callbacks own headers/trailers while they are traversing the
     229             :   // filter chain. Make sure to not delete them immediately when the stream ends, as the stream
     230             :   // often ends during filter chain processing and it causes use-after-free violations.
     231         251 :   parent_.callbacks()->dispatcher().deferredDelete(std::move(filter_manager_callbacks_));
     232         251 : }
     233             : 
     234         251 : void UpstreamRequest::upstreamLog(AccessLog::AccessLogType access_log_type) {
     235         251 :   const Formatter::HttpFormatterContext log_context{parent_.downstreamHeaders(),
     236         251 :                                                     upstream_headers_.get(),
     237         251 :                                                     upstream_trailers_.get(),
     238         251 :                                                     {},
     239         251 :                                                     access_log_type};
     240             : 
     241         251 :   for (const auto& upstream_log : parent_.config().upstream_logs_) {
     242           0 :     upstream_log->log(log_context, stream_info_);
     243           0 :   }
     244         251 : }
     245             : 
     246             : // This is called by the FilterManager when all filters have processed 1xx headers. Forward them
     247             : // on to the router.
     248           0 : void UpstreamRequest::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) {
     249           0 :   ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
     250             : 
     251           0 :   ASSERT(Http::HeaderUtility::isSpecial1xx(*headers));
     252           0 :   addResponseHeadersSize(headers->byteSize());
     253           0 :   maybeHandleDeferredReadDisable();
     254           0 :   parent_.onUpstream1xxHeaders(std::move(headers), *this);
     255           0 : }
     256             : 
     257             : // This is called by the FilterManager when all filters have processed headers. Forward them
     258             : // on to the router.
     259         141 : void UpstreamRequest::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
     260         141 :   ASSERT(headers.get());
     261         141 :   ENVOY_STREAM_LOG(trace, "upstream response headers:\n{}", *parent_.callbacks(), *headers);
     262         141 :   ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
     263             : 
     264         141 :   resetPerTryIdleTimer();
     265             : 
     266         141 :   addResponseHeadersSize(headers->byteSize());
     267             : 
     268             :   // We drop unsupported 1xx on the floor here. 101 upgrade headers need to be passed to the client
     269             :   // as part of the final response. Most 1xx headers are handled in onUpstream1xxHeaders.
     270             :   //
     271             :   // We could in principle handle other headers here, but this might result in the double invocation
     272             :   // of decodeHeaders() (once for informational, again for non-informational), which is likely an
     273             :   // easy to miss corner case in the filter and HCM contract.
     274             :   //
     275             :   // This filtering is done early in upstream request, unlike 100 coalescing which is performed in
     276             :   // the router filter, since the filtering only depends on the state of a single upstream, and we
     277             :   // don't want to confuse accounting such as onFirstUpstreamRxByteReceived() with informational
     278             :   // headers.
     279         141 :   const uint64_t response_code = Http::Utility::getResponseStatus(*headers);
     280         141 :   if (Http::CodeUtility::is1xx(response_code) &&
     281         141 :       response_code != enumToInt(Http::Code::SwitchingProtocols)) {
     282           0 :     return;
     283           0 :   }
     284             : 
     285         141 :   awaiting_headers_ = false;
     286         141 :   if (span_ != nullptr) {
     287           0 :     Tracing::HttpTracerUtility::onUpstreamResponseHeaders(*span_, headers.get());
     288           0 :   }
     289         141 :   if (!parent_.config().upstream_logs_.empty()) {
     290           0 :     upstream_headers_ = Http::createHeaderMap<Http::ResponseHeaderMapImpl>(*headers);
     291           0 :   }
     292         141 :   stream_info_.setResponseCode(static_cast<uint32_t>(response_code));
     293             : 
     294         141 :   maybeHandleDeferredReadDisable();
     295         141 :   ASSERT(headers.get());
     296             : 
     297         141 :   parent_.onUpstreamHeaders(response_code, std::move(headers), *this, end_stream);
     298         141 : }
     299             : 
     300         141 : void UpstreamRequest::maybeHandleDeferredReadDisable() {
     301         141 :   for (; deferred_read_disabling_count_ > 0; --deferred_read_disabling_count_) {
     302             :     // If the deferred read disabling count hasn't been cancelled out by read
     303             :     // enabling count so far, stop the upstream from reading the rest response.
     304             :     // Because readDisable keeps track of how many time it is called with
     305             :     // "true" or "false", here it has to be called with "true" the same number
     306             :     // of times as it would be called with "false" in the future.
     307           0 :     parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
     308           0 :     upstream_->readDisable(true);
     309           0 :   }
     310         141 : }
     311             : 
     312         395 : void UpstreamRequest::decodeData(Buffer::Instance& data, bool end_stream) {
     313         395 :   ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
     314             : 
     315         395 :   resetPerTryIdleTimer();
     316         395 :   stream_info_.addBytesReceived(data.length());
     317         395 :   parent_.onUpstreamData(data, *this, end_stream);
     318         395 : }
     319             : 
     320           6 : void UpstreamRequest::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) {
     321           6 :   ENVOY_STREAM_LOG(trace, "upstream response trailers:\n{}", *parent_.callbacks(), *trailers);
     322           6 :   ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
     323             : 
     324           6 :   if (span_ != nullptr) {
     325           0 :     Tracing::HttpTracerUtility::onUpstreamResponseTrailers(*span_, trailers.get());
     326           0 :   }
     327           6 :   if (!parent_.config().upstream_logs_.empty()) {
     328           0 :     upstream_trailers_ = Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*trailers);
     329           0 :   }
     330           6 :   parent_.onUpstreamTrailers(std::move(trailers), *this);
     331           6 : }
     332             : 
     333           0 : void UpstreamRequest::dumpState(std::ostream& os, int indent_level) const {
     334           0 :   const char* spaces = spacesForLevel(indent_level);
     335           0 :   os << spaces << "UpstreamRequest " << this << "\n";
     336           0 :   if (connection()) {
     337           0 :     const auto addressProvider = connection()->connectionInfoProviderSharedPtr();
     338           0 :     DUMP_DETAILS(addressProvider);
     339           0 :   }
     340           0 :   const Http::RequestHeaderMap* request_headers = parent_.downstreamHeaders();
     341           0 :   DUMP_DETAILS(request_headers);
     342           0 :   if (filter_manager_) {
     343           0 :     filter_manager_->dumpState(os, indent_level);
     344           0 :   }
     345           0 : }
     346             : 
     347         202 : const Route& UpstreamRequest::route() const { return *parent_.callbacks()->route(); }
     348             : 
     349         251 : OptRef<const Network::Connection> UpstreamRequest::connection() const {
     350         251 :   return parent_.callbacks()->connection();
     351         251 : }
     352             : 
     353           2 : void UpstreamRequest::decodeMetadata(Http::MetadataMapPtr&& metadata_map) {
     354           2 :   parent_.onUpstreamMetadata(std::move(metadata_map));
     355           2 : }
     356             : 
     357           0 : void UpstreamRequest::maybeEndDecode(bool end_stream) {
     358           0 :   if (end_stream) {
     359           0 :     upstreamTiming().onLastUpstreamRxByteReceived(parent_.callbacks()->dispatcher().timeSource());
     360           0 :   }
     361           0 : }
     362             : 
     363             : void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
     364         208 :                                              bool pool_success) {
     365         208 :   StreamInfo::UpstreamInfo& upstream_info = *streamInfo().upstreamInfo();
     366         208 :   upstream_info.setUpstreamHost(host);
     367         208 :   upstream_host_ = host;
     368         208 :   parent_.onUpstreamHostSelected(host, pool_success);
     369         208 : }
     370             : 
     371         251 : void UpstreamRequest::acceptHeadersFromRouter(bool end_stream) {
     372         251 :   ASSERT(!router_sent_end_stream_);
     373         251 :   router_sent_end_stream_ = end_stream;
     374             : 
     375             :   // Make sure that when we are forwarding CONNECT payload we do not do so until
     376             :   // the upstream has accepted the CONNECT request.
     377             :   // This must be done before conn_pool->newStream, as onPoolReady un-pauses for CONNECT
     378             :   // termination.
     379         251 :   auto* headers = parent_.downstreamHeaders();
     380         251 :   if (headers->getMethodValue() == Http::Headers::get().MethodValues.Connect) {
     381           0 :     paused_for_connect_ = true;
     382           0 :   }
     383             : 
     384             :   // Kick off creation of the upstream connection immediately upon receiving headers.
     385             :   // In future it may be possible for upstream HTTP filters to delay this, or influence connection
     386             :   // creation but for now optimize for minimal latency and fetch the connection
     387             :   // as soon as possible.
     388         251 :   conn_pool_->newStream(this);
     389             : 
     390         251 :   if (parent_.config().upstream_log_flush_interval_.has_value()) {
     391           0 :     upstream_log_flush_timer_ = parent_.callbacks()->dispatcher().createTimer([this]() -> void {
     392             :       // If the request is complete, we've already done the stream-end upstream log, and shouldn't
     393             :       // do the periodic log.
     394           0 :       if (!streamInfo().requestComplete().has_value()) {
     395           0 :         upstreamLog(AccessLog::AccessLogType::UpstreamPeriodic);
     396           0 :         resetUpstreamLogFlushTimer();
     397           0 :       }
     398             :       // Both downstream and upstream bytes meters may not be initialized when
     399             :       // the timer goes off, e.g. if it takes longer than the interval for a
     400             :       // connection to be initialized; check for nullptr.
     401           0 :       auto& downstream_bytes_meter = stream_info_.getDownstreamBytesMeter();
     402           0 :       auto& upstream_bytes_meter = stream_info_.getUpstreamBytesMeter();
     403           0 :       const SystemTime now = parent_.callbacks()->dispatcher().timeSource().systemTime();
     404           0 :       if (downstream_bytes_meter) {
     405           0 :         downstream_bytes_meter->takeUpstreamPeriodicLoggingSnapshot(now);
     406           0 :       }
     407           0 :       if (upstream_bytes_meter) {
     408           0 :         upstream_bytes_meter->takeUpstreamPeriodicLoggingSnapshot(now);
     409           0 :       }
     410           0 :     });
     411             : 
     412           0 :     resetUpstreamLogFlushTimer();
     413           0 :   }
     414             : 
     415         251 :   filter_manager_->requestHeadersInitialized();
     416         251 :   filter_manager_->streamInfo().setRequestHeaders(*parent_.downstreamHeaders());
     417         251 :   filter_manager_->decodeHeaders(*parent_.downstreamHeaders(), end_stream);
     418         251 : }
     419             : 
     420         596 : void UpstreamRequest::acceptDataFromRouter(Buffer::Instance& data, bool end_stream) {
     421         596 :   ASSERT(!router_sent_end_stream_);
     422         596 :   router_sent_end_stream_ = end_stream;
     423             : 
     424         596 :   filter_manager_->decodeData(data, end_stream);
     425         596 : }
     426             : 
     427           0 : void UpstreamRequest::acceptTrailersFromRouter(Http::RequestTrailerMap& trailers) {
     428           0 :   ASSERT(!router_sent_end_stream_);
     429           0 :   router_sent_end_stream_ = true;
     430           0 :   encode_trailers_ = true;
     431             : 
     432           0 :   filter_manager_->decodeTrailers(trailers);
     433           0 : }
     434             : 
     435           0 : void UpstreamRequest::acceptMetadataFromRouter(Http::MetadataMapPtr&& metadata_map_ptr) {
     436           0 :   filter_manager_->decodeMetadata(*metadata_map_ptr);
     437           0 : }
     438             : 
     439             : void UpstreamRequest::onResetStream(Http::StreamResetReason reason,
     440          86 :                                     absl::string_view transport_failure_reason) {
     441          86 :   ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
     442             : 
     443          86 :   if (span_ != nullptr) {
     444             :     // Add tags about reset.
     445           0 :     span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
     446           0 :     span_->setTag(Tracing::Tags::get().ErrorReason, Http::Utility::resetReasonToString(reason));
     447           0 :   }
     448          86 :   clearRequestEncoder();
     449          86 :   awaiting_headers_ = false;
     450          86 :   if (!calling_encode_headers_) {
     451          86 :     stream_info_.setResponseFlag(Filter::streamResetReasonToResponseFlag(reason));
     452          86 :     parent_.onUpstreamReset(reason, transport_failure_reason, *this);
     453          86 :   } else {
     454           0 :     deferred_reset_reason_ = reason;
     455           0 :   }
     456          86 : }
     457             : 
     458          92 : void UpstreamRequest::resetStream() {
     459          92 :   if (conn_pool_->cancelAnyPendingStream()) {
     460          43 :     ENVOY_STREAM_LOG(debug, "canceled pool request", *parent_.callbacks());
     461          43 :     ASSERT(!upstream_);
     462          43 :   }
     463             : 
     464             :   // Don't reset the stream if we're already done with it.
     465          92 :   if (upstreamTiming().last_upstream_tx_byte_sent_.has_value() &&
     466          92 :       upstreamTiming().last_upstream_rx_byte_received_.has_value()) {
     467           0 :     return;
     468           0 :   }
     469             : 
     470          92 :   if (span_ != nullptr) {
     471             :     // Add tags about the cancellation.
     472           0 :     span_->setTag(Tracing::Tags::get().Canceled, Tracing::Tags::get().True);
     473           0 :   }
     474             : 
     475          92 :   if (upstream_) {
     476          49 :     ENVOY_STREAM_LOG(debug, "resetting pool request", *parent_.callbacks());
     477          49 :     upstream_->resetStream();
     478          49 :     clearRequestEncoder();
     479          49 :   }
     480          92 :   reset_stream_ = true;
     481          92 : }
     482             : 
     483         536 : void UpstreamRequest::resetPerTryIdleTimer() {
     484         536 :   if (per_try_idle_timeout_ != nullptr) {
     485           0 :     per_try_idle_timeout_->enableTimer(parent_.timeout().per_try_idle_timeout_);
     486           0 :   }
     487         536 : }
     488             : 
     489           0 : void UpstreamRequest::resetUpstreamLogFlushTimer() {
     490           0 :   if (upstream_log_flush_timer_ != nullptr) {
     491           0 :     upstream_log_flush_timer_->enableTimer(parent_.config().upstream_log_flush_interval_.value());
     492           0 :   }
     493           0 : }
     494             : 
     495         147 : void UpstreamRequest::setupPerTryTimeout() {
     496         147 :   ASSERT(!per_try_timeout_);
     497         147 :   if (parent_.timeout().per_try_timeout_.count() > 0) {
     498           0 :     per_try_timeout_ =
     499           0 :         parent_.callbacks()->dispatcher().createTimer([this]() -> void { onPerTryTimeout(); });
     500           0 :     per_try_timeout_->enableTimer(parent_.timeout().per_try_timeout_);
     501           0 :   }
     502             : 
     503         147 :   ASSERT(!per_try_idle_timeout_);
     504         147 :   if (parent_.timeout().per_try_idle_timeout_.count() > 0) {
     505           0 :     per_try_idle_timeout_ =
     506           0 :         parent_.callbacks()->dispatcher().createTimer([this]() -> void { onPerTryIdleTimeout(); });
     507           0 :     resetPerTryIdleTimer();
     508           0 :   }
     509         147 : }
     510             : 
     511           0 : void UpstreamRequest::onPerTryIdleTimeout() {
     512           0 :   ENVOY_STREAM_LOG(debug, "upstream per try idle timeout", *parent_.callbacks());
     513           0 :   stream_info_.setResponseFlag(StreamInfo::ResponseFlag::StreamIdleTimeout);
     514           0 :   parent_.onPerTryIdleTimeout(*this);
     515           0 : }
     516             : 
     517           0 : void UpstreamRequest::onPerTryTimeout() {
     518             :   // If we've sent anything downstream, ignore the per try timeout and let the response continue
     519             :   // up to the global timeout
     520           0 :   if (!parent_.downstreamResponseStarted()) {
     521           0 :     ENVOY_STREAM_LOG(debug, "upstream per try timeout", *parent_.callbacks());
     522             : 
     523           0 :     stream_info_.setResponseFlag(StreamInfo::ResponseFlag::UpstreamRequestTimeout);
     524           0 :     parent_.onPerTryTimeout(*this);
     525           0 :   } else {
     526           0 :     ENVOY_STREAM_LOG(debug,
     527           0 :                      "ignored upstream per try timeout due to already started downstream response",
     528           0 :                      *parent_.callbacks());
     529           0 :   }
     530           0 : }
     531             : 
     532         208 : void UpstreamRequest::recordConnectionPoolCallbackLatency() {
     533         208 :   upstreamTiming().recordConnectionPoolCallbackLatency(
     534         208 :       start_time_, parent_.callbacks()->dispatcher().timeSource());
     535         208 : }
     536             : 
     537             : void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason,
     538             :                                     absl::string_view transport_failure_reason,
     539           6 :                                     Upstream::HostDescriptionConstSharedPtr host) {
     540           6 :   recordConnectionPoolCallbackLatency();
     541           6 :   Http::StreamResetReason reset_reason = [](ConnectionPool::PoolFailureReason reason) {
     542           6 :     switch (reason) {
     543           0 :     case ConnectionPool::PoolFailureReason::Overflow:
     544           0 :       return Http::StreamResetReason::Overflow;
     545           6 :     case ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
     546           6 :       return Http::StreamResetReason::RemoteConnectionFailure;
     547           0 :     case ConnectionPool::PoolFailureReason::LocalConnectionFailure:
     548           0 :       return Http::StreamResetReason::LocalConnectionFailure;
     549           0 :     case ConnectionPool::PoolFailureReason::Timeout:
     550           0 :       return Http::StreamResetReason::ConnectionTimeout;
     551           6 :     }
     552           0 :     PANIC_DUE_TO_CORRUPT_ENUM;
     553           0 :   }(reason);
     554             : 
     555           6 :   stream_info_.upstreamInfo()->setUpstreamTransportFailureReason(transport_failure_reason);
     556             : 
     557             :   // Mimic an upstream reset.
     558           6 :   onUpstreamHostSelected(host, false);
     559           6 :   onResetStream(reset_reason, transport_failure_reason);
     560           6 : }
     561             : 
     562             : void UpstreamRequest::onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
     563             :                                   Upstream::HostDescriptionConstSharedPtr host,
     564             :                                   const Network::ConnectionInfoProvider& address_provider,
     565             :                                   StreamInfo::StreamInfo& info,
     566         202 :                                   absl::optional<Http::Protocol> protocol) {
     567             :   // This may be called under an existing ScopeTrackerScopeState but it will unwind correctly.
     568         202 :   ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
     569         202 :   ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks());
     570         202 :   recordConnectionPoolCallbackLatency();
     571         202 :   upstream_ = std::move(upstream);
     572         202 :   had_upstream_ = true;
     573             :   // Have the upstream use the account of the downstream.
     574         202 :   upstream_->setAccount(parent_.callbacks()->account());
     575             : 
     576         202 :   host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess);
     577             : 
     578         202 :   onUpstreamHostSelected(host, true);
     579             : 
     580         202 :   if (protocol) {
     581         202 :     stream_info_.protocol(protocol.value());
     582         202 :   } else {
     583             :     // We only pause for CONNECT for HTTP upstreams. If this is a TCP upstream, unpause.
     584           0 :     paused_for_connect_ = false;
     585           0 :   }
     586             : 
     587         202 :   StreamInfo::UpstreamInfo& upstream_info = *stream_info_.upstreamInfo();
     588         202 :   if (info.upstreamInfo()) {
     589         202 :     auto& upstream_timing = info.upstreamInfo()->upstreamTiming();
     590         202 :     upstreamTiming().upstream_connect_start_ = upstream_timing.upstream_connect_start_;
     591         202 :     upstreamTiming().upstream_connect_complete_ = upstream_timing.upstream_connect_complete_;
     592         202 :     upstreamTiming().upstream_handshake_complete_ = upstream_timing.upstream_handshake_complete_;
     593         202 :     upstream_info.setUpstreamNumStreams(info.upstreamInfo()->upstreamNumStreams());
     594         202 :   }
     595             : 
     596             :   // Upstream HTTP filters might have already created/set a filter state.
     597         202 :   const StreamInfo::FilterStateSharedPtr& filter_state = info.filterState();
     598         202 :   if (!filter_state) {
     599           0 :     upstream_info.setUpstreamFilterState(
     600           0 :         std::make_shared<StreamInfo::FilterStateImpl>(StreamInfo::FilterState::LifeSpan::Request));
     601         202 :   } else {
     602         202 :     upstream_info.setUpstreamFilterState(filter_state);
     603         202 :   }
     604         202 :   upstream_info.setUpstreamLocalAddress(address_provider.localAddress());
     605         202 :   upstream_info.setUpstreamRemoteAddress(address_provider.remoteAddress());
     606         202 :   upstream_info.setUpstreamSslConnection(info.downstreamAddressProvider().sslConnection());
     607             : 
     608         202 :   if (info.downstreamAddressProvider().connectionID().has_value()) {
     609         202 :     upstream_info.setUpstreamConnectionId(info.downstreamAddressProvider().connectionID().value());
     610         202 :   }
     611             : 
     612         202 :   if (info.downstreamAddressProvider().interfaceName().has_value()) {
     613           0 :     upstream_info.setUpstreamInterfaceName(
     614           0 :         info.downstreamAddressProvider().interfaceName().value());
     615           0 :   }
     616             : 
     617         202 :   stream_info_.setUpstreamBytesMeter(upstream_->bytesMeter());
     618         202 :   StreamInfo::StreamInfo::syncUpstreamAndDownstreamBytesMeter(parent_.callbacks()->streamInfo(),
     619         202 :                                                               stream_info_);
     620         202 :   if (protocol) {
     621         202 :     upstream_info.setUpstreamProtocol(protocol.value());
     622         202 :   }
     623             : 
     624         202 :   if (parent_.downstreamEndStream()) {
     625          89 :     setupPerTryTimeout();
     626         185 :   } else {
     627         113 :     create_per_try_timeout_on_request_complete_ = true;
     628         113 :   }
     629             : 
     630             :   // Make sure the connection manager will inform the downstream watermark manager when the
     631             :   // downstream buffers are overrun. This may result in immediate watermark callbacks referencing
     632             :   // the encoder.
     633         202 :   parent_.callbacks()->addDownstreamWatermarkCallbacks(downstream_watermark_manager_);
     634             : 
     635         202 :   absl::optional<std::chrono::milliseconds> max_stream_duration;
     636         202 :   if (parent_.dynamicMaxStreamDuration().has_value()) {
     637           0 :     max_stream_duration = parent_.dynamicMaxStreamDuration().value();
     638         202 :   } else if (upstream_host_->cluster().commonHttpProtocolOptions().has_max_stream_duration()) {
     639           0 :     max_stream_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
     640           0 :         upstream_host_->cluster().commonHttpProtocolOptions().max_stream_duration()));
     641           0 :   }
     642         202 :   if (max_stream_duration.has_value() && max_stream_duration->count()) {
     643           0 :     max_stream_duration_timer_ = parent_.callbacks()->dispatcher().createTimer(
     644           0 :         [this]() -> void { onStreamMaxDurationReached(); });
     645           0 :     max_stream_duration_timer_->enableTimer(*max_stream_duration);
     646           0 :   }
     647             : 
     648         202 :   const auto* route_entry = route().routeEntry();
     649         202 :   if (route_entry->autoHostRewrite() && !host->hostname().empty()) {
     650           0 :     Http::Utility::updateAuthority(*parent_.downstreamHeaders(), host->hostname(),
     651           0 :                                    route_entry->appendXfh());
     652           0 :   }
     653             : 
     654         202 :   stream_info_.setRequestHeaders(*parent_.downstreamHeaders());
     655             : 
     656         202 :   if (parent_.config().flush_upstream_log_on_upstream_stream_) {
     657           0 :     upstreamLog(AccessLog::AccessLogType::UpstreamPoolReady);
     658           0 :   }
     659             : 
     660         202 :   if (address_provider.connectionID() && stream_info_.downstreamAddressProvider().connectionID()) {
     661           0 :     ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
     662           0 :               address_provider.connectionID().value(),
     663           0 :               stream_info_.downstreamAddressProvider().connectionID().value());
     664           0 :   }
     665             : 
     666         202 :   for (auto* callback : upstream_callbacks_) {
     667         202 :     callback->onUpstreamConnectionEstablished();
     668         202 :   }
     669         202 : }
     670             : 
     671         704 : UpstreamToDownstream& UpstreamRequest::upstreamToDownstream() { return *upstream_interface_; }
     672             : 
     673           0 : void UpstreamRequest::onStreamMaxDurationReached() {
     674           0 :   upstream_host_->cluster().trafficStats()->upstream_rq_max_duration_reached_.inc();
     675             : 
     676             :   // The upstream had closed then try to retry along with retry policy.
     677           0 :   parent_.onStreamMaxDurationReached(*this);
     678           0 : }
     679             : 
     680         386 : void UpstreamRequest::clearRequestEncoder() {
     681             :   // Before clearing the encoder, unsubscribe from callbacks.
     682         386 :   if (upstream_) {
     683         202 :     parent_.callbacks()->removeDownstreamWatermarkCallbacks(downstream_watermark_manager_);
     684         202 :   }
     685         386 :   upstream_.reset();
     686         386 : }
     687             : 
     688          32 : void UpstreamRequest::readDisableOrDefer(bool disable) {
     689          32 :   if (!upstream_wait_for_response_headers_before_disabling_read_) {
     690           0 :     if (disable) {
     691           0 :       parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
     692           0 :       upstream_->readDisable(true);
     693           0 :     } else {
     694           0 :       parent_.cluster()->trafficStats()->upstream_flow_control_resumed_reading_total_.inc();
     695           0 :       upstream_->readDisable(false);
     696           0 :     }
     697           0 :     return;
     698           0 :   }
     699             : 
     700          32 :   if (disable) {
     701             :     // See comments on deferred_read_disabling_count_ for when we do and don't defer.
     702          16 :     if (parent_.downstreamResponseStarted()) {
     703             :       // The downstream connection is overrun. Pause reads from upstream.
     704             :       // If there are multiple calls to readDisable either the codec (H2) or the
     705             :       // underlying Network::Connection (H1) will handle reference counting.
     706          16 :       parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
     707          16 :       upstream_->readDisable(disable);
     708          16 :     } else {
     709           0 :       ++deferred_read_disabling_count_;
     710           0 :     }
     711          16 :     return;
     712          16 :   }
     713             : 
     714             :   // One source of connection blockage has buffer available.
     715          16 :   if (deferred_read_disabling_count_ > 0) {
     716           0 :     ASSERT(!parent_.downstreamResponseStarted());
     717             :     // Cancel out an existing deferred read disabling.
     718           0 :     --deferred_read_disabling_count_;
     719           0 :     return;
     720           0 :   }
     721          16 :   ASSERT(parent_.downstreamResponseStarted());
     722             :   // Pass this on to the stream, which
     723             :   // will resume reads if this was the last remaining high watermark.
     724          16 :   parent_.cluster()->trafficStats()->upstream_flow_control_resumed_reading_total_.inc();
     725          16 :   upstream_->readDisable(disable);
     726          16 : }
     727             : 
     728          16 : void UpstreamRequest::DownstreamWatermarkManager::onAboveWriteBufferHighWatermark() {
     729          16 :   ASSERT(parent_.upstream_);
     730          16 :   parent_.readDisableOrDefer(true);
     731          16 : }
     732             : 
     733          16 : void UpstreamRequest::DownstreamWatermarkManager::onBelowWriteBufferLowWatermark() {
     734          16 :   ASSERT(parent_.upstream_);
     735          16 :   parent_.readDisableOrDefer(false);
     736          16 : }
     737             : 
     738           0 : void UpstreamRequest::disableDataFromDownstreamForFlowControl() {
     739           0 :   parent_.cluster()->trafficStats()->upstream_flow_control_backed_up_total_.inc();
     740           0 :   parent_.callbacks()->onDecoderFilterAboveWriteBufferHighWatermark();
     741           0 :   ++downstream_data_disabled_;
     742           0 : }
     743             : 
     744           0 : void UpstreamRequest::enableDataFromDownstreamForFlowControl() {
     745           0 :   parent_.cluster()->trafficStats()->upstream_flow_control_drained_total_.inc();
     746           0 :   parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark();
     747           0 :   ASSERT(downstream_data_disabled_ != 0);
     748           0 :   if (downstream_data_disabled_ > 0) {
     749           0 :     --downstream_data_disabled_;
     750           0 :   }
     751           0 : }
     752             : 
     753         626 : Http::RequestHeaderMapOptRef UpstreamRequestFilterManagerCallbacks::requestHeaders() {
     754         626 :   return {*upstream_request_.parent_.downstreamHeaders()};
     755         626 : }
     756             : 
     757        1436 : Http::RequestTrailerMapOptRef UpstreamRequestFilterManagerCallbacks::requestTrailers() {
     758        1436 :   if (upstream_request_.parent_.downstreamTrailers()) {
     759           0 :     return {*upstream_request_.parent_.downstreamTrailers()};
     760           0 :   }
     761        1436 :   if (trailers_) {
     762           0 :     return {*trailers_};
     763           0 :   }
     764        1436 :   return {};
     765        1436 : }
     766             : 
     767           0 : const ScopeTrackedObject& UpstreamRequestFilterManagerCallbacks::scope() {
     768           0 :   return upstream_request_.parent_.callbacks()->scope();
     769           0 : }
     770             : 
     771           0 : OptRef<const Tracing::Config> UpstreamRequestFilterManagerCallbacks::tracingConfig() const {
     772           0 :   return upstream_request_.parent_.callbacks()->tracingConfig();
     773           0 : }
     774             : 
     775           0 : Tracing::Span& UpstreamRequestFilterManagerCallbacks::activeSpan() {
     776           0 :   return upstream_request_.parent_.callbacks()->activeSpan();
     777           0 : }
     778             : 
     779             : void UpstreamRequestFilterManagerCallbacks::resetStream(
     780          80 :     Http::StreamResetReason reset_reason, absl::string_view transport_failure_reason) {
     781             :   // The filter manager needs to disambiguate between a filter-driven reset,
     782             :   // which should force reset the stream, and a codec driven reset, which should
     783             :   // tell the router the stream reset, and let the router make the decision to
     784             :   // send a local reply, or retry the stream.
     785          80 :   if (reset_reason == Http::StreamResetReason::LocalReset &&
     786          80 :       transport_failure_reason != "codec_error") {
     787           0 :     upstream_request_.parent_.callbacks()->resetStream();
     788           0 :     return;
     789           0 :   }
     790          80 :   return upstream_request_.onResetStream(reset_reason, transport_failure_reason);
     791          80 : }
     792             : 
     793           0 : Upstream::ClusterInfoConstSharedPtr UpstreamRequestFilterManagerCallbacks::clusterInfo() {
     794           0 :   return upstream_request_.parent_.callbacks()->clusterInfo();
     795           0 : }
     796             : 
     797             : Http::Http1StreamEncoderOptionsOptRef
     798           0 : UpstreamRequestFilterManagerCallbacks::http1StreamEncoderOptions() {
     799           0 :   return upstream_request_.parent_.callbacks()->http1StreamEncoderOptions();
     800           0 : }
     801             : 
     802             : } // namespace Router
     803             : } // namespace Envoy

Generated by: LCOV version 1.15