/proc/self/cwd/source/common/router/upstream_request.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include <chrono> |
4 | | #include <cstdint> |
5 | | #include <functional> |
6 | | #include <memory> |
7 | | #include <string> |
8 | | |
9 | | #include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h" |
10 | | #include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.validate.h" |
11 | | #include "envoy/http/codec.h" |
12 | | #include "envoy/http/codes.h" |
13 | | #include "envoy/http/conn_pool.h" |
14 | | #include "envoy/http/filter.h" |
15 | | #include "envoy/stats/scope.h" |
16 | | #include "envoy/tcp/conn_pool.h" |
17 | | |
18 | | #include "source/common/buffer/watermark_buffer.h" |
19 | | #include "source/common/common/cleanup.h" |
20 | | #include "source/common/common/hash.h" |
21 | | #include "source/common/common/hex.h" |
22 | | #include "source/common/common/linked_object.h" |
23 | | #include "source/common/common/logger.h" |
24 | | #include "source/common/config/well_known_names.h" |
25 | | #include "source/common/http/filter_manager.h" |
26 | | #include "source/common/stream_info/stream_info_impl.h" |
27 | | #include "source/common/tracing/null_span_impl.h" |
28 | | #include "source/extensions/filters/http/common/factory_base.h" |
29 | | |
30 | | namespace Envoy { |
31 | | namespace Router { |
32 | | |
33 | | class GenericUpstream; |
34 | | class GenericConnectionPoolCallbacks; |
35 | | class RouterFilterInterface; |
36 | | class UpstreamRequest; |
37 | | class UpstreamRequestFilterManagerCallbacks; |
38 | | class UpstreamFilterManager; |
39 | | class UpstreamCodecFilter; |
40 | | |
41 | | /* The Upstream request is the base class for forwarding HTTP upstream. |
42 | | * |
43 | | * On the new request path, payload (headers/data/trailers/metadata) still arrives via |
44 | | * the accept[X]FromRouter functions. Said data is immediately passed off to the |
45 | | * UpstreamFilterManager, which passes each item through the filter chain until |
46 | | * it arrives at the last filter in the chain, the UpstreamCodecFilter. If the upstream |
47 | | * stream is not established, the UpstreamCodecFilter returns StopAllIteration, and the |
48 | | * FilterManager will buffer data, using watermarks to push back to the router |
49 | | * filter if buffers become overrun. When an upstream connection is established, |
50 | | * the UpstreamCodecFilter will send data upstream. |
51 | | * |
52 | | * On the new response path, payload arrives from upstream via the UpstreamCodecFilter's |
53 | | * CodecBridge. It is passed off directly to the FilterManager, traverses the |
54 | | * filter chain, and completion is signaled via the |
55 | | * UpstreamRequestFilterManagerCallbacks's encode[X] functions. These somewhat |
56 | | * confusingly pass through the UpstreamRequest's legacy decode[X] functions |
57 | | * (required due to the UpstreamToDownstream interface, but will be renamed once |
58 | | * the classic mode is deprecated), and are finally passed to the router via the |
59 | | * RouterFilterInterface onUpstream[X] functions. |
60 | | * |
61 | | * There is some required communication between the UpstreamRequest and |
62 | | * UpstreamCodecFilter. This is accomplished via the UpstreamStreamFilterCallbacks |
63 | | * interface, with the UpstreamFilterManager acting as intermediary. |
64 | | */ |
65 | | class UpstreamRequest : public Logger::Loggable<Logger::Id::router>, |
66 | | public UpstreamToDownstream, |
67 | | public LinkedObject<UpstreamRequest>, |
68 | | public GenericConnectionPoolCallbacks, |
69 | | public Event::DeferredDeletable { |
70 | | public: |
71 | | UpstreamRequest(RouterFilterInterface& parent, std::unique_ptr<GenericConnPool>&& conn_pool, |
72 | | bool can_send_early_data, bool can_use_http3); |
73 | | ~UpstreamRequest() override; |
74 | 2.40k | void deleteIsPending() override { cleanUp(); } |
75 | | |
76 | | // To be called from the destructor, or prior to deferred delete. |
77 | | void cleanUp(); |
78 | | |
79 | | void acceptHeadersFromRouter(bool end_stream); |
80 | | void acceptDataFromRouter(Buffer::Instance& data, bool end_stream); |
81 | | void acceptTrailersFromRouter(Http::RequestTrailerMap& trailers); |
82 | | void acceptMetadataFromRouter(Http::MetadataMapPtr&& metadata_map_ptr); |
83 | | |
84 | | void resetStream(); |
85 | | void setupPerTryTimeout(); |
86 | | void maybeEndDecode(bool end_stream); |
87 | | void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host, bool pool_success); |
88 | | |
89 | | // Http::StreamDecoder |
90 | | void decodeData(Buffer::Instance& data, bool end_stream) override; |
91 | | void decodeMetadata(Http::MetadataMapPtr&& metadata_map) override; |
92 | | |
93 | | // UpstreamToDownstream (Http::ResponseDecoder) |
94 | | void decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) override; |
95 | | void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override; |
96 | | void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override; |
97 | | void dumpState(std::ostream& os, int indent_level) const override; |
98 | | |
99 | | // UpstreamToDownstream (Http::StreamCallbacks) |
100 | | void onResetStream(Http::StreamResetReason reason, |
101 | | absl::string_view transport_failure_reason) override; |
102 | 0 | void onAboveWriteBufferHighWatermark() override { disableDataFromDownstreamForFlowControl(); } |
103 | 0 | void onBelowWriteBufferLowWatermark() override { enableDataFromDownstreamForFlowControl(); } |
104 | | // UpstreamToDownstream |
105 | | const Route& route() const override; |
106 | | OptRef<const Network::Connection> connection() const override; |
107 | 2.39k | const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override { |
108 | 2.39k | return stream_options_; |
109 | 2.39k | } |
110 | | |
111 | | void disableDataFromDownstreamForFlowControl(); |
112 | | void enableDataFromDownstreamForFlowControl(); |
113 | | |
114 | | // GenericConnectionPoolCallbacks |
115 | | void onPoolFailure(ConnectionPool::PoolFailureReason reason, |
116 | | absl::string_view transport_failure_reason, |
117 | | Upstream::HostDescriptionConstSharedPtr host) override; |
118 | | void onPoolReady(std::unique_ptr<GenericUpstream>&& upstream, |
119 | | Upstream::HostDescriptionConstSharedPtr host, |
120 | | const Network::ConnectionInfoProvider& address_provider, |
121 | | StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) override; |
122 | | UpstreamToDownstream& upstreamToDownstream() override; |
123 | | |
124 | | void clearRequestEncoder(); |
125 | | void onStreamMaxDurationReached(); |
126 | | |
127 | | // Either disable upstream reading immediately, or defer it and keep track |
128 | | // of how many read-disable calls have been deferred. |
129 | | void readDisableOrDefer(bool disable); |
130 | | // Called upon receiving the first response headers from the upstream. Applies |
131 | | // any pending (deferred) read disabling to the upstream stream. |
132 | | void maybeHandleDeferredReadDisable(); |
133 | | |
134 | | struct DownstreamWatermarkManager : public Http::DownstreamWatermarkCallbacks { |
135 | 2.40k | DownstreamWatermarkManager(UpstreamRequest& parent) : parent_(parent) {} |
136 | | |
137 | | // Http::DownstreamWatermarkCallbacks |
138 | | void onBelowWriteBufferLowWatermark() override; |
139 | | void onAboveWriteBufferHighWatermark() override; |
140 | | |
141 | | UpstreamRequest& parent_; |
142 | | }; |
143 | | |
144 | | void readEnable(); |
145 | | void encodeBodyAndTrailers(); |
146 | | |
147 | | // Getters and setters |
148 | 10.2k | Upstream::HostDescriptionConstSharedPtr& upstreamHost() { return upstream_host_; } |
149 | 0 | void outlierDetectionTimeoutRecorded(bool recorded) { |
150 | 0 | outlier_detection_timeout_recorded_ = recorded; |
151 | 0 | } |
152 | 0 | bool outlierDetectionTimeoutRecorded() { return outlier_detection_timeout_recorded_; } |
153 | 0 | void retried(bool value) { retried_ = value; } |
154 | 0 | bool retried() { return retried_; } |
155 | 1.08k | bool grpcRqSuccessDeferred() { return grpc_rq_success_deferred_; } |
156 | 1.02k | void grpcRqSuccessDeferred(bool deferred) { grpc_rq_success_deferred_ = deferred; } |
157 | 2.22k | void upstreamCanary(bool value) { upstream_canary_ = value; } |
158 | 1.08k | bool upstreamCanary() { return upstream_canary_; } |
159 | 0 | bool awaitingHeaders() { return awaiting_headers_; } |
160 | 0 | void recordTimeoutBudget(bool value) { record_timeout_budget_ = value; } |
161 | 2.21k | bool createPerTryTimeoutOnRequestComplete() { |
162 | 2.21k | return create_per_try_timeout_on_request_complete_; |
163 | 2.21k | } |
164 | 0 | bool encodeComplete() const { return router_sent_end_stream_; } |
165 | | // Exposes streamInfo for the upstream stream. |
166 | 16.7k | StreamInfo::StreamInfo& streamInfo() { return stream_info_; } |
167 | 0 | bool hadUpstream() const { return had_upstream_; } |
168 | | |
169 | | private: |
170 | | friend class UpstreamFilterManager; |
171 | | friend class UpstreamCodecFilter; |
172 | | friend class UpstreamRequestFilterManagerCallbacks; |
173 | 11.6k | StreamInfo::UpstreamTiming& upstreamTiming() { |
174 | 11.6k | return stream_info_.upstreamInfo()->upstreamTiming(); |
175 | 11.6k | } |
176 | | // Records the latency from when the upstream request was first created to |
177 | | // when the pool callback fires. This latency can be useful to track excessive |
178 | | // queuing. |
179 | | void recordConnectionPoolCallbackLatency(); |
180 | | |
181 | 2.22k | void addResponseHeadersSize(uint64_t size) { |
182 | 2.22k | response_headers_size_ = response_headers_size_.value_or(0) + size; |
183 | 2.22k | } |
184 | | void resetPerTryIdleTimer(); |
185 | | void onPerTryTimeout(); |
186 | | void onPerTryIdleTimeout(); |
187 | | void upstreamLog(AccessLog::AccessLogType access_log_type); |
188 | | void resetUpstreamLogFlushTimer(); |
189 | | |
190 | | RouterFilterInterface& parent_; |
191 | | std::unique_ptr<GenericConnPool> conn_pool_; |
192 | | Event::TimerPtr per_try_timeout_; |
193 | | Event::TimerPtr per_try_idle_timeout_; |
194 | | std::unique_ptr<GenericUpstream> upstream_; |
195 | | absl::optional<Http::StreamResetReason> deferred_reset_reason_; |
196 | | Upstream::HostDescriptionConstSharedPtr upstream_host_; |
197 | | DownstreamWatermarkManager downstream_watermark_manager_{*this}; |
198 | | Tracing::SpanPtr span_; |
199 | | StreamInfo::StreamInfoImpl stream_info_; |
200 | | const MonotonicTime start_time_; |
201 | | // This is wrapped in an optional, since we want to avoid computing zero size headers when in |
202 | | // reality we just didn't get a response back. |
203 | | absl::optional<uint64_t> response_headers_size_{}; |
204 | | // Copies of upstream headers/trailers. These are only set if upstream |
205 | | // access logging is configured. |
206 | | Http::ResponseHeaderMapPtr upstream_headers_; |
207 | | Http::ResponseTrailerMapPtr upstream_trailers_; |
208 | | OptRef<UpstreamToDownstream> upstream_interface_; |
209 | | std::list<Http::UpstreamCallbacks*> upstream_callbacks_; |
210 | | |
211 | | Event::TimerPtr max_stream_duration_timer_; |
212 | | |
213 | | // Per-stream access log flush timer. This timer is enabled once when the stream is created |
214 | | // and will log to all access logs once per trigger. |
215 | | Event::TimerPtr upstream_log_flush_timer_; |
216 | | |
217 | | std::unique_ptr<UpstreamRequestFilterManagerCallbacks> filter_manager_callbacks_; |
218 | | std::unique_ptr<Http::FilterManager> filter_manager_; |
219 | | |
220 | | // The number of outstanding readDisable calls (with parameter value true) still to be made. |
221 | | // When downstream send buffers get above high watermark before response headers arrive, we |
222 | | // increment this counter instead of immediately calling readDisable on upstream stream. This is |
223 | | // to avoid the upstream request from being spuriously retried or reset because of upstream |
224 | | // timeouts while upstream stream is readDisabled by downstream but the response has actually |
225 | | // arrived from upstream. See https://github.com/envoyproxy/envoy/issues/25901. During the |
226 | | // deferring period, if the downstream buffer gets below low watermark, this counter gets |
227 | | // decremented. Once the response headers arrive, readDisable is called as many times as the |
228 | | // remaining value of this counter. |
229 | | size_t deferred_read_disabling_count_{0}; |
230 | | |
231 | | // Keep small members (bools and enums) at the end of class, to reduce alignment overhead. |
232 | | // Tracks the number of times the flow of data from downstream has been disabled. |
233 | | uint32_t downstream_data_disabled_{}; |
234 | | bool calling_encode_headers_ : 1; |
235 | | bool upstream_canary_ : 1; |
236 | | bool router_sent_end_stream_ : 1; |
237 | | bool encode_trailers_ : 1; |
238 | | bool retried_ : 1; |
239 | | bool awaiting_headers_ : 1; |
240 | | bool outlier_detection_timeout_recorded_ : 1; |
241 | | // Tracks whether we deferred a per try timeout because the downstream request |
242 | | // had not been completed yet. |
243 | | bool create_per_try_timeout_on_request_complete_ : 1; |
244 | | // True if the CONNECT headers have been sent but proxying payload is paused |
245 | | // waiting for response headers. |
246 | | bool paused_for_connect_ : 1; |
247 | | bool reset_stream_ : 1; |
248 | | |
249 | | // Sentinel to indicate if timeout budget tracking is configured for the cluster, |
250 | | // and if so, if the per-try histogram should record a value. |
251 | | bool record_timeout_budget_ : 1; |
252 | | // Track if one time clean up has been performed. |
253 | | bool cleaned_up_ : 1; |
254 | | bool had_upstream_ : 1; |
255 | | Http::ConnectionPool::Instance::StreamOptions stream_options_; |
256 | | bool grpc_rq_success_deferred_ : 1; |
257 | | bool upstream_wait_for_response_headers_before_disabling_read_ : 1; |
258 | | }; |
259 | | |
260 | | class UpstreamRequestFilterManagerCallbacks : public Http::FilterManagerCallbacks, |
261 | | public Event::DeferredDeletable, |
262 | | public Http::UpstreamStreamFilterCallbacks { |
263 | | public: |
264 | | UpstreamRequestFilterManagerCallbacks(UpstreamRequest& upstream_request) |
265 | 2.40k | : upstream_request_(upstream_request) {} |
266 | 2.22k | void encodeHeaders(Http::ResponseHeaderMap&, bool end_stream) override { |
267 | 2.22k | upstream_request_.decodeHeaders(std::move(response_headers_), end_stream); |
268 | 2.22k | } |
269 | 1 | void encode1xxHeaders(Http::ResponseHeaderMap&) override { |
270 | 1 | upstream_request_.decode1xxHeaders(std::move(informational_headers_)); |
271 | 1 | } |
272 | 4.28k | void encodeData(Buffer::Instance& data, bool end_stream) override { |
273 | 4.28k | upstream_request_.decodeData(data, end_stream); |
274 | 4.28k | } |
275 | 3 | void encodeTrailers(Http::ResponseTrailerMap&) override { |
276 | 3 | upstream_request_.decodeTrailers(std::move(response_trailers_)); |
277 | 3 | } |
278 | 181 | void encodeMetadata(Http::MetadataMapPtr&& metadata) override { |
279 | 181 | upstream_request_.decodeMetadata(std::move(metadata)); |
280 | 181 | } |
281 | 0 | void setRequestTrailers(Http::RequestTrailerMapPtr&& request_trailers) override { |
282 | 0 | trailers_ = std::move(request_trailers); |
283 | 0 | } |
284 | 1 | void setInformationalHeaders(Http::ResponseHeaderMapPtr&& response_headers) override { |
285 | 1 | informational_headers_ = std::move(response_headers); |
286 | 1 | } |
287 | 2.22k | void setResponseHeaders(Http::ResponseHeaderMapPtr&& response_headers) override { |
288 | 2.22k | response_headers_ = std::move(response_headers); |
289 | 2.22k | } |
290 | 3 | void setResponseTrailers(Http::ResponseTrailerMapPtr&& response_trailers) override { |
291 | 3 | response_trailers_ = std::move(response_trailers); |
292 | 3 | } |
293 | | Http::RequestHeaderMapOptRef requestHeaders() override; |
294 | | Http::RequestTrailerMapOptRef requestTrailers() override; |
295 | 1 | Http::ResponseHeaderMapOptRef informationalHeaders() override { |
296 | 1 | if (informational_headers_) { |
297 | 1 | return {*informational_headers_}; |
298 | 1 | } |
299 | 0 | return {}; |
300 | 1 | } |
301 | 2.22k | Http::ResponseHeaderMapOptRef responseHeaders() override { |
302 | 2.22k | if (response_headers_) { |
303 | 2.22k | return {*response_headers_}; |
304 | 2.22k | } |
305 | 0 | return {}; |
306 | 2.22k | } |
307 | 4.29k | Http::ResponseTrailerMapOptRef responseTrailers() override { |
308 | 4.29k | if (response_trailers_) { |
309 | 3 | return {*response_trailers_}; |
310 | 3 | } |
311 | 4.28k | return {}; |
312 | 4.29k | } |
313 | | // If the filter manager determines a decoder filter has buffer space available, tell |
314 | | // the router to resume the flow of data from downstream. |
315 | 0 | void onDecoderFilterBelowWriteBufferLowWatermark() override { |
316 | 0 | upstream_request_.onBelowWriteBufferLowWatermark(); |
317 | 0 | } |
318 | | // If the filter manager determines a decoder filter has too much data, tell |
319 | | // the router to stop the flow of data from downstream. |
320 | 0 | void onDecoderFilterAboveWriteBufferHighWatermark() override { |
321 | 0 | upstream_request_.onAboveWriteBufferHighWatermark(); |
322 | 0 | } |
323 | | |
324 | | // These functions are delegated to the downstream HCM/FM |
325 | | OptRef<const Tracing::Config> tracingConfig() const override; |
326 | | const ScopeTrackedObject& scope() override; |
327 | | Tracing::Span& activeSpan() override; |
328 | | void resetStream(Http::StreamResetReason reset_reason, |
329 | | absl::string_view transport_failure_reason) override; |
330 | | Upstream::ClusterInfoConstSharedPtr clusterInfo() override; |
331 | | Http::Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override; |
332 | | |
333 | | // Intentional no-op functions. |
334 | 0 | void onResponseDataTooLarge() override {} |
335 | 0 | void onRequestDataTooLarge() override {} |
336 | 1.18k | void endStream() override {} |
337 | 4.85k | void disarmRequestTimeout() override {} |
338 | 12.1k | void resetIdleTimer() override {} |
339 | 0 | void onLocalReply(Http::Code) override {} |
340 | | // Upgrade filter chains not supported. |
341 | 0 | const Router::RouteEntry::UpgradeMap* upgradeMap() override { return nullptr; } |
342 | | |
343 | | // Unsupported functions. |
344 | 0 | void recreateStream(StreamInfo::FilterStateSharedPtr) override { |
345 | 0 | IS_ENVOY_BUG("recreateStream called from upstream HTTP filter"); |
346 | 0 | } |
347 | 0 | void upgradeFilterChainCreated() override { |
348 | 0 | IS_ENVOY_BUG("upgradeFilterChainCreated called from upstream HTTP filter"); |
349 | 0 | } |
350 | 42.4k | OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {*this}; } |
351 | | |
352 | | // Http::UpstreamStreamFilterCallbacks |
353 | 12.2k | StreamInfo::StreamInfo& upstreamStreamInfo() override { return upstream_request_.streamInfo(); } |
354 | 10.3k | OptRef<GenericUpstream> upstream() override { |
355 | 10.3k | return makeOptRefFromPtr(upstream_request_.upstream_.get()); |
356 | 10.3k | } |
357 | 0 | void dumpState(std::ostream& os, int indent_level = 0) const override { |
358 | 0 | upstream_request_.dumpState(os, indent_level); |
359 | 0 | } |
360 | 8.91k | bool pausedForConnect() const override { return upstream_request_.paused_for_connect_; } |
361 | 0 | void setPausedForConnect(bool value) override { upstream_request_.paused_for_connect_ = value; } |
362 | 2.39k | const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override { |
363 | 2.39k | return upstream_request_.upstreamStreamOptions(); |
364 | 2.39k | } |
365 | 2.40k | void addUpstreamCallbacks(Http::UpstreamCallbacks& callbacks) override { |
366 | 2.40k | upstream_request_.upstream_callbacks_.push_back(&callbacks); |
367 | 2.40k | } |
368 | 2.40k | void setUpstreamToDownstream(UpstreamToDownstream& upstream_to_downstream_interface) override { |
369 | 2.40k | upstream_request_.upstream_interface_ = upstream_to_downstream_interface; |
370 | 2.40k | } |
371 | | |
372 | | Http::RequestTrailerMapPtr trailers_; |
373 | | Http::ResponseHeaderMapPtr informational_headers_; |
374 | | Http::ResponseHeaderMapPtr response_headers_; |
375 | | Http::ResponseTrailerMapPtr response_trailers_; |
376 | | UpstreamRequest& upstream_request_; |
377 | | }; |
378 | | |
379 | | } // namespace Router |
380 | | } // namespace Envoy |