Line data Source code
1 : #pragma once 2 : 3 : #include <chrono> 4 : #include <cstdint> 5 : #include <functional> 6 : #include <memory> 7 : #include <string> 8 : 9 : #include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h" 10 : #include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.validate.h" 11 : #include "envoy/http/codec.h" 12 : #include "envoy/http/codes.h" 13 : #include "envoy/http/conn_pool.h" 14 : #include "envoy/http/filter.h" 15 : #include "envoy/stats/scope.h" 16 : #include "envoy/tcp/conn_pool.h" 17 : 18 : #include "source/common/buffer/watermark_buffer.h" 19 : #include "source/common/common/cleanup.h" 20 : #include "source/common/common/hash.h" 21 : #include "source/common/common/hex.h" 22 : #include "source/common/common/linked_object.h" 23 : #include "source/common/common/logger.h" 24 : #include "source/common/config/well_known_names.h" 25 : #include "source/common/http/filter_manager.h" 26 : #include "source/common/stream_info/stream_info_impl.h" 27 : #include "source/common/tracing/null_span_impl.h" 28 : #include "source/extensions/filters/http/common/factory_base.h" 29 : 30 : namespace Envoy { 31 : namespace Router { 32 : 33 : class GenericUpstream; 34 : class GenericConnectionPoolCallbacks; 35 : class RouterFilterInterface; 36 : class UpstreamRequest; 37 : class UpstreamRequestFilterManagerCallbacks; 38 : class UpstreamFilterManager; 39 : class UpstreamCodecFilter; 40 : 41 : /* The Upstream request is the base class for forwarding HTTP upstream. 42 : * 43 : * On the new request path, payload (headers/body/metadata/data) still arrives via 44 : * the accept[X]fromRouter functions. Said data is immediately passed off to the 45 : * UpstreamFilterManager, which passes each item through the filter chain until 46 : * it arrives at the last filter in the chain, the UpstreamCodecFilter. If the upstream 47 : * stream is not established, the UpstreamCodecFilter returns StopAllIteration, and the 48 : * FilterManager will buffer data, using watermarks to push back to the router 49 : * filter if buffers become overrun. When an upstream connection is established, 50 : * the UpstreamCodecFilter will send data upstream. 51 : * 52 : * On the new response path, payload arrives from upstream via the UpstreamCodecFilter's 53 : * CodecBridge. It is passed off directly to the FilterManager, traverses the 54 : * filter chain, and completion is signaled via the 55 : * UpstreamRequestFilterManagerCallbacks's encode[X] functions. These somewhat 56 : * confusingly pass through the UpstreamRequest's legacy decode[X] functions 57 : * (required due to the UpstreamToDownstream interface, but will be renamed once 58 : * the classic mode is deprecated), and are finally passed to the router via the 59 : * RouterFilterInterface onUpstream[X] functions. 60 : * 61 : * There is some required communication between the UpstreamRequest and 62 : * UpstreamCodecFilter. This is accomplished via the UpstreamStreamFilterCallbacks 63 : * interface, with the UpstreamFilterManager acting as intermediary. 64 : */ 65 : class UpstreamRequest : public Logger::Loggable<Logger::Id::router>, 66 : public UpstreamToDownstream, 67 : public LinkedObject<UpstreamRequest>, 68 : public GenericConnectionPoolCallbacks, 69 : public Event::DeferredDeletable { 70 : public: 71 : UpstreamRequest(RouterFilterInterface& parent, std::unique_ptr<GenericConnPool>&& conn_pool, 72 : bool can_send_early_data, bool can_use_http3); 73 : ~UpstreamRequest() override; 74 251 : void deleteIsPending() override { cleanUp(); } 75 : 76 : // To be called from the destructor, or prior to deferred delete. 77 : void cleanUp(); 78 : 79 : void acceptHeadersFromRouter(bool end_stream); 80 : void acceptDataFromRouter(Buffer::Instance& data, bool end_stream); 81 : void acceptTrailersFromRouter(Http::RequestTrailerMap& trailers); 82 : void acceptMetadataFromRouter(Http::MetadataMapPtr&& metadata_map_ptr); 83 : 84 : void resetStream(); 85 : void setupPerTryTimeout(); 86 : void maybeEndDecode(bool end_stream); 87 : void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host, bool pool_success); 88 : 89 : // Http::StreamDecoder 90 : void decodeData(Buffer::Instance& data, bool end_stream) override; 91 : void decodeMetadata(Http::MetadataMapPtr&& metadata_map) override; 92 : 93 : // UpstreamToDownstream (Http::ResponseDecoder) 94 : void decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) override; 95 : void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override; 96 : void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override; 97 : void dumpState(std::ostream& os, int indent_level) const override; 98 : 99 : // UpstreamToDownstream (Http::StreamCallbacks) 100 : void onResetStream(Http::StreamResetReason reason, 101 : absl::string_view transport_failure_reason) override; 102 0 : void onAboveWriteBufferHighWatermark() override { disableDataFromDownstreamForFlowControl(); } 103 0 : void onBelowWriteBufferLowWatermark() override { enableDataFromDownstreamForFlowControl(); } 104 : // UpstreamToDownstream 105 : const Route& route() const override; 106 : OptRef<const Network::Connection> connection() const override; 107 251 : const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override { 108 251 : return stream_options_; 109 251 : } 110 : 111 : void disableDataFromDownstreamForFlowControl(); 112 : void enableDataFromDownstreamForFlowControl(); 113 : 114 : // GenericConnPool 115 : void onPoolFailure(ConnectionPool::PoolFailureReason reason, 116 : absl::string_view transport_failure_reason, 117 : Upstream::HostDescriptionConstSharedPtr host) override; 118 : void onPoolReady(std::unique_ptr<GenericUpstream>&& upstream, 119 : Upstream::HostDescriptionConstSharedPtr host, 120 : const Network::ConnectionInfoProvider& address_provider, 121 : StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) override; 122 : UpstreamToDownstream& upstreamToDownstream() override; 123 : 124 : void clearRequestEncoder(); 125 : void onStreamMaxDurationReached(); 126 : 127 : // Either disable upstream reading immediately or defer it and keep tracking 128 : // of how many read disabling has happened. 129 : void readDisableOrDefer(bool disable); 130 : // Called upon receiving the first response headers from the upstream. And 131 : // applies read disabling to it if there is any pending read disabling. 132 : void maybeHandleDeferredReadDisable(); 133 : 134 : struct DownstreamWatermarkManager : public Http::DownstreamWatermarkCallbacks { 135 251 : DownstreamWatermarkManager(UpstreamRequest& parent) : parent_(parent) {} 136 : 137 : // Http::DownstreamWatermarkCallbacks 138 : void onBelowWriteBufferLowWatermark() override; 139 : void onAboveWriteBufferHighWatermark() override; 140 : 141 : UpstreamRequest& parent_; 142 : }; 143 : 144 : void readEnable(); 145 : void encodeBodyAndTrailers(); 146 : 147 : // Getters and setters 148 925 : Upstream::HostDescriptionConstSharedPtr& upstreamHost() { return upstream_host_; } 149 0 : void outlierDetectionTimeoutRecorded(bool recorded) { 150 0 : outlier_detection_timeout_recorded_ = recorded; 151 0 : } 152 0 : bool outlierDetectionTimeoutRecorded() { return outlier_detection_timeout_recorded_; } 153 0 : void retried(bool value) { retried_ = value; } 154 0 : bool retried() { return retried_; } 155 105 : bool grpcRqSuccessDeferred() { return grpc_rq_success_deferred_; } 156 49 : void grpcRqSuccessDeferred(bool deferred) { grpc_rq_success_deferred_ = deferred; } 157 141 : void upstreamCanary(bool value) { upstream_canary_ = value; } 158 73 : bool upstreamCanary() { return upstream_canary_; } 159 0 : bool awaitingHeaders() { return awaiting_headers_; } 160 0 : void recordTimeoutBudget(bool value) { record_timeout_budget_ = value; } 161 196 : bool createPerTryTimeoutOnRequestComplete() { 162 196 : return create_per_try_timeout_on_request_complete_; 163 196 : } 164 0 : bool encodeComplete() const { return router_sent_end_stream_; } 165 : // Exposes streamInfo for the upstream stream. 166 1445 : StreamInfo::StreamInfo& streamInfo() { return stream_info_; } 167 0 : bool hadUpstream() const { return had_upstream_; } 168 : 169 : private: 170 : friend class UpstreamFilterManager; 171 : friend class UpstreamCodecFilter; 172 : friend class UpstreamRequestFilterManagerCallbacks; 173 930 : StreamInfo::UpstreamTiming& upstreamTiming() { 174 930 : return stream_info_.upstreamInfo()->upstreamTiming(); 175 930 : } 176 : // Records the latency from when the upstream request was first created to 177 : // when the pool callback fires. This latency can be useful to track excessive 178 : // queuing. 179 : void recordConnectionPoolCallbackLatency(); 180 : 181 141 : void addResponseHeadersSize(uint64_t size) { 182 141 : response_headers_size_ = response_headers_size_.value_or(0) + size; 183 141 : } 184 : void resetPerTryIdleTimer(); 185 : void onPerTryTimeout(); 186 : void onPerTryIdleTimeout(); 187 : void upstreamLog(AccessLog::AccessLogType access_log_type); 188 : void resetUpstreamLogFlushTimer(); 189 : 190 : RouterFilterInterface& parent_; 191 : std::unique_ptr<GenericConnPool> conn_pool_; 192 : Event::TimerPtr per_try_timeout_; 193 : Event::TimerPtr per_try_idle_timeout_; 194 : std::unique_ptr<GenericUpstream> upstream_; 195 : absl::optional<Http::StreamResetReason> deferred_reset_reason_; 196 : Upstream::HostDescriptionConstSharedPtr upstream_host_; 197 : DownstreamWatermarkManager downstream_watermark_manager_{*this}; 198 : Tracing::SpanPtr span_; 199 : StreamInfo::StreamInfoImpl stream_info_; 200 : const MonotonicTime start_time_; 201 : // This is wrapped in an optional, since we want to avoid computing zero size headers when in 202 : // reality we just didn't get a response back. 203 : absl::optional<uint64_t> response_headers_size_{}; 204 : // Copies of upstream headers/trailers. These are only set if upstream 205 : // access logging is configured. 206 : Http::ResponseHeaderMapPtr upstream_headers_; 207 : Http::ResponseTrailerMapPtr upstream_trailers_; 208 : OptRef<UpstreamToDownstream> upstream_interface_; 209 : std::list<Http::UpstreamCallbacks*> upstream_callbacks_; 210 : 211 : Event::TimerPtr max_stream_duration_timer_; 212 : 213 : // Per-stream access log flush duration. This timer is enabled once when the stream is created 214 : // and will log to all access logs once per trigger. 215 : Event::TimerPtr upstream_log_flush_timer_; 216 : 217 : std::unique_ptr<UpstreamRequestFilterManagerCallbacks> filter_manager_callbacks_; 218 : std::unique_ptr<Http::FilterManager> filter_manager_; 219 : 220 : // The number of outstanding readDisable to be called with parameter value true. 221 : // When downstream send buffers get above high watermark before response headers arrive, we 222 : // increment this counter instead of immediately calling readDisable on upstream stream. This is 223 : // to avoid the upstream request from being spuriously retried or reset because of upstream 224 : // timeouts while upstream stream is readDisabled by downstream but the response has actually 225 : // arrived from upstream. See https://github.com/envoyproxy/envoy/issues/25901. During the 226 : // deferring period, if the downstream buffer gets below low watermark, this counter gets 227 : // decremented. Once the response headers arrive, call readDisable the number of times as the 228 : // remaining value of this counter. 229 : size_t deferred_read_disabling_count_{0}; 230 : 231 : // Keep small members (bools and enums) at the end of class, to reduce alignment overhead. 232 : // Tracks the number of times the flow of data from downstream has been disabled. 233 : uint32_t downstream_data_disabled_{}; 234 : bool calling_encode_headers_ : 1; 235 : bool upstream_canary_ : 1; 236 : bool router_sent_end_stream_ : 1; 237 : bool encode_trailers_ : 1; 238 : bool retried_ : 1; 239 : bool awaiting_headers_ : 1; 240 : bool outlier_detection_timeout_recorded_ : 1; 241 : // Tracks whether we deferred a per try timeout because the downstream request 242 : // had not been completed yet. 243 : bool create_per_try_timeout_on_request_complete_ : 1; 244 : // True if the CONNECT headers have been sent but proxying payload is paused 245 : // waiting for response headers. 246 : bool paused_for_connect_ : 1; 247 : bool reset_stream_ : 1; 248 : 249 : // Sentinel to indicate if timeout budget tracking is configured for the cluster, 250 : // and if so, if the per-try histogram should record a value. 251 : bool record_timeout_budget_ : 1; 252 : // Track if one time clean up has been performed. 253 : bool cleaned_up_ : 1; 254 : bool had_upstream_ : 1; 255 : Http::ConnectionPool::Instance::StreamOptions stream_options_; 256 : bool grpc_rq_success_deferred_ : 1; 257 : bool upstream_wait_for_response_headers_before_disabling_read_ : 1; 258 : }; 259 : 260 : class UpstreamRequestFilterManagerCallbacks : public Http::FilterManagerCallbacks, 261 : public Event::DeferredDeletable, 262 : public Http::UpstreamStreamFilterCallbacks { 263 : public: 264 : UpstreamRequestFilterManagerCallbacks(UpstreamRequest& upstream_request) 265 251 : : upstream_request_(upstream_request) {} 266 141 : void encodeHeaders(Http::ResponseHeaderMap&, bool end_stream) override { 267 141 : upstream_request_.decodeHeaders(std::move(response_headers_), end_stream); 268 141 : } 269 0 : void encode1xxHeaders(Http::ResponseHeaderMap&) override { 270 0 : upstream_request_.decode1xxHeaders(std::move(informational_headers_)); 271 0 : } 272 395 : void encodeData(Buffer::Instance& data, bool end_stream) override { 273 395 : upstream_request_.decodeData(data, end_stream); 274 395 : } 275 6 : void encodeTrailers(Http::ResponseTrailerMap&) override { 276 6 : upstream_request_.decodeTrailers(std::move(response_trailers_)); 277 6 : } 278 2 : void encodeMetadata(Http::MetadataMapPtr&& metadata) override { 279 2 : upstream_request_.decodeMetadata(std::move(metadata)); 280 2 : } 281 0 : void setRequestTrailers(Http::RequestTrailerMapPtr&& request_trailers) override { 282 0 : trailers_ = std::move(request_trailers); 283 0 : } 284 0 : void setInformationalHeaders(Http::ResponseHeaderMapPtr&& response_headers) override { 285 0 : informational_headers_ = std::move(response_headers); 286 0 : } 287 141 : void setResponseHeaders(Http::ResponseHeaderMapPtr&& response_headers) override { 288 141 : response_headers_ = std::move(response_headers); 289 141 : } 290 6 : void setResponseTrailers(Http::ResponseTrailerMapPtr&& response_trailers) override { 291 6 : response_trailers_ = std::move(response_trailers); 292 6 : } 293 : Http::RequestHeaderMapOptRef requestHeaders() override; 294 : Http::RequestTrailerMapOptRef requestTrailers() override; 295 0 : Http::ResponseHeaderMapOptRef informationalHeaders() override { 296 0 : if (informational_headers_) { 297 0 : return {*informational_headers_}; 298 0 : } 299 0 : return {}; 300 0 : } 301 141 : Http::ResponseHeaderMapOptRef responseHeaders() override { 302 141 : if (response_headers_) { 303 141 : return {*response_headers_}; 304 141 : } 305 0 : return {}; 306 141 : } 307 401 : Http::ResponseTrailerMapOptRef responseTrailers() override { 308 401 : if (response_trailers_) { 309 6 : return {*response_trailers_}; 310 6 : } 311 395 : return {}; 312 401 : } 313 : // If the filter manager determines a decoder filter has available, tell 314 : // the router to resume the flow of data from downstream. 315 0 : void onDecoderFilterBelowWriteBufferLowWatermark() override { 316 0 : upstream_request_.onBelowWriteBufferLowWatermark(); 317 0 : } 318 : // If the filter manager determines a decoder filter has too much data, tell 319 : // the router to stop the flow of data from downstream. 320 0 : void onDecoderFilterAboveWriteBufferHighWatermark() override { 321 0 : upstream_request_.onAboveWriteBufferHighWatermark(); 322 0 : } 323 : 324 : // These functions are delegated to the downstream HCM/FM 325 : OptRef<const Tracing::Config> tracingConfig() const override; 326 : const ScopeTrackedObject& scope() override; 327 : Tracing::Span& activeSpan() override; 328 : void resetStream(Http::StreamResetReason reset_reason, 329 : absl::string_view transport_failure_reason) override; 330 : Upstream::ClusterInfoConstSharedPtr clusterInfo() override; 331 : Http::Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override; 332 : 333 : // Intentional no-op functions. 334 0 : void onResponseDataTooLarge() override {} 335 0 : void onRequestDataTooLarge() override {} 336 98 : void endStream() override {} 337 357 : void disarmRequestTimeout() override {} 338 1224 : void resetIdleTimer() override {} 339 0 : void onLocalReply(Http::Code) override {} 340 : // Upgrade filter chains not supported. 341 0 : const Router::RouteEntry::UpgradeMap* upgradeMap() override { return nullptr; } 342 : 343 : // Unsupported functions. 344 0 : void recreateStream(StreamInfo::FilterStateSharedPtr) override { 345 0 : IS_ENVOY_BUG("recreateStream called from upstream HTTP filter"); 346 0 : } 347 0 : void upgradeFilterChainCreated() override { 348 0 : IS_ENVOY_BUG("upgradeFilterChainCreated called from upstream HTTP filter"); 349 0 : } 350 3277 : OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {*this}; } 351 : 352 : // Http::UpstreamStreamFilterCallbacks 353 1096 : StreamInfo::StreamInfo& upstreamStreamInfo() override { return upstream_request_.streamInfo(); } 354 1085 : OptRef<GenericUpstream> upstream() override { 355 1085 : return makeOptRefFromPtr(upstream_request_.upstream_.get()); 356 1085 : } 357 0 : void dumpState(std::ostream& os, int indent_level = 0) const override { 358 0 : upstream_request_.dumpState(os, indent_level); 359 0 : } 360 343 : bool pausedForConnect() const override { return upstream_request_.paused_for_connect_; } 361 0 : void setPausedForConnect(bool value) override { upstream_request_.paused_for_connect_ = value; } 362 251 : const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override { 363 251 : return upstream_request_.upstreamStreamOptions(); 364 251 : } 365 251 : void addUpstreamCallbacks(Http::UpstreamCallbacks& callbacks) override { 366 251 : upstream_request_.upstream_callbacks_.push_back(&callbacks); 367 251 : } 368 251 : void setUpstreamToDownstream(UpstreamToDownstream& upstream_to_downstream_interface) override { 369 251 : upstream_request_.upstream_interface_ = upstream_to_downstream_interface; 370 251 : } 371 : 372 : Http::RequestTrailerMapPtr trailers_; 373 : Http::ResponseHeaderMapPtr informational_headers_; 374 : Http::ResponseHeaderMapPtr response_headers_; 375 : Http::ResponseTrailerMapPtr response_trailers_; 376 : UpstreamRequest& upstream_request_; 377 : }; 378 : 379 : } // namespace Router 380 : } // namespace Envoy