/proc/self/cwd/source/common/router/upstream_request.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include <chrono> |
4 | | #include <cstdint> |
5 | | #include <functional> |
6 | | #include <memory> |
7 | | #include <string> |
8 | | |
9 | | #include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h" |
10 | | #include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.validate.h" |
11 | | #include "envoy/http/codec.h" |
12 | | #include "envoy/http/codes.h" |
13 | | #include "envoy/http/conn_pool.h" |
14 | | #include "envoy/http/filter.h" |
15 | | #include "envoy/stats/scope.h" |
16 | | #include "envoy/tcp/conn_pool.h" |
17 | | |
18 | | #include "source/common/buffer/watermark_buffer.h" |
19 | | #include "source/common/common/cleanup.h" |
20 | | #include "source/common/common/hash.h" |
21 | | #include "source/common/common/hex.h" |
22 | | #include "source/common/common/linked_object.h" |
23 | | #include "source/common/common/logger.h" |
24 | | #include "source/common/config/well_known_names.h" |
25 | | #include "source/common/http/filter_manager.h" |
26 | | #include "source/common/stream_info/stream_info_impl.h" |
27 | | #include "source/common/tracing/null_span_impl.h" |
28 | | #include "source/extensions/filters/http/common/factory_base.h" |
29 | | |
30 | | namespace Envoy { |
31 | | namespace Router { |
32 | | |
33 | | class GenericUpstream; |
34 | | class GenericConnectionPoolCallbacks; |
35 | | class RouterFilterInterface; |
36 | | class UpstreamRequest; |
37 | | class UpstreamRequestFilterManagerCallbacks; |
38 | | class UpstreamFilterManager; |
39 | | class UpstreamCodecFilter; |
40 | | |
41 | | /* The Upstream request is the base class for forwarding HTTP upstream. |
42 | | * |
43 | | * On the new request path, payload (headers/data/trailers/metadata) still arrives via |
44 | | * the accept[X]FromRouter functions. Said data is immediately passed off to the |
45 | | * UpstreamFilterManager, which passes each item through the filter chain until |
46 | | * it arrives at the last filter in the chain, the UpstreamCodecFilter. If the upstream |
47 | | * stream is not established, the UpstreamCodecFilter returns StopAllIteration, and the |
48 | | * FilterManager will buffer data, using watermarks to push back to the router |
49 | | * filter if buffers become overrun. When an upstream connection is established, |
50 | | * the UpstreamCodecFilter will send data upstream. |
51 | | * |
52 | | * On the new response path, payload arrives from upstream via the UpstreamCodecFilter's |
53 | | * CodecBridge. It is passed off directly to the FilterManager, traverses the |
54 | | * filter chain, and completion is signaled via the |
55 | | * UpstreamRequestFilterManagerCallbacks's encode[X] functions. These somewhat |
56 | | * confusingly pass through the UpstreamRequest's legacy decode[X] functions |
57 | | * (required due to the UpstreamToDownstream interface, but will be renamed once |
58 | | * the classic mode is deprecated), and are finally passed to the router via the |
59 | | * RouterFilterInterface onUpstream[X] functions. |
60 | | * |
61 | | * There is some required communication between the UpstreamRequest and |
62 | | * UpstreamCodecFilter. This is accomplished via the UpstreamStreamFilterCallbacks |
63 | | * interface, with the UpstreamFilterManager acting as intermediary. |
64 | | * |
65 | | */ |
66 | | class UpstreamRequest : public Logger::Loggable<Logger::Id::router>, |
67 | | public UpstreamToDownstream, |
68 | | public LinkedObject<UpstreamRequest>, |
69 | | public GenericConnectionPoolCallbacks, |
70 | | public Event::DeferredDeletable { |
71 | | public: |
72 | | UpstreamRequest(RouterFilterInterface& parent, std::unique_ptr<GenericConnPool>&& conn_pool, |
73 | | bool can_send_early_data, bool can_use_http3, bool enable_half_close); |
74 | | ~UpstreamRequest() override; |
75 | 1.81k | void deleteIsPending() override { cleanUp(); } |
76 | | |
77 | | // To be called from the destructor, or prior to deferred delete (see deleteIsPending()). |
78 | | void cleanUp(); |
79 | | |
80 | | virtual void acceptHeadersFromRouter(bool end_stream); |
81 | | virtual void acceptDataFromRouter(Buffer::Instance& data, bool end_stream); |
82 | | void acceptTrailersFromRouter(Http::RequestTrailerMap& trailers); |
83 | | void acceptMetadataFromRouter(Http::MetadataMapPtr&& metadata_map_ptr); |
84 | | |
85 | | virtual void resetStream(); |
86 | | void setupPerTryTimeout(); |
87 | | void maybeEndDecode(bool end_stream); |
88 | | void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host, bool pool_success); |
89 | | |
90 | | // Http::StreamDecoder |
91 | | void decodeData(Buffer::Instance& data, bool end_stream) override; |
92 | | void decodeMetadata(Http::MetadataMapPtr&& metadata_map) override; |
93 | | |
94 | | // UpstreamToDownstream (Http::ResponseDecoder) |
95 | | void decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) override; |
96 | | void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override; |
97 | | void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override; |
98 | | void dumpState(std::ostream& os, int indent_level) const override; |
99 | | |
100 | | // UpstreamToDownstream (Http::StreamCallbacks) |
101 | | void onResetStream(Http::StreamResetReason reason, |
102 | | absl::string_view transport_failure_reason) override; |
103 | 0 | void onAboveWriteBufferHighWatermark() override { disableDataFromDownstreamForFlowControl(); } |
104 | 0 | void onBelowWriteBufferLowWatermark() override { enableDataFromDownstreamForFlowControl(); } |
105 | | // UpstreamToDownstream |
106 | | const Route& route() const override; |
107 | | OptRef<const Network::Connection> connection() const override; |
108 | 1.81k | const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override { |
109 | 1.81k | return stream_options_; |
110 | 1.81k | } |
111 | | |
112 | | void disableDataFromDownstreamForFlowControl(); |
113 | | void enableDataFromDownstreamForFlowControl(); |
114 | | |
115 | | // GenericConnectionPoolCallbacks |
116 | | void onPoolFailure(ConnectionPool::PoolFailureReason reason, |
117 | | absl::string_view transport_failure_reason, |
118 | | Upstream::HostDescriptionConstSharedPtr host) override; |
119 | | void onPoolReady(std::unique_ptr<GenericUpstream>&& upstream, |
120 | | Upstream::HostDescriptionConstSharedPtr host, |
121 | | const Network::ConnectionInfoProvider& address_provider, |
122 | | StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) override; |
123 | | UpstreamToDownstream& upstreamToDownstream() override; |
124 | | |
125 | | void clearRequestEncoder(); |
126 | | void onStreamMaxDurationReached(); |
127 | | |
128 | | // Either disable upstream reading immediately, or defer it and keep track |
129 | | // of how many read disables have been deferred. |
130 | | void readDisableOrDefer(bool disable); |
131 | | // Called upon receiving the first response headers from the upstream; |
132 | | // applies any pending (deferred) read disabling to the upstream stream. |
133 | | void maybeHandleDeferredReadDisable(); |
134 | | |
135 | | struct DownstreamWatermarkManager : public Http::DownstreamWatermarkCallbacks { |
136 | 1.81k | DownstreamWatermarkManager(UpstreamRequest& parent) : parent_(parent) {} |
137 | | |
138 | | // Http::DownstreamWatermarkCallbacks |
139 | | void onBelowWriteBufferLowWatermark() override; |
140 | | void onAboveWriteBufferHighWatermark() override; |
141 | | |
142 | | UpstreamRequest& parent_; |
143 | | }; |
144 | | |
145 | | void readEnable(); |
146 | | void encodeBodyAndTrailers(); |
147 | | |
148 | | // Getters and setters |
149 | 9.40k | Upstream::HostDescriptionConstSharedPtr& upstreamHost() { return upstream_host_; } |
150 | 0 | void outlierDetectionTimeoutRecorded(bool recorded) { |
151 | 0 | outlier_detection_timeout_recorded_ = recorded; |
152 | 0 | } |
153 | 0 | bool outlierDetectionTimeoutRecorded() { return outlier_detection_timeout_recorded_; } |
154 | 0 | void retried(bool value) { retried_ = value; } |
155 | 0 | bool retried() { return retried_; } |
156 | 799 | bool grpcRqSuccessDeferred() { return grpc_rq_success_deferred_; } |
157 | 729 | void grpcRqSuccessDeferred(bool deferred) { grpc_rq_success_deferred_ = deferred; } |
158 | 1.63k | void upstreamCanary(bool value) { upstream_canary_ = value; } |
159 | 799 | bool upstreamCanary() { return upstream_canary_; } |
160 | 0 | bool awaitingHeaders() { return awaiting_headers_; } |
161 | 0 | void recordTimeoutBudget(bool value) { record_timeout_budget_ = value; } |
162 | 1.62k | bool createPerTryTimeoutOnRequestComplete() { |
163 | 1.62k | return create_per_try_timeout_on_request_complete_; |
164 | 1.62k | } |
165 | 0 | bool encodeComplete() const { return router_sent_end_stream_; } |
166 | | // Exposes the StreamInfo for the upstream stream. |
167 | 12.2k | StreamInfo::StreamInfo& streamInfo() { return stream_info_; } |
168 | 0 | bool hadUpstream() const { return had_upstream_; } |
169 | | |
170 | | private: |
171 | | friend class UpstreamFilterManager; |
172 | | friend class UpstreamCodecFilter; |
173 | | friend class UpstreamRequestFilterManagerCallbacks; |
174 | 8.70k | StreamInfo::UpstreamTiming& upstreamTiming() { |
175 | 8.70k | return stream_info_.upstreamInfo()->upstreamTiming(); |
176 | 8.70k | } |
177 | | // Records the latency from when the upstream request was first created to |
178 | | // when the pool callback fires. This latency can be useful to track excessive |
179 | | // queuing. |
180 | | void recordConnectionPoolCallbackLatency(); |
181 | | |
182 | 1.63k | void addResponseHeadersSize(uint64_t size) { |
183 | 1.63k | response_headers_size_ = response_headers_size_.value_or(0) + size; |
184 | 1.63k | } |
185 | | void resetPerTryIdleTimer(); |
186 | | void onPerTryTimeout(); |
187 | | void onPerTryIdleTimeout(); |
188 | | void upstreamLog(AccessLog::AccessLogType access_log_type); |
189 | | void resetUpstreamLogFlushTimer(); |
190 | | |
191 | | RouterFilterInterface& parent_; |
192 | | std::unique_ptr<GenericConnPool> conn_pool_; |
193 | | Event::TimerPtr per_try_timeout_; |
194 | | Event::TimerPtr per_try_idle_timeout_; |
195 | | std::unique_ptr<GenericUpstream> upstream_; |
196 | | absl::optional<Http::StreamResetReason> deferred_reset_reason_; |
197 | | Upstream::HostDescriptionConstSharedPtr upstream_host_; |
198 | | DownstreamWatermarkManager downstream_watermark_manager_{*this}; |
199 | | Tracing::SpanPtr span_; |
200 | | StreamInfo::StreamInfoImpl stream_info_; |
201 | | const MonotonicTime start_time_; |
202 | | // This is wrapped in an optional, since we want to avoid computing zero size headers when in |
203 | | // reality we just didn't get a response back. |
204 | | absl::optional<uint64_t> response_headers_size_{}; |
205 | | // Copies of upstream headers/trailers. These are only set if upstream |
206 | | // access logging is configured. |
207 | | Http::ResponseHeaderMapPtr upstream_headers_; |
208 | | Http::ResponseTrailerMapPtr upstream_trailers_; |
209 | | OptRef<UpstreamToDownstream> upstream_interface_; |
210 | | std::list<Http::UpstreamCallbacks*> upstream_callbacks_; |
211 | | |
212 | | Event::TimerPtr max_stream_duration_timer_; |
213 | | |
214 | | // Per-stream access log flush timer. This timer is enabled once when the stream is created |
215 | | // and will log to all access logs once per trigger. |
216 | | Event::TimerPtr upstream_log_flush_timer_; |
217 | | |
218 | | std::unique_ptr<UpstreamRequestFilterManagerCallbacks> filter_manager_callbacks_; |
219 | | std::unique_ptr<Http::FilterManager> filter_manager_; |
220 | | |
221 | | // The number of outstanding readDisable(true) calls that have been deferred. |
222 | | // When downstream send buffers get above high watermark before response headers arrive, we |
223 | | // increment this counter instead of immediately calling readDisable on upstream stream. This is |
224 | | // to avoid the upstream request from being spuriously retried or reset because of upstream |
225 | | // timeouts while upstream stream is readDisabled by downstream but the response has actually |
226 | | // arrived from upstream. See https://github.com/envoyproxy/envoy/issues/25901. During the |
227 | | // deferring period, if the downstream buffer gets below low watermark, this counter gets |
228 | | // decremented. Once the response headers arrive, readDisable is called as many times as the |
229 | | // remaining value of this counter. |
230 | | size_t deferred_read_disabling_count_{0}; |
231 | | |
232 | | // Keep small members (bools and enums) at the end of class, to reduce alignment overhead. NOTE(review): stream_options_ below interrupts the bit-field run, which may add padding; confirm. |
233 | | // Tracks the number of times the flow of data from downstream has been disabled. |
234 | | uint32_t downstream_data_disabled_{}; |
235 | | bool calling_encode_headers_ : 1; |
236 | | bool upstream_canary_ : 1; |
237 | | bool router_sent_end_stream_ : 1; |
238 | | bool encode_trailers_ : 1; |
239 | | bool retried_ : 1; |
240 | | bool awaiting_headers_ : 1; |
241 | | bool outlier_detection_timeout_recorded_ : 1; |
242 | | // Tracks whether we deferred a per try timeout because the downstream request |
243 | | // had not been completed yet. |
244 | | bool create_per_try_timeout_on_request_complete_ : 1; |
245 | | // True if the CONNECT headers have been sent but proxying payload is paused |
246 | | // waiting for response headers. |
247 | | bool paused_for_connect_ : 1; |
248 | | bool paused_for_websocket_ : 1; |
249 | | bool reset_stream_ : 1; |
250 | | |
251 | | // Sentinel to indicate if timeout budget tracking is configured for the cluster, |
252 | | // and if so, if the per-try histogram should record a value. |
253 | | bool record_timeout_budget_ : 1; |
254 | | // Tracks whether the one-time clean up has been performed. |
255 | | bool cleaned_up_ : 1; |
256 | | bool had_upstream_ : 1; |
257 | | Http::ConnectionPool::Instance::StreamOptions stream_options_; |
258 | | bool grpc_rq_success_deferred_ : 1; |
259 | | bool enable_half_close_ : 1; |
260 | | }; |
261 | | |
262 | | class UpstreamRequestFilterManagerCallbacks : public Http::FilterManagerCallbacks, |
263 | | public Event::DeferredDeletable, |
264 | | public Http::UpstreamStreamFilterCallbacks { |
265 | | public: |
266 | | UpstreamRequestFilterManagerCallbacks(UpstreamRequest& upstream_request) |
267 | 1.81k | : upstream_request_(upstream_request) {} |
268 | 1.63k | void encodeHeaders(Http::ResponseHeaderMap&, bool end_stream) override { |
269 | 1.63k | upstream_request_.decodeHeaders(std::move(response_headers_), end_stream); |
270 | 1.63k | } |
271 | 1 | void encode1xxHeaders(Http::ResponseHeaderMap&) override { |
272 | 1 | upstream_request_.decode1xxHeaders(std::move(informational_headers_)); |
273 | 1 | } |
274 | 3.11k | void encodeData(Buffer::Instance& data, bool end_stream) override { |
275 | 3.11k | upstream_request_.decodeData(data, end_stream); |
276 | 3.11k | } |
277 | 1 | void encodeTrailers(Http::ResponseTrailerMap&) override { |
278 | 1 | upstream_request_.decodeTrailers(std::move(response_trailers_)); |
279 | 1 | } |
280 | 293 | void encodeMetadata(Http::MetadataMapPtr&& metadata) override { |
281 | 293 | upstream_request_.decodeMetadata(std::move(metadata)); |
282 | 293 | } |
283 | 0 | void setRequestTrailers(Http::RequestTrailerMapPtr&& request_trailers) override { |
284 | 0 | trailers_ = std::move(request_trailers); |
285 | 0 | } |
286 | 1 | void setInformationalHeaders(Http::ResponseHeaderMapPtr&& response_headers) override { |
287 | 1 | informational_headers_ = std::move(response_headers); |
288 | 1 | } |
289 | 1.63k | void setResponseHeaders(Http::ResponseHeaderMapPtr&& response_headers) override { |
290 | 1.63k | response_headers_ = std::move(response_headers); |
291 | 1.63k | } |
292 | 1 | void setResponseTrailers(Http::ResponseTrailerMapPtr&& response_trailers) override { |
293 | 1 | response_trailers_ = std::move(response_trailers); |
294 | 1 | } |
295 | | Http::RequestHeaderMapOptRef requestHeaders() override; |
296 | | Http::RequestTrailerMapOptRef requestTrailers() override; |
297 | 1 | Http::ResponseHeaderMapOptRef informationalHeaders() override { |
298 | 1 | if (informational_headers_) { |
299 | 1 | return {*informational_headers_}; |
300 | 1 | } |
301 | 0 | return {}; |
302 | 1 | } |
303 | 1.63k | Http::ResponseHeaderMapOptRef responseHeaders() override { |
304 | 1.63k | if (response_headers_) { |
305 | 1.63k | return {*response_headers_}; |
306 | 1.63k | } |
307 | 0 | return {}; |
308 | 1.63k | } |
309 | 3.11k | Http::ResponseTrailerMapOptRef responseTrailers() override { |
310 | 3.11k | if (response_trailers_) { |
311 | 1 | return {*response_trailers_}; |
312 | 1 | } |
313 | 3.11k | return {}; |
314 | 3.11k | } |
315 | | // If the filter manager determines a decoder filter has dropped below its write buffer |
316 | | // low watermark, forward that to the upstream request's onBelowWriteBufferLowWatermark(). |
317 | 0 | void onDecoderFilterBelowWriteBufferLowWatermark() override { |
318 | 0 | upstream_request_.onBelowWriteBufferLowWatermark(); |
319 | 0 | } |
320 | | // If the filter manager determines a decoder filter has too much data, tell |
321 | | // the router to stop the flow of data from downstream. |
322 | 0 | void onDecoderFilterAboveWriteBufferHighWatermark() override { |
323 | 0 | upstream_request_.onAboveWriteBufferHighWatermark(); |
324 | 0 | } |
325 | | |
326 | | // These functions are delegated to the downstream HCM/FM. |
327 | | OptRef<const Tracing::Config> tracingConfig() const override; |
328 | | const ScopeTrackedObject& scope() override; |
329 | | Tracing::Span& activeSpan() override; |
330 | | void resetStream(Http::StreamResetReason reset_reason, |
331 | | absl::string_view transport_failure_reason) override; |
332 | | Upstream::ClusterInfoConstSharedPtr clusterInfo() override; |
333 | | Http::Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override; |
334 | | |
335 | | // Intentional no-op functions. |
336 | 0 | void onResponseDataTooLarge() override {} |
337 | 0 | void onRequestDataTooLarge() override {} |
338 | 902 | void endStream() override {} |
339 | 3.52k | void disarmRequestTimeout() override {} |
340 | 8.73k | void resetIdleTimer() override {} |
341 | 0 | void onLocalReply(Http::Code) override {} |
342 | | // Upgrade filter chains are not supported, so there is no upgrade map to return. |
343 | 0 | const Router::RouteEntry::UpgradeMap* upgradeMap() override { return nullptr; } |
344 | | |
345 | | // Unsupported functions: reaching either of these is reported via IS_ENVOY_BUG. |
346 | 0 | void recreateStream(StreamInfo::FilterStateSharedPtr) override { |
347 | 0 | IS_ENVOY_BUG("recreateStream called from upstream HTTP filter"); |
348 | 0 | } |
349 | 0 | void upgradeFilterChainCreated() override { |
350 | 0 | IS_ENVOY_BUG("upgradeFilterChainCreated called from upstream HTTP filter"); |
351 | 0 | } |
352 | 34.1k | OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {*this}; } |
353 | | |
354 | | // Http::UpstreamStreamFilterCallbacks |
355 | 8.83k | StreamInfo::StreamInfo& upstreamStreamInfo() override { return upstream_request_.streamInfo(); } |
356 | 7.44k | OptRef<GenericUpstream> upstream() override { |
357 | 7.44k | return makeOptRefFromPtr(upstream_request_.upstream_.get()); |
358 | 7.44k | } |
359 | 0 | void dumpState(std::ostream& os, int indent_level = 0) const override { |
360 | 0 | upstream_request_.dumpState(os, indent_level); |
361 | 0 | } |
362 | 6.35k | bool pausedForConnect() const override { return upstream_request_.paused_for_connect_; } |
363 | 0 | void setPausedForConnect(bool value) override { upstream_request_.paused_for_connect_ = value; } |
364 | | |
365 | 3.40k | bool pausedForWebsocketUpgrade() const override { |
366 | 3.40k | return upstream_request_.paused_for_websocket_; |
367 | 3.40k | } |
368 | 0 | void setPausedForWebsocketUpgrade(bool value) override { |
369 | 0 | upstream_request_.paused_for_websocket_ = value; |
370 | 0 | } |
371 | | |
372 | 1.81k | const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override { |
373 | 1.81k | return upstream_request_.upstreamStreamOptions(); |
374 | 1.81k | } |
375 | 1.81k | void addUpstreamCallbacks(Http::UpstreamCallbacks& callbacks) override { |
376 | 1.81k | upstream_request_.upstream_callbacks_.push_back(&callbacks); |
377 | 1.81k | } |
378 | 1.81k | void setUpstreamToDownstream(UpstreamToDownstream& upstream_to_downstream_interface) override { |
379 | 1.81k | upstream_request_.upstream_interface_ = upstream_to_downstream_interface; |
380 | 1.81k | } |
381 | 3.42k | bool isHalfCloseEnabled() override { return upstream_request_.enable_half_close_; } |
382 | | Http::RequestTrailerMapPtr trailers_; |
383 | | Http::ResponseHeaderMapPtr informational_headers_; |
384 | | Http::ResponseHeaderMapPtr response_headers_; |
385 | | Http::ResponseTrailerMapPtr response_trailers_; |
386 | | UpstreamRequest& upstream_request_; |
387 | | }; |
388 | | |
389 | | } // namespace Router |
390 | | } // namespace Envoy |