LCOV - code coverage report
Current view: top level - source/common/router - upstream_request.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 68 116 58.6 %
Date: 2024-01-05 06:35:25 Functions: 31 55 56.4 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <chrono>
       4             : #include <cstdint>
       5             : #include <functional>
       6             : #include <memory>
       7             : #include <string>
       8             : 
       9             : #include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h"
      10             : #include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.validate.h"
      11             : #include "envoy/http/codec.h"
      12             : #include "envoy/http/codes.h"
      13             : #include "envoy/http/conn_pool.h"
      14             : #include "envoy/http/filter.h"
      15             : #include "envoy/stats/scope.h"
      16             : #include "envoy/tcp/conn_pool.h"
      17             : 
      18             : #include "source/common/buffer/watermark_buffer.h"
      19             : #include "source/common/common/cleanup.h"
      20             : #include "source/common/common/hash.h"
      21             : #include "source/common/common/hex.h"
      22             : #include "source/common/common/linked_object.h"
      23             : #include "source/common/common/logger.h"
      24             : #include "source/common/config/well_known_names.h"
      25             : #include "source/common/http/filter_manager.h"
      26             : #include "source/common/stream_info/stream_info_impl.h"
      27             : #include "source/common/tracing/null_span_impl.h"
      28             : #include "source/extensions/filters/http/common/factory_base.h"
      29             : 
      30             : namespace Envoy {
      31             : namespace Router {
      32             : 
      33             : class GenericUpstream;
      34             : class GenericConnectionPoolCallbacks;
      35             : class RouterFilterInterface;
      36             : class UpstreamRequest;
      37             : class UpstreamRequestFilterManagerCallbacks;
      38             : class UpstreamFilterManager;
      39             : class UpstreamCodecFilter;
      40             : 
      41             : /* The Upstream request is the base class for forwarding HTTP upstream.
      42             :  *
      43             :  * On the new request path, payload (headers/data/trailers/metadata) still arrives via
      44             :  * the accept[X]FromRouter functions. Said data is immediately passed off to the
      45             :  * UpstreamFilterManager, which passes each item through the filter chain until
      46             :  * it arrives at the last filter in the chain, the UpstreamCodecFilter. If the upstream
      47             :  * stream is not established, the UpstreamCodecFilter returns StopAllIteration, and the
      48             :  * FilterManager will buffer data, using watermarks to push back to the router
      49             :  * filter if buffers become overrun. When an upstream connection is established,
      50             :  * the UpstreamCodecFilter will send data upstream.
      51             :  *
      52             :  * On the new response path, payload arrives from upstream via the UpstreamCodecFilter's
      53             :  * CodecBridge. It is passed off directly to the FilterManager, traverses the
      54             :  * filter chain, and completion is signaled via the
      55             :  * UpstreamRequestFilterManagerCallbacks's encode[X] functions. These somewhat
      56             :  * confusingly pass through the UpstreamRequest's legacy decode[X] functions
      57             :  * (required due to the UpstreamToDownstream interface, but will be renamed once
      58             :  * the classic mode is deprecated), and are finally passed to the router via the
      59             :  * RouterFilterInterface onUpstream[X] functions.
      60             :  *
      61             :  * There is some required communication between the UpstreamRequest and
      62             :  * UpstreamCodecFilter. This is accomplished via the UpstreamStreamFilterCallbacks
      63             :  * interface, with the UpstreamFilterManager acting as intermediary.
      64             :  */
      65             : class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
      66             :                         public UpstreamToDownstream,
      67             :                         public LinkedObject<UpstreamRequest>,
      68             :                         public GenericConnectionPoolCallbacks,
      69             :                         public Event::DeferredDeletable {
      70             : public:
      71             :   UpstreamRequest(RouterFilterInterface& parent, std::unique_ptr<GenericConnPool>&& conn_pool,
      72             :                   bool can_send_early_data, bool can_use_http3);
      73             :   ~UpstreamRequest() override;
      74         251 :   void deleteIsPending() override { cleanUp(); }
      75             : 
      76             :   // To be called from the destructor, or prior to deferred delete.
      77             :   void cleanUp();
      78             : 
      79             :   void acceptHeadersFromRouter(bool end_stream);
      80             :   void acceptDataFromRouter(Buffer::Instance& data, bool end_stream);
      81             :   void acceptTrailersFromRouter(Http::RequestTrailerMap& trailers);
      82             :   void acceptMetadataFromRouter(Http::MetadataMapPtr&& metadata_map_ptr);
      83             : 
      84             :   void resetStream();
      85             :   void setupPerTryTimeout();
      86             :   void maybeEndDecode(bool end_stream);
      87             :   void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host, bool pool_success);
      88             : 
      89             :   // Http::StreamDecoder
      90             :   void decodeData(Buffer::Instance& data, bool end_stream) override;
      91             :   void decodeMetadata(Http::MetadataMapPtr&& metadata_map) override;
      92             : 
      93             :   // UpstreamToDownstream (Http::ResponseDecoder)
      94             :   void decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) override;
      95             :   void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override;
      96             :   void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override;
      97             :   void dumpState(std::ostream& os, int indent_level) const override;
      98             : 
      99             :   // UpstreamToDownstream (Http::StreamCallbacks)
     100             :   void onResetStream(Http::StreamResetReason reason,
     101             :                      absl::string_view transport_failure_reason) override;
     102           0 :   void onAboveWriteBufferHighWatermark() override { disableDataFromDownstreamForFlowControl(); }
     103           0 :   void onBelowWriteBufferLowWatermark() override { enableDataFromDownstreamForFlowControl(); }
     104             :   // UpstreamToDownstream
     105             :   const Route& route() const override;
     106             :   OptRef<const Network::Connection> connection() const override;
     107         251 :   const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override {
     108         251 :     return stream_options_;
     109         251 :   }
     110             : 
     111             :   void disableDataFromDownstreamForFlowControl();
     112             :   void enableDataFromDownstreamForFlowControl();
     113             : 
     114             :   // GenericConnectionPoolCallbacks
     115             :   void onPoolFailure(ConnectionPool::PoolFailureReason reason,
     116             :                      absl::string_view transport_failure_reason,
     117             :                      Upstream::HostDescriptionConstSharedPtr host) override;
     118             :   void onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
     119             :                    Upstream::HostDescriptionConstSharedPtr host,
     120             :                    const Network::ConnectionInfoProvider& address_provider,
     121             :                    StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) override;
     122             :   UpstreamToDownstream& upstreamToDownstream() override;
     123             : 
     124             :   void clearRequestEncoder();
     125             :   void onStreamMaxDurationReached();
     126             : 
     127             :   // Either disable upstream reading immediately, or defer it and keep track
     128             :   // of how many times read disabling has been requested.
     129             :   void readDisableOrDefer(bool disable);
     130             :   // Called upon receiving the first response headers from the upstream;
     131             :   // applies any pending (deferred) read disabling to the upstream stream.
     132             :   void maybeHandleDeferredReadDisable();
     133             : 
     134             :   struct DownstreamWatermarkManager : public Http::DownstreamWatermarkCallbacks {
     135         251 :     DownstreamWatermarkManager(UpstreamRequest& parent) : parent_(parent) {}
     136             : 
     137             :     // Http::DownstreamWatermarkCallbacks
     138             :     void onBelowWriteBufferLowWatermark() override;
     139             :     void onAboveWriteBufferHighWatermark() override;
     140             : 
     141             :     UpstreamRequest& parent_;
     142             :   };
     143             : 
     144             :   void readEnable();
     145             :   void encodeBodyAndTrailers();
     146             : 
     147             :   // Getters and setters. NOTE(review): the read-only bool getters could be marked const.
     148         925 :   Upstream::HostDescriptionConstSharedPtr& upstreamHost() { return upstream_host_; }
     149           0 :   void outlierDetectionTimeoutRecorded(bool recorded) {
     150           0 :     outlier_detection_timeout_recorded_ = recorded;
     151           0 :   }
     152           0 :   bool outlierDetectionTimeoutRecorded() { return outlier_detection_timeout_recorded_; }
     153           0 :   void retried(bool value) { retried_ = value; }
     154           0 :   bool retried() { return retried_; }
     155         105 :   bool grpcRqSuccessDeferred() { return grpc_rq_success_deferred_; }
     156          49 :   void grpcRqSuccessDeferred(bool deferred) { grpc_rq_success_deferred_ = deferred; }
     157         141 :   void upstreamCanary(bool value) { upstream_canary_ = value; }
     158          73 :   bool upstreamCanary() { return upstream_canary_; }
     159           0 :   bool awaitingHeaders() { return awaiting_headers_; }
     160           0 :   void recordTimeoutBudget(bool value) { record_timeout_budget_ = value; }
     161         196 :   bool createPerTryTimeoutOnRequestComplete() {
     162         196 :     return create_per_try_timeout_on_request_complete_;
     163         196 :   }
     164           0 :   bool encodeComplete() const { return router_sent_end_stream_; }
     165             :   // Exposes streamInfo for the upstream stream.
     166        1445 :   StreamInfo::StreamInfo& streamInfo() { return stream_info_; }
     167           0 :   bool hadUpstream() const { return had_upstream_; }
     168             : 
     169             : private:
     170             :   friend class UpstreamFilterManager;
     171             :   friend class UpstreamCodecFilter;
     172             :   friend class UpstreamRequestFilterManagerCallbacks;
     173         930 :   StreamInfo::UpstreamTiming& upstreamTiming() {
     174         930 :     return stream_info_.upstreamInfo()->upstreamTiming();
     175         930 :   }
     176             :   // Records the latency from when the upstream request was first created to
     177             :   // when the pool callback fires. This latency can be useful to track excessive
     178             :   // queuing.
     179             :   void recordConnectionPoolCallbackLatency();
     180             : 
     181         141 :   void addResponseHeadersSize(uint64_t size) {
     182         141 :     response_headers_size_ = response_headers_size_.value_or(0) + size;
     183         141 :   }
     184             :   void resetPerTryIdleTimer();
     185             :   void onPerTryTimeout();
     186             :   void onPerTryIdleTimeout();
     187             :   void upstreamLog(AccessLog::AccessLogType access_log_type);
     188             :   void resetUpstreamLogFlushTimer();
     189             : 
     190             :   RouterFilterInterface& parent_;
     191             :   std::unique_ptr<GenericConnPool> conn_pool_;
     192             :   Event::TimerPtr per_try_timeout_;
     193             :   Event::TimerPtr per_try_idle_timeout_;
     194             :   std::unique_ptr<GenericUpstream> upstream_;
     195             :   absl::optional<Http::StreamResetReason> deferred_reset_reason_;
     196             :   Upstream::HostDescriptionConstSharedPtr upstream_host_;
     197             :   DownstreamWatermarkManager downstream_watermark_manager_{*this};
     198             :   Tracing::SpanPtr span_;
     199             :   StreamInfo::StreamInfoImpl stream_info_;
     200             :   const MonotonicTime start_time_;
     201             :   // This is wrapped in an optional, since we want to avoid computing zero size headers when in
     202             :   // reality we just didn't get a response back.
     203             :   absl::optional<uint64_t> response_headers_size_{};
     204             :   // Copies of upstream headers/trailers. These are only set if upstream
     205             :   // access logging is configured.
     206             :   Http::ResponseHeaderMapPtr upstream_headers_;
     207             :   Http::ResponseTrailerMapPtr upstream_trailers_;
     208             :   OptRef<UpstreamToDownstream> upstream_interface_;
     209             :   std::list<Http::UpstreamCallbacks*> upstream_callbacks_;
     210             : 
     211             :   Event::TimerPtr max_stream_duration_timer_;
     212             : 
     213             :   // Per-stream access log flush duration. This timer is enabled once when the stream is created
     214             :   // and will log to all access logs once per trigger.
     215             :   Event::TimerPtr upstream_log_flush_timer_;
     216             : 
     217             :   std::unique_ptr<UpstreamRequestFilterManagerCallbacks> filter_manager_callbacks_;
     218             :   std::unique_ptr<Http::FilterManager> filter_manager_;
     219             : 
     220             :   // The number of outstanding readDisable(true) calls that have been deferred.
     221             :   // When downstream send buffers go above the high watermark before response headers arrive, we
     222             :   // increment this counter instead of immediately calling readDisable on the upstream stream. This
     223             :   // prevents the upstream request from being spuriously retried or reset because of upstream
     224             :   // timeouts while the upstream stream is readDisabled by downstream but the response has actually
     225             :   // arrived from upstream. See https://github.com/envoyproxy/envoy/issues/25901. During the
     226             :   // deferring period, if the downstream buffer goes below the low watermark, this counter is
     227             :   // decremented. Once the response headers arrive, readDisable is called as many times as the
     228             :   // remaining value of this counter.
     229             :   size_t deferred_read_disabling_count_{0};
     230             : 
     231             :   // Keep small members (bools and enums) at the end of class, to reduce alignment overhead.
     232             :   // Tracks the number of times the flow of data from downstream has been disabled.
     233             :   uint32_t downstream_data_disabled_{};
     234             :   bool calling_encode_headers_ : 1;
     235             :   bool upstream_canary_ : 1;
     236             :   bool router_sent_end_stream_ : 1;
     237             :   bool encode_trailers_ : 1;
     238             :   bool retried_ : 1;
     239             :   bool awaiting_headers_ : 1;
     240             :   bool outlier_detection_timeout_recorded_ : 1;
     241             :   // Tracks whether we deferred a per try timeout because the downstream request
     242             :   // had not been completed yet.
     243             :   bool create_per_try_timeout_on_request_complete_ : 1;
     244             :   // True if the CONNECT headers have been sent but proxying payload is paused
     245             :   // waiting for response headers.
     246             :   bool paused_for_connect_ : 1;
     247             :   bool reset_stream_ : 1;
     248             : 
     249             :   // Sentinel to indicate if timeout budget tracking is configured for the cluster,
     250             :   // and if so, if the per-try histogram should record a value.
     251             :   bool record_timeout_budget_ : 1;
     252             :   // Tracks whether the one-time cleanup has been performed.
     253             :   bool cleaned_up_ : 1;
     254             :   bool had_upstream_ : 1;
     255             :   Http::ConnectionPool::Instance::StreamOptions stream_options_; // NOTE(review): sits between two groups of bit-fields; confirm placement is intended.
     256             :   bool grpc_rq_success_deferred_ : 1;
     257             :   bool upstream_wait_for_response_headers_before_disabling_read_ : 1;
     258             : };
     259             : 
     260             : class UpstreamRequestFilterManagerCallbacks : public Http::FilterManagerCallbacks,
     261             :                                               public Event::DeferredDeletable,
     262             :                                               public Http::UpstreamStreamFilterCallbacks {
     263             : public:
     264             :   UpstreamRequestFilterManagerCallbacks(UpstreamRequest& upstream_request)
     265         251 :       : upstream_request_(upstream_request) {}
     266         141 :   void encodeHeaders(Http::ResponseHeaderMap&, bool end_stream) override {
     267         141 :     upstream_request_.decodeHeaders(std::move(response_headers_), end_stream);
     268         141 :   }
     269           0 :   void encode1xxHeaders(Http::ResponseHeaderMap&) override {
     270           0 :     upstream_request_.decode1xxHeaders(std::move(informational_headers_));
     271           0 :   }
     272         395 :   void encodeData(Buffer::Instance& data, bool end_stream) override {
     273         395 :     upstream_request_.decodeData(data, end_stream);
     274         395 :   }
     275           6 :   void encodeTrailers(Http::ResponseTrailerMap&) override {
     276           6 :     upstream_request_.decodeTrailers(std::move(response_trailers_));
     277           6 :   }
     278           2 :   void encodeMetadata(Http::MetadataMapPtr&& metadata) override {
     279           2 :     upstream_request_.decodeMetadata(std::move(metadata));
     280           2 :   }
     281           0 :   void setRequestTrailers(Http::RequestTrailerMapPtr&& request_trailers) override {
     282           0 :     trailers_ = std::move(request_trailers);
     283           0 :   }
     284           0 :   void setInformationalHeaders(Http::ResponseHeaderMapPtr&& response_headers) override {
     285           0 :     informational_headers_ = std::move(response_headers);
     286           0 :   }
     287         141 :   void setResponseHeaders(Http::ResponseHeaderMapPtr&& response_headers) override {
     288         141 :     response_headers_ = std::move(response_headers);
     289         141 :   }
     290           6 :   void setResponseTrailers(Http::ResponseTrailerMapPtr&& response_trailers) override {
     291           6 :     response_trailers_ = std::move(response_trailers);
     292           6 :   }
     293             :   Http::RequestHeaderMapOptRef requestHeaders() override;
     294             :   Http::RequestTrailerMapOptRef requestTrailers() override;
     295           0 :   Http::ResponseHeaderMapOptRef informationalHeaders() override {
     296           0 :     if (informational_headers_) {
     297           0 :       return {*informational_headers_};
     298           0 :     }
     299           0 :     return {};
     300           0 :   }
     301         141 :   Http::ResponseHeaderMapOptRef responseHeaders() override {
     302         141 :     if (response_headers_) {
     303         141 :       return {*response_headers_};
     304         141 :     }
     305           0 :     return {};
     306         141 :   }
     307         401 :   Http::ResponseTrailerMapOptRef responseTrailers() override {
     308         401 :     if (response_trailers_) {
     309           6 :       return {*response_trailers_};
     310           6 :     }
     311         395 :     return {};
     312         401 :   }
     313             :   // If the filter manager determines a decoder filter has buffer space available, tell
     314             :   // the router to resume the flow of data from downstream.
     315           0 :   void onDecoderFilterBelowWriteBufferLowWatermark() override {
     316           0 :     upstream_request_.onBelowWriteBufferLowWatermark();
     317           0 :   }
     318             :   // If the filter manager determines a decoder filter has too much data, tell
     319             :   // the router to stop the flow of data from downstream.
     320           0 :   void onDecoderFilterAboveWriteBufferHighWatermark() override {
     321           0 :     upstream_request_.onAboveWriteBufferHighWatermark();
     322           0 :   }
     323             : 
     324             :   // These functions are delegated to the downstream HCM/FM
     325             :   OptRef<const Tracing::Config> tracingConfig() const override;
     326             :   const ScopeTrackedObject& scope() override;
     327             :   Tracing::Span& activeSpan() override;
     328             :   void resetStream(Http::StreamResetReason reset_reason,
     329             :                    absl::string_view transport_failure_reason) override;
     330             :   Upstream::ClusterInfoConstSharedPtr clusterInfo() override;
     331             :   Http::Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override;
     332             : 
     333             :   // Intentional no-op functions.
     334           0 :   void onResponseDataTooLarge() override {}
     335           0 :   void onRequestDataTooLarge() override {}
     336          98 :   void endStream() override {}
     337         357 :   void disarmRequestTimeout() override {}
     338        1224 :   void resetIdleTimer() override {}
     339           0 :   void onLocalReply(Http::Code) override {}
     340             :   // Upgrade filter chains are not supported, so no upgrade map is exposed (returns nullptr).
     341           0 :   const Router::RouteEntry::UpgradeMap* upgradeMap() override { return nullptr; }
     342             : 
     343             :   // Unsupported functions: a call from an upstream HTTP filter is reported via IS_ENVOY_BUG.
     344           0 :   void recreateStream(StreamInfo::FilterStateSharedPtr) override {
     345           0 :     IS_ENVOY_BUG("recreateStream called from upstream HTTP filter");
     346           0 :   }
     347           0 :   void upgradeFilterChainCreated() override {
     348           0 :     IS_ENVOY_BUG("upgradeFilterChainCreated called from upstream HTTP filter");
     349           0 :   }
     350        3277 :   OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {*this}; }
     351             : 
     352             :   // Http::UpstreamStreamFilterCallbacks
     353        1096 :   StreamInfo::StreamInfo& upstreamStreamInfo() override { return upstream_request_.streamInfo(); }
     354        1085 :   OptRef<GenericUpstream> upstream() override {
     355        1085 :     return makeOptRefFromPtr(upstream_request_.upstream_.get());
     356        1085 :   }
     357           0 :   void dumpState(std::ostream& os, int indent_level = 0) const override {
     358           0 :     upstream_request_.dumpState(os, indent_level);
     359           0 :   }
     360         343 :   bool pausedForConnect() const override { return upstream_request_.paused_for_connect_; }
     361           0 :   void setPausedForConnect(bool value) override { upstream_request_.paused_for_connect_ = value; }
     362         251 :   const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override {
     363         251 :     return upstream_request_.upstreamStreamOptions();
     364         251 :   }
     365         251 :   void addUpstreamCallbacks(Http::UpstreamCallbacks& callbacks) override {
     366         251 :     upstream_request_.upstream_callbacks_.push_back(&callbacks);
     367         251 :   }
     368         251 :   void setUpstreamToDownstream(UpstreamToDownstream& upstream_to_downstream_interface) override {
     369         251 :     upstream_request_.upstream_interface_ = upstream_to_downstream_interface;
     370         251 :   }
     371             : 
     372             :   Http::RequestTrailerMapPtr trailers_;
     373             :   Http::ResponseHeaderMapPtr informational_headers_;
     374             :   Http::ResponseHeaderMapPtr response_headers_;
     375             :   Http::ResponseTrailerMapPtr response_trailers_;
     376             :   UpstreamRequest& upstream_request_;
     377             : };
     378             : 
     379             : } // namespace Router
     380             : } // namespace Envoy

Generated by: LCOV version 1.15