/proc/self/cwd/source/common/router/upstream_request.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/common/router/upstream_request.h" |
2 | | |
3 | | #include <chrono> |
4 | | #include <cstdint> |
5 | | #include <functional> |
6 | | #include <memory> |
7 | | #include <string> |
8 | | |
9 | | #include "envoy/event/dispatcher.h" |
10 | | #include "envoy/event/timer.h" |
11 | | #include "envoy/grpc/status.h" |
12 | | #include "envoy/http/conn_pool.h" |
13 | | #include "envoy/http/header_map.h" |
14 | | #include "envoy/runtime/runtime.h" |
15 | | #include "envoy/upstream/cluster_manager.h" |
16 | | #include "envoy/upstream/upstream.h" |
17 | | |
18 | | #include "source/common/common/assert.h" |
19 | | #include "source/common/common/dump_state_utils.h" |
20 | | #include "source/common/common/empty_string.h" |
21 | | #include "source/common/common/enum_to_int.h" |
22 | | #include "source/common/common/scope_tracker.h" |
23 | | #include "source/common/common/utility.h" |
24 | | #include "source/common/grpc/common.h" |
25 | | #include "source/common/http/codes.h" |
26 | | #include "source/common/http/header_map_impl.h" |
27 | | #include "source/common/http/headers.h" |
28 | | #include "source/common/http/message_impl.h" |
29 | | #include "source/common/http/utility.h" |
30 | | #include "source/common/network/application_protocol.h" |
31 | | #include "source/common/network/transport_socket_options_impl.h" |
32 | | #include "source/common/network/upstream_server_name.h" |
33 | | #include "source/common/network/upstream_subject_alt_names.h" |
34 | | #include "source/common/router/config_impl.h" |
35 | | #include "source/common/router/debug_config.h" |
36 | | #include "source/common/router/router.h" |
37 | | #include "source/common/stream_info/uint32_accessor_impl.h" |
38 | | #include "source/common/tracing/http_tracer_impl.h" |
39 | | #include "source/extensions/common/proxy_protocol/proxy_protocol_header.h" |
40 | | |
41 | | namespace Envoy { |
42 | | namespace Router { |
43 | | |
44 | | // The upstream HTTP filter manager class. |
45 | | class UpstreamFilterManager : public Http::FilterManager { |
46 | | public: |
47 | | UpstreamFilterManager(Http::FilterManagerCallbacks& filter_manager_callbacks, |
48 | | Event::Dispatcher& dispatcher, OptRef<const Network::Connection> connection, |
49 | | uint64_t stream_id, Buffer::BufferMemoryAccountSharedPtr account, |
50 | | bool proxy_100_continue, uint32_t buffer_limit, |
51 | | const Http::FilterChainFactory& filter_chain_factory, |
52 | | UpstreamRequest& request) |
53 | | : FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account, |
54 | | proxy_100_continue, buffer_limit, filter_chain_factory), |
55 | 1.81k | upstream_request_(request) {} |
56 | | |
57 | 3.45k | StreamInfo::StreamInfo& streamInfo() override { |
58 | 3.45k | return upstream_request_.parent_.callbacks()->streamInfo(); |
59 | 3.45k | } |
60 | 0 | const StreamInfo::StreamInfo& streamInfo() const override { |
61 | 0 | return upstream_request_.parent_.callbacks()->streamInfo(); |
62 | 0 | } |
63 | | // Send local replies via the downstream HTTP filter manager. |
64 | | // Local replies will not be seen by upstream HTTP filters. |
65 | | void sendLocalReply(Http::Code code, absl::string_view body, |
66 | | const std::function<void(Http::ResponseHeaderMap& headers)>& modify_headers, |
67 | | const absl::optional<Grpc::Status::GrpcStatus> grpc_status, |
68 | 0 | absl::string_view details) override { |
69 | 0 | state().decoder_filter_chain_aborted_ = true; |
70 | 0 | state().encoder_filter_chain_aborted_ = true; |
71 | 0 | state().encoder_filter_chain_complete_ = true; |
72 | 0 | state().observed_encode_end_stream_ = true; |
73 | | // TODO(alyssawilk) this should be done through the router to play well with hedging. |
74 | 0 | upstream_request_.parent_.callbacks()->sendLocalReply(code, body, modify_headers, grpc_status, |
75 | 0 | details); |
76 | 0 | } |
77 | 0 | void executeLocalReplyIfPrepared() override {} |
78 | | UpstreamRequest& upstream_request_; |
79 | | }; |
80 | | |
81 | | UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent, |
82 | | std::unique_ptr<GenericConnPool>&& conn_pool, |
83 | | bool can_send_early_data, bool can_use_http3, |
84 | | bool enable_half_close) |
85 | | : parent_(parent), conn_pool_(std::move(conn_pool)), |
86 | | stream_info_(parent_.callbacks()->dispatcher().timeSource(), nullptr, |
87 | | StreamInfo::FilterState::LifeSpan::FilterChain), |
88 | | start_time_(parent_.callbacks()->dispatcher().timeSource().monotonicTime()), |
89 | | calling_encode_headers_(false), upstream_canary_(false), router_sent_end_stream_(false), |
90 | | encode_trailers_(false), retried_(false), awaiting_headers_(true), |
91 | | outlier_detection_timeout_recorded_(false), |
92 | | create_per_try_timeout_on_request_complete_(false), paused_for_connect_(false), |
93 | | paused_for_websocket_(false), reset_stream_(false), |
94 | | record_timeout_budget_(parent_.cluster()->timeoutBudgetStats().has_value()), |
95 | | cleaned_up_(false), had_upstream_(false), |
96 | | stream_options_({can_send_early_data, can_use_http3}), grpc_rq_success_deferred_(false), |
97 | 1.81k | enable_half_close_(enable_half_close) { |
98 | 1.81k | if (auto tracing_config = parent_.callbacks()->tracingConfig(); tracing_config.has_value()) { |
99 | 829 | if (tracing_config->spawnUpstreamSpan() || parent_.config().start_child_span_) { |
100 | 0 | span_ = parent_.callbacks()->activeSpan().spawnChild( |
101 | 0 | tracing_config.value().get(), |
102 | 0 | absl::StrCat("router ", parent.cluster()->observabilityName(), " egress"), |
103 | 0 | parent_.callbacks()->dispatcher().timeSource().systemTime()); |
104 | 0 | if (parent.attemptCount() != 1) { |
105 | | // This is a retry request, add this metadata to span. |
106 | 0 | span_->setTag(Tracing::Tags::get().RetryCount, std::to_string(parent.attemptCount() - 1)); |
107 | 0 | } |
108 | 0 | } |
109 | 829 | } |
110 | | |
111 | | // The router checks that the connection pool is non-null before creating the upstream request. |
112 | 1.81k | auto upstream_host = conn_pool_->host(); |
113 | 1.81k | Tracing::HttpTraceContext trace_context(*parent_.downstreamHeaders()); |
114 | 1.81k | Tracing::UpstreamContext upstream_context(upstream_host.get(), // host_ |
115 | 1.81k | &upstream_host->cluster(), // cluster_ |
116 | 1.81k | Tracing::ServiceType::Unknown, // service_type_ |
117 | 1.81k | false // async_client_span_ |
118 | 1.81k | ); |
119 | | |
120 | 1.81k | if (span_ != nullptr) { |
121 | 0 | span_->injectContext(trace_context, upstream_context); |
122 | 1.81k | } else { |
123 | | // No independent child span for current upstream request then inject the parent span's tracing |
124 | | // context into the request headers. |
125 | | // The injectContext() of the parent span may be called repeatedly when the request is retried. |
126 | 1.81k | parent_.callbacks()->activeSpan().injectContext(trace_context, upstream_context); |
127 | 1.81k | } |
128 | | |
129 | 1.81k | stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>()); |
130 | 1.81k | stream_info_.route_ = parent_.callbacks()->route(); |
131 | 1.81k | parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo()); |
132 | | |
133 | 1.81k | stream_info_.healthCheck(parent_.callbacks()->streamInfo().healthCheck()); |
134 | 1.81k | stream_info_.setIsShadow(parent_.callbacks()->streamInfo().isShadow()); |
135 | 1.81k | absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info = |
136 | 1.81k | parent_.callbacks()->streamInfo().upstreamClusterInfo(); |
137 | 1.81k | if (cluster_info.has_value()) { |
138 | 1.81k | stream_info_.setUpstreamClusterInfo(*cluster_info); |
139 | 1.81k | } |
140 | | |
141 | | // Set up the upstream HTTP filter manager. |
142 | 1.81k | filter_manager_callbacks_ = std::make_unique<UpstreamRequestFilterManagerCallbacks>(*this); |
143 | 1.81k | filter_manager_ = std::make_unique<UpstreamFilterManager>( |
144 | 1.81k | *filter_manager_callbacks_, parent_.callbacks()->dispatcher(), UpstreamRequest::connection(), |
145 | 1.81k | parent_.callbacks()->streamId(), parent_.callbacks()->account(), true, |
146 | 1.81k | parent_.callbacks()->decoderBufferLimit(), *parent_.cluster(), *this); |
147 | | // Attempt to create custom cluster-specified filter chain |
148 | 1.81k | bool created = parent_.cluster()->createFilterChain(*filter_manager_, |
149 | 1.81k | /*only_create_if_configured=*/true); |
150 | 1.81k | if (!created) { |
151 | | // Attempt to create custom router-specified filter chain. |
152 | 1.81k | created = parent_.config().createFilterChain(*filter_manager_); |
153 | 1.81k | } |
154 | 1.81k | if (!created) { |
155 | | // Neither cluster nor router have a custom filter chain; add the default |
156 | | // cluster filter chain, which only consists of the codec filter. |
157 | 1.81k | created = parent_.cluster()->createFilterChain(*filter_manager_, false); |
158 | 1.81k | } |
159 | | // There will always be a codec filter present, which sets the upstream |
160 | | // interface. Fast-fail any tests that don't set up mocks correctly. |
161 | 1.81k | ASSERT(created && upstream_interface_.has_value()); |
162 | 1.81k | } Unexecuted instantiation: Envoy::Router::UpstreamRequest::UpstreamRequest(Envoy::Router::RouterFilterInterface&, std::__1::unique_ptr<Envoy::Router::GenericConnPool, std::__1::default_delete<Envoy::Router::GenericConnPool> >&&, bool, bool, bool) Envoy::Router::UpstreamRequest::UpstreamRequest(Envoy::Router::RouterFilterInterface&, std::__1::unique_ptr<Envoy::Router::GenericConnPool, std::__1::default_delete<Envoy::Router::GenericConnPool> >&&, bool, bool, bool) Line | Count | Source | 97 | 1.81k | enable_half_close_(enable_half_close) { | 98 | 1.81k | if (auto tracing_config = parent_.callbacks()->tracingConfig(); tracing_config.has_value()) { | 99 | 829 | if (tracing_config->spawnUpstreamSpan() || parent_.config().start_child_span_) { | 100 | 0 | span_ = parent_.callbacks()->activeSpan().spawnChild( | 101 | 0 | tracing_config.value().get(), | 102 | 0 | absl::StrCat("router ", parent.cluster()->observabilityName(), " egress"), | 103 | 0 | parent_.callbacks()->dispatcher().timeSource().systemTime()); | 104 | 0 | if (parent.attemptCount() != 1) { | 105 | | // This is a retry request, add this metadata to span. | 106 | 0 | span_->setTag(Tracing::Tags::get().RetryCount, std::to_string(parent.attemptCount() - 1)); | 107 | 0 | } | 108 | 0 | } | 109 | 829 | } | 110 | | | 111 | | // The router checks that the connection pool is non-null before creating the upstream request. 
| 112 | 1.81k | auto upstream_host = conn_pool_->host(); | 113 | 1.81k | Tracing::HttpTraceContext trace_context(*parent_.downstreamHeaders()); | 114 | 1.81k | Tracing::UpstreamContext upstream_context(upstream_host.get(), // host_ | 115 | 1.81k | &upstream_host->cluster(), // cluster_ | 116 | 1.81k | Tracing::ServiceType::Unknown, // service_type_ | 117 | 1.81k | false // async_client_span_ | 118 | 1.81k | ); | 119 | | | 120 | 1.81k | if (span_ != nullptr) { | 121 | 0 | span_->injectContext(trace_context, upstream_context); | 122 | 1.81k | } else { | 123 | | // No independent child span for current upstream request then inject the parent span's tracing | 124 | | // context into the request headers. | 125 | | // The injectContext() of the parent span may be called repeatedly when the request is retried. | 126 | 1.81k | parent_.callbacks()->activeSpan().injectContext(trace_context, upstream_context); | 127 | 1.81k | } | 128 | | | 129 | 1.81k | stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>()); | 130 | 1.81k | stream_info_.route_ = parent_.callbacks()->route(); | 131 | 1.81k | parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo()); | 132 | | | 133 | 1.81k | stream_info_.healthCheck(parent_.callbacks()->streamInfo().healthCheck()); | 134 | 1.81k | stream_info_.setIsShadow(parent_.callbacks()->streamInfo().isShadow()); | 135 | 1.81k | absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info = | 136 | 1.81k | parent_.callbacks()->streamInfo().upstreamClusterInfo(); | 137 | 1.81k | if (cluster_info.has_value()) { | 138 | 1.81k | stream_info_.setUpstreamClusterInfo(*cluster_info); | 139 | 1.81k | } | 140 | | | 141 | | // Set up the upstream HTTP filter manager. 
| 142 | 1.81k | filter_manager_callbacks_ = std::make_unique<UpstreamRequestFilterManagerCallbacks>(*this); | 143 | 1.81k | filter_manager_ = std::make_unique<UpstreamFilterManager>( | 144 | 1.81k | *filter_manager_callbacks_, parent_.callbacks()->dispatcher(), UpstreamRequest::connection(), | 145 | 1.81k | parent_.callbacks()->streamId(), parent_.callbacks()->account(), true, | 146 | 1.81k | parent_.callbacks()->decoderBufferLimit(), *parent_.cluster(), *this); | 147 | | // Attempt to create custom cluster-specified filter chain | 148 | 1.81k | bool created = parent_.cluster()->createFilterChain(*filter_manager_, | 149 | 1.81k | /*only_create_if_configured=*/true); | 150 | 1.81k | if (!created) { | 151 | | // Attempt to create custom router-specified filter chain. | 152 | 1.81k | created = parent_.config().createFilterChain(*filter_manager_); | 153 | 1.81k | } | 154 | 1.81k | if (!created) { | 155 | | // Neither cluster nor router have a custom filter chain; add the default | 156 | | // cluster filter chain, which only consists of the codec filter. | 157 | 1.81k | created = parent_.cluster()->createFilterChain(*filter_manager_, false); | 158 | 1.81k | } | 159 | | // There will always be a codec filter present, which sets the upstream | 160 | | // interface. Fast-fail any tests that don't set up mocks correctly. | 161 | 1.81k | ASSERT(created && upstream_interface_.has_value()); | 162 | 1.81k | } |
|
163 | | |
164 | 1.81k | UpstreamRequest::~UpstreamRequest() { cleanUp(); } |
165 | | |
166 | 3.63k | void UpstreamRequest::cleanUp() { |
167 | 3.63k | if (cleaned_up_) { |
168 | 1.81k | return; |
169 | 1.81k | } |
170 | 1.81k | cleaned_up_ = true; |
171 | | |
172 | 1.81k | filter_manager_->destroyFilters(); |
173 | | |
174 | 1.81k | if (span_ != nullptr) { |
175 | 0 | auto tracing_config = parent_.callbacks()->tracingConfig(); |
176 | 0 | ASSERT(tracing_config.has_value()); |
177 | 0 | Tracing::HttpTracerUtility::finalizeUpstreamSpan(*span_, stream_info_, |
178 | 0 | tracing_config.value().get()); |
179 | 0 | } |
180 | | |
181 | 1.81k | if (per_try_timeout_ != nullptr) { |
182 | | // Allows for testing. |
183 | 0 | per_try_timeout_->disableTimer(); |
184 | 0 | } |
185 | | |
186 | 1.81k | if (per_try_idle_timeout_ != nullptr) { |
187 | | // Allows for testing. |
188 | 0 | per_try_idle_timeout_->disableTimer(); |
189 | 0 | } |
190 | | |
191 | 1.81k | if (max_stream_duration_timer_ != nullptr) { |
192 | 0 | max_stream_duration_timer_->disableTimer(); |
193 | 0 | } |
194 | | |
195 | 1.81k | if (upstream_log_flush_timer_ != nullptr) { |
196 | 0 | upstream_log_flush_timer_->disableTimer(); |
197 | 0 | } |
198 | | |
199 | 1.81k | clearRequestEncoder(); |
200 | | |
201 | | // If desired, fire the per-try histogram when the UpstreamRequest |
202 | | // completes. |
203 | 1.81k | if (record_timeout_budget_) { |
204 | 5 | Event::Dispatcher& dispatcher = parent_.callbacks()->dispatcher(); |
205 | 5 | const MonotonicTime end_time = dispatcher.timeSource().monotonicTime(); |
206 | 5 | const std::chrono::milliseconds response_time = |
207 | 5 | std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time_); |
208 | 5 | Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = parent_.cluster()->timeoutBudgetStats(); |
209 | 5 | tb_stats->get().upstream_rq_timeout_budget_per_try_percent_used_.recordValue( |
210 | 5 | FilterUtility::percentageOfTimeout(response_time, parent_.timeout().per_try_timeout_)); |
211 | 5 | } |
212 | | |
213 | | // Ditto for request/response size histograms. |
214 | 1.81k | Upstream::ClusterRequestResponseSizeStatsOptRef req_resp_stats_opt = |
215 | 1.81k | parent_.cluster()->requestResponseSizeStats(); |
216 | 1.81k | if (req_resp_stats_opt.has_value() && parent_.downstreamHeaders()) { |
217 | 5 | auto& req_resp_stats = req_resp_stats_opt->get(); |
218 | 5 | req_resp_stats.upstream_rq_headers_size_.recordValue(parent_.downstreamHeaders()->byteSize()); |
219 | 5 | req_resp_stats.upstream_rq_body_size_.recordValue(stream_info_.bytesSent()); |
220 | | |
221 | 5 | if (response_headers_size_.has_value()) { |
222 | 0 | req_resp_stats.upstream_rs_headers_size_.recordValue(response_headers_size_.value()); |
223 | 0 | req_resp_stats.upstream_rs_body_size_.recordValue(stream_info_.bytesReceived()); |
224 | 0 | } |
225 | 5 | } |
226 | | |
227 | 1.81k | stream_info_.onRequestComplete(); |
228 | 1.81k | upstreamLog(AccessLog::AccessLogType::UpstreamEnd); |
229 | | |
230 | 1.81k | while (downstream_data_disabled_ != 0) { |
231 | 0 | parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark(); |
232 | 0 | parent_.cluster()->trafficStats()->upstream_flow_control_drained_total_.inc(); |
233 | 0 | --downstream_data_disabled_; |
234 | 0 | } |
235 | | // The upstream HTTP filter chain callbacks own headers/trailers while they are traversing the |
236 | | // filter chain. Make sure to not delete them immediately when the stream ends, as the stream |
237 | | // often ends during filter chain processing and it causes use-after-free violations. |
238 | 1.81k | parent_.callbacks()->dispatcher().deferredDelete(std::move(filter_manager_callbacks_)); |
239 | 1.81k | } |
240 | | |
241 | 1.81k | void UpstreamRequest::upstreamLog(AccessLog::AccessLogType access_log_type) { |
242 | 1.81k | const Formatter::HttpFormatterContext log_context{parent_.downstreamHeaders(), |
243 | 1.81k | upstream_headers_.get(), |
244 | 1.81k | upstream_trailers_.get(), |
245 | 1.81k | {}, |
246 | 1.81k | access_log_type}; |
247 | | |
248 | 1.81k | for (const auto& upstream_log : parent_.config().upstream_logs_) { |
249 | 0 | upstream_log->log(log_context, stream_info_); |
250 | 0 | } |
251 | 1.81k | } |
252 | | |
253 | | // This is called by the FilterManager when all filters have processed 1xx headers. Forward them |
254 | | // on to the router. |
255 | 1 | void UpstreamRequest::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) { |
256 | 1 | ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher()); |
257 | | |
258 | 1 | ASSERT(Http::HeaderUtility::isSpecial1xx(*headers)); |
259 | 1 | addResponseHeadersSize(headers->byteSize()); |
260 | 1 | maybeHandleDeferredReadDisable(); |
261 | 1 | parent_.onUpstream1xxHeaders(std::move(headers), *this); |
262 | 1 | } |
263 | | |
264 | | // This is called by the FilterManager when all filters have processed headers. Forward them |
265 | | // on to the router. |
266 | 1.63k | void UpstreamRequest::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) { |
267 | 1.63k | ASSERT(headers.get()); |
268 | 1.63k | ENVOY_STREAM_LOG(trace, "end_stream: {}, upstream response headers:\n{}", *parent_.callbacks(), |
269 | 1.63k | end_stream, *headers); |
270 | 1.63k | ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher()); |
271 | | |
272 | 1.63k | resetPerTryIdleTimer(); |
273 | | |
274 | 1.63k | addResponseHeadersSize(headers->byteSize()); |
275 | | |
276 | | // We drop unsupported 1xx on the floor here. 101 upgrade headers need to be passed to the client |
277 | | // as part of the final response. Most 1xx headers are handled in onUpstream1xxHeaders. |
278 | | // |
279 | | // We could in principle handle other headers here, but this might result in the double invocation |
280 | | // of decodeHeaders() (once for informational, again for non-informational), which is likely an |
281 | | // easy to miss corner case in the filter and HCM contract. |
282 | | // |
283 | | // This filtering is done early in upstream request, unlike 100 coalescing which is performed in |
284 | | // the router filter, since the filtering only depends on the state of a single upstream, and we |
285 | | // don't want to confuse accounting such as onFirstUpstreamRxByteReceived() with informational |
286 | | // headers. |
287 | 1.63k | const uint64_t response_code = Http::Utility::getResponseStatus(*headers); |
288 | 1.63k | if (Http::CodeUtility::is1xx(response_code) && |
289 | 1.63k | response_code != enumToInt(Http::Code::SwitchingProtocols)) { |
290 | 0 | return; |
291 | 0 | } |
292 | | |
293 | 1.63k | awaiting_headers_ = false; |
294 | 1.63k | if (span_ != nullptr) { |
295 | 0 | Tracing::HttpTracerUtility::onUpstreamResponseHeaders(*span_, headers.get()); |
296 | 0 | } |
297 | 1.63k | if (!parent_.config().upstream_logs_.empty()) { |
298 | 0 | upstream_headers_ = Http::createHeaderMap<Http::ResponseHeaderMapImpl>(*headers); |
299 | 0 | } |
300 | 1.63k | stream_info_.setResponseCode(static_cast<uint32_t>(response_code)); |
301 | | |
302 | 1.63k | maybeHandleDeferredReadDisable(); |
303 | 1.63k | ASSERT(headers.get()); |
304 | | |
305 | 1.63k | parent_.onUpstreamHeaders(response_code, std::move(headers), *this, end_stream); |
306 | 1.63k | } |
307 | | |
308 | 1.63k | void UpstreamRequest::maybeHandleDeferredReadDisable() { |
309 | 1.63k | for (; deferred_read_disabling_count_ > 0; --deferred_read_disabling_count_) { |
310 | | // If the deferred read disabling count hasn't been cancelled out by read |
311 | | // enabling count so far, stop the upstream from reading the rest response. |
312 | | // Because readDisable keeps track of how many time it is called with |
313 | | // "true" or "false", here it has to be called with "true" the same number |
314 | | // of times as it would be called with "false" in the future. |
315 | 0 | parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc(); |
316 | 0 | upstream_->readDisable(true); |
317 | 0 | } |
318 | 1.63k | } |
319 | | |
320 | 3.11k | void UpstreamRequest::decodeData(Buffer::Instance& data, bool end_stream) { |
321 | 3.11k | ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher()); |
322 | | |
323 | 3.11k | resetPerTryIdleTimer(); |
324 | 3.11k | stream_info_.addBytesReceived(data.length()); |
325 | 3.11k | parent_.onUpstreamData(data, *this, end_stream); |
326 | 3.11k | } |
327 | | |
328 | 1 | void UpstreamRequest::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) { |
329 | 1 | ENVOY_STREAM_LOG(trace, "upstream response trailers:\n{}", *parent_.callbacks(), *trailers); |
330 | 1 | ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher()); |
331 | | |
332 | 1 | if (span_ != nullptr) { |
333 | 0 | Tracing::HttpTracerUtility::onUpstreamResponseTrailers(*span_, trailers.get()); |
334 | 0 | } |
335 | 1 | if (!parent_.config().upstream_logs_.empty()) { |
336 | 0 | upstream_trailers_ = Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*trailers); |
337 | 0 | } |
338 | 1 | parent_.onUpstreamTrailers(std::move(trailers), *this); |
339 | 1 | } |
340 | | |
341 | 0 | void UpstreamRequest::dumpState(std::ostream& os, int indent_level) const { |
342 | 0 | const char* spaces = spacesForLevel(indent_level); |
343 | 0 | os << spaces << "UpstreamRequest " << this << "\n"; |
344 | 0 | if (connection()) { |
345 | 0 | const auto addressProvider = connection()->connectionInfoProviderSharedPtr(); |
346 | 0 | DUMP_DETAILS(addressProvider); |
347 | 0 | } |
348 | 0 | const Http::RequestHeaderMap* request_headers = parent_.downstreamHeaders(); |
349 | 0 | DUMP_DETAILS(request_headers); |
350 | 0 | if (filter_manager_) { |
351 | 0 | filter_manager_->dumpState(os, indent_level); |
352 | 0 | } |
353 | 0 | } |
354 | | |
355 | 1.77k | const Route& UpstreamRequest::route() const { return *parent_.callbacks()->route(); } |
356 | | |
357 | 1.81k | OptRef<const Network::Connection> UpstreamRequest::connection() const { |
358 | 1.81k | return parent_.callbacks()->connection(); |
359 | 1.81k | } |
360 | | |
361 | 293 | void UpstreamRequest::decodeMetadata(Http::MetadataMapPtr&& metadata_map) { |
362 | 293 | parent_.onUpstreamMetadata(std::move(metadata_map)); |
363 | 293 | } |
364 | | |
365 | 0 | void UpstreamRequest::maybeEndDecode(bool end_stream) { |
366 | 0 | if (end_stream) { |
367 | 0 | upstreamTiming().onLastUpstreamRxByteReceived(parent_.callbacks()->dispatcher().timeSource()); |
368 | 0 | } |
369 | 0 | } |
370 | | |
371 | | void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host, |
372 | 1.77k | bool pool_success) { |
373 | 1.77k | StreamInfo::UpstreamInfo& upstream_info = *streamInfo().upstreamInfo(); |
374 | 1.77k | upstream_info.setUpstreamHost(host); |
375 | 1.77k | upstream_host_ = host; |
376 | 1.77k | parent_.onUpstreamHostSelected(host, pool_success); |
377 | 1.77k | } |
378 | | |
379 | 1.81k | void UpstreamRequest::acceptHeadersFromRouter(bool end_stream) { |
380 | 1.81k | ASSERT(!router_sent_end_stream_); |
381 | 1.81k | router_sent_end_stream_ = end_stream; |
382 | | |
383 | | // Make sure that when we are forwarding CONNECT payload we do not do so until |
384 | | // the upstream has accepted the CONNECT request. |
385 | | // This must be done before conn_pool->newStream, as onPoolReady un-pauses for CONNECT |
386 | | // termination. |
387 | 1.81k | auto* headers = parent_.downstreamHeaders(); |
388 | 1.81k | if (headers->getMethodValue() == Http::Headers::get().MethodValues.Connect) { |
389 | 0 | paused_for_connect_ = true; |
390 | | // If this is a websocket upgrade request, pause the request until the upstream sends |
391 | | // the 101 Switching Protocols response code. Using the else logic here to obey CONNECT |
392 | | // method which is expecting 2xx response. |
393 | 1.81k | } else if ((Runtime::runtimeFeatureEnabled( |
394 | 1.81k | "envoy.reloadable_features.check_switch_protocol_websocket_handshake")) && |
395 | 1.81k | Http::Utility::isWebSocketUpgradeRequest(*headers)) { |
396 | 0 | paused_for_websocket_ = true; |
397 | 0 | } |
398 | | |
399 | | // Kick off creation of the upstream connection immediately upon receiving headers. |
400 | | // In future it may be possible for upstream HTTP filters to delay this, or influence connection |
401 | | // creation but for now optimize for minimal latency and fetch the connection |
402 | | // as soon as possible. |
403 | 1.81k | conn_pool_->newStream(this); |
404 | | |
405 | 1.81k | if (parent_.config().upstream_log_flush_interval_.has_value()) { |
406 | 0 | upstream_log_flush_timer_ = parent_.callbacks()->dispatcher().createTimer([this]() -> void { |
407 | | // If the request is complete, we've already done the stream-end upstream log, and shouldn't |
408 | | // do the periodic log. |
409 | 0 | if (!streamInfo().requestComplete().has_value()) { |
410 | 0 | upstreamLog(AccessLog::AccessLogType::UpstreamPeriodic); |
411 | 0 | resetUpstreamLogFlushTimer(); |
412 | 0 | } |
413 | | // Both downstream and upstream bytes meters may not be initialized when |
414 | | // the timer goes off, e.g. if it takes longer than the interval for a |
415 | | // connection to be initialized; check for nullptr. |
416 | 0 | auto& downstream_bytes_meter = stream_info_.getDownstreamBytesMeter(); |
417 | 0 | auto& upstream_bytes_meter = stream_info_.getUpstreamBytesMeter(); |
418 | 0 | const SystemTime now = parent_.callbacks()->dispatcher().timeSource().systemTime(); |
419 | 0 | if (downstream_bytes_meter) { |
420 | 0 | downstream_bytes_meter->takeUpstreamPeriodicLoggingSnapshot(now); |
421 | 0 | } |
422 | 0 | if (upstream_bytes_meter) { |
423 | 0 | upstream_bytes_meter->takeUpstreamPeriodicLoggingSnapshot(now); |
424 | 0 | } |
425 | 0 | }); |
426 | |
|
427 | 0 | resetUpstreamLogFlushTimer(); |
428 | 0 | } |
429 | | |
430 | 1.81k | filter_manager_->requestHeadersInitialized(); |
431 | 1.81k | filter_manager_->streamInfo().setRequestHeaders(*parent_.downstreamHeaders()); |
432 | 1.81k | filter_manager_->decodeHeaders(*parent_.downstreamHeaders(), end_stream); |
433 | 1.81k | } |
434 | | |
435 | 3.05k | void UpstreamRequest::acceptDataFromRouter(Buffer::Instance& data, bool end_stream) { |
436 | 3.05k | ASSERT(!router_sent_end_stream_); |
437 | 3.05k | router_sent_end_stream_ = end_stream; |
438 | | |
439 | 3.05k | filter_manager_->decodeData(data, end_stream); |
440 | 3.05k | } |
441 | | |
442 | 0 | void UpstreamRequest::acceptTrailersFromRouter(Http::RequestTrailerMap& trailers) { |
443 | 0 | ASSERT(!router_sent_end_stream_); |
444 | 0 | router_sent_end_stream_ = true; |
445 | 0 | encode_trailers_ = true; |
446 | |
|
447 | 0 | filter_manager_->decodeTrailers(trailers); |
448 | 0 | } |
449 | | |
450 | 26 | void UpstreamRequest::acceptMetadataFromRouter(Http::MetadataMapPtr&& metadata_map_ptr) { |
451 | 26 | filter_manager_->decodeMetadata(*metadata_map_ptr); |
452 | 26 | } |
453 | | |
454 | | void UpstreamRequest::onResetStream(Http::StreamResetReason reason, |
455 | 121 | absl::string_view transport_failure_reason) { |
456 | 121 | ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher()); |
457 | | |
458 | 121 | if (span_ != nullptr) { |
459 | | // Add tags about reset. |
460 | 0 | span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True); |
461 | 0 | span_->setTag(Tracing::Tags::get().ErrorReason, Http::Utility::resetReasonToString(reason)); |
462 | 0 | } |
463 | 121 | clearRequestEncoder(); |
464 | 121 | awaiting_headers_ = false; |
465 | 121 | if (!calling_encode_headers_) { |
466 | 121 | stream_info_.setResponseFlag(Filter::streamResetReasonToResponseFlag(reason)); |
467 | 121 | parent_.onUpstreamReset(reason, transport_failure_reason, *this); |
468 | 121 | } else { |
469 | 0 | deferred_reset_reason_ = reason; |
470 | 0 | } |
471 | 121 | } |
472 | | |
473 | 896 | void UpstreamRequest::resetStream() { |
474 | 896 | if (conn_pool_->cancelAnyPendingStream()) { |
475 | 46 | ENVOY_STREAM_LOG(debug, "canceled pool request", *parent_.callbacks()); |
476 | 46 | ASSERT(!upstream_); |
477 | 46 | } |
478 | | |
479 | | // Don't reset the stream if we're already done with it. |
480 | 896 | if (upstreamTiming().last_upstream_tx_byte_sent_.has_value() && |
481 | 896 | upstreamTiming().last_upstream_rx_byte_received_.has_value()) { |
482 | 0 | return; |
483 | 0 | } |
484 | | |
485 | 896 | if (span_ != nullptr) { |
486 | | // Add tags about the cancellation. |
487 | 0 | span_->setTag(Tracing::Tags::get().Canceled, Tracing::Tags::get().True); |
488 | 0 | } |
489 | | |
490 | 896 | if (upstream_) { |
491 | 850 | ENVOY_STREAM_LOG(debug, "resetting pool request", *parent_.callbacks()); |
492 | 850 | upstream_->resetStream(); |
493 | 850 | clearRequestEncoder(); |
494 | 850 | } |
495 | 896 | reset_stream_ = true; |
496 | 896 | } |
497 | | |
498 | 4.74k | void UpstreamRequest::resetPerTryIdleTimer() { |
499 | 4.74k | if (per_try_idle_timeout_ != nullptr) { |
500 | 0 | per_try_idle_timeout_->enableTimer(parent_.timeout().per_try_idle_timeout_); |
501 | 0 | } |
502 | 4.74k | } |
503 | | |
504 | 0 | void UpstreamRequest::resetUpstreamLogFlushTimer() { |
505 | 0 | if (upstream_log_flush_timer_ != nullptr) { |
506 | 0 | upstream_log_flush_timer_->enableTimer(parent_.config().upstream_log_flush_interval_.value()); |
507 | 0 | } |
508 | 0 | } |
509 | | |
 | | // Create and arm the per-try timeout and per-try idle timeout timers, when
 | | // the corresponding durations are configured (> 0). Must not be called if
 | | // either timer already exists (asserted below).
510 | 1.60k | void UpstreamRequest::setupPerTryTimeout() {
511 | 1.60k | ASSERT(!per_try_timeout_);
512 | 1.60k | if (parent_.timeout().per_try_timeout_.count() > 0) {
513 | 0 | per_try_timeout_ =
514 | 0 | parent_.callbacks()->dispatcher().createTimer([this]() -> void { onPerTryTimeout(); });
515 | 0 | per_try_timeout_->enableTimer(parent_.timeout().per_try_timeout_);
516 | 0 | }
517 | |
518 | 1.60k | ASSERT(!per_try_idle_timeout_);
519 | 1.60k | if (parent_.timeout().per_try_idle_timeout_.count() > 0) {
520 | 0 | per_try_idle_timeout_ =
521 | 0 | parent_.callbacks()->dispatcher().createTimer([this]() -> void { onPerTryIdleTimeout(); });
 | | // Arm the idle timer immediately; data flow re-arms it via resetPerTryIdleTimer().
522 | 0 | resetPerTryIdleTimer();
523 | 0 | }
524 | 1.60k | }
525 | | |
 | | // Fired when no upstream activity occurred within the per-try idle timeout.
 | | // Marks the stream with StreamIdleTimeout and defers the retry/reply
 | | // decision to the parent router filter.
526 | 0 | void UpstreamRequest::onPerTryIdleTimeout() {
527 | 0 | ENVOY_STREAM_LOG(debug, "upstream per try idle timeout", *parent_.callbacks());
528 | 0 | if (per_try_timeout_) {
529 | | // Disable the per try idle timer, so it does not trigger further retries
530 | 0 | per_try_timeout_->disableTimer();
531 | 0 | }
532 | 0 | stream_info_.setResponseFlag(StreamInfo::CoreResponseFlag::StreamIdleTimeout);
533 | 0 | parent_.onPerTryIdleTimeout(*this);
534 | 0 | }
535 | | |
 | | // Fired when the per-try timeout expires. If the downstream response has not
 | | // started, flags the stream with UpstreamRequestTimeout and hands control to
 | | // the parent (which may retry); otherwise the timeout is ignored.
536 | 0 | void UpstreamRequest::onPerTryTimeout() {
537 | 0 | if (per_try_idle_timeout_) {
538 | | // Delete the per try idle timer, so it does not trigger further retries.
539 | | // The timer has to be deleted to prevent data flow from re-arming it.
540 | 0 | per_try_idle_timeout_.reset();
541 | 0 | }
542 | | // If we've sent anything downstream, ignore the per try timeout and let the response continue
543 | | // up to the global timeout
544 | 0 | if (!parent_.downstreamResponseStarted()) {
545 | 0 | ENVOY_STREAM_LOG(debug, "upstream per try timeout", *parent_.callbacks());
546 |
|
547 | 0 | stream_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamRequestTimeout);
548 | 0 | parent_.onPerTryTimeout(*this);
549 | 0 | } else {
550 | 0 | ENVOY_STREAM_LOG(debug,
551 | 0 | "ignored upstream per try timeout due to already started downstream response",
552 | 0 | *parent_.callbacks());
553 | 0 | }
554 | 0 | }
555 | | |
 | | // Record the elapsed time from request start until the connection pool
 | | // invoked its callback (pool ready or pool failure) into upstream timing.
556 | 1.77k | void UpstreamRequest::recordConnectionPoolCallbackLatency() {
557 | 1.77k | upstreamTiming().recordConnectionPoolCallbackLatency(
558 | 1.77k | start_time_, parent_.callbacks()->dispatcher().timeSource());
559 | 1.77k | }
560 | | |
 | | // Connection pool callback: the pool could not provide a stream. Translates
 | | // the pool failure reason into a stream reset reason, records the transport
 | | // failure reason on the stream info, and then simulates an upstream reset so
 | | // the normal reset path (including retries) handles the failure.
561 | | void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason,
562 | | absl::string_view transport_failure_reason,
563 | 0 | Upstream::HostDescriptionConstSharedPtr host) {
564 | 0 | recordConnectionPoolCallbackLatency();
 | | // Map each pool failure reason onto its stream reset equivalent; the switch
 | | // is exhaustive, so falling out of it indicates a corrupt enum value.
565 | 0 | Http::StreamResetReason reset_reason = [](ConnectionPool::PoolFailureReason reason) {
566 | 0 | switch (reason) {
567 | 0 | case ConnectionPool::PoolFailureReason::Overflow:
568 | 0 | return Http::StreamResetReason::Overflow;
569 | 0 | case ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
570 | 0 | return Http::StreamResetReason::RemoteConnectionFailure;
571 | 0 | case ConnectionPool::PoolFailureReason::LocalConnectionFailure:
572 | 0 | return Http::StreamResetReason::LocalConnectionFailure;
573 | 0 | case ConnectionPool::PoolFailureReason::Timeout:
574 | 0 | return Http::StreamResetReason::ConnectionTimeout;
575 | 0 | }
576 | 0 | PANIC_DUE_TO_CORRUPT_ENUM;
577 | 0 | }(reason);
578 |
|
579 | 0 | stream_info_.upstreamInfo()->setUpstreamTransportFailureReason(transport_failure_reason);
580 | |
581 | | // Mimic an upstream reset.
582 | 0 | onUpstreamHostSelected(host, false);
583 | 0 | onResetStream(reset_reason, transport_failure_reason);
584 | 0 | }
585 | | |
 | | // Connection pool callback: a stream to an upstream host is available.
 | | // Takes ownership of the generic upstream, propagates connection/timing/
 | | // address/filter-state info into this request's StreamInfo, sets up the
 | | // per-try and max-stream-duration timers, applies host rewrite, and finally
 | | // notifies registered upstream callbacks that the connection is established.
586 | | void UpstreamRequest::onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
587 | | Upstream::HostDescriptionConstSharedPtr host,
588 | | const Network::ConnectionInfoProvider& address_provider,
589 | | StreamInfo::StreamInfo& info,
590 | 1.77k | absl::optional<Http::Protocol> protocol) {
591 | | // This may be called under an existing ScopeTrackerScopeState but it will unwind correctly.
592 | 1.77k | ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
593 | 1.77k | ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks());
594 | 1.77k | recordConnectionPoolCallbackLatency();
595 | 1.77k | upstream_ = std::move(upstream);
596 | 1.77k | had_upstream_ = true;
597 | | // Have the upstream use the account of the downstream.
598 | 1.77k | upstream_->setAccount(parent_.callbacks()->account());
599 | |
 | | // A successfully established connection counts as a local-origin success for
 | | // outlier detection.
600 | 1.77k | host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess);
601 | |
602 | 1.77k | onUpstreamHostSelected(host, true);
603 | |
604 | 1.77k | if (protocol) {
605 | 1.77k | stream_info_.protocol(protocol.value());
606 | 1.77k | } else {
607 | | // We only pause for CONNECT and WebSocket for HTTP upstreams. If this is a TCP upstream,
608 | | // unpause.
609 | 0 | paused_for_connect_ = false;
610 | 0 | paused_for_websocket_ = false;
611 | 0 | }
612 | |
 | | // Copy connect/handshake timing and stream counts captured on the pool's
 | | // stream info into this request's upstream timing.
613 | 1.77k | StreamInfo::UpstreamInfo& upstream_info = *stream_info_.upstreamInfo();
614 | 1.77k | if (info.upstreamInfo()) {
615 | 1.77k | auto& upstream_timing = info.upstreamInfo()->upstreamTiming();
616 | 1.77k | upstreamTiming().upstream_connect_start_ = upstream_timing.upstream_connect_start_;
617 | 1.77k | upstreamTiming().upstream_connect_complete_ = upstream_timing.upstream_connect_complete_;
618 | 1.77k | upstreamTiming().upstream_handshake_complete_ = upstream_timing.upstream_handshake_complete_;
619 | 1.77k | upstream_info.setUpstreamNumStreams(info.upstreamInfo()->upstreamNumStreams());
620 | 1.77k | }
621 | |
622 | | // Upstream HTTP filters might have already created/set a filter state.
623 | 1.77k | const StreamInfo::FilterStateSharedPtr& filter_state = info.filterState();
624 | 1.77k | if (!filter_state) {
625 | 0 | upstream_info.setUpstreamFilterState(
626 | 0 | std::make_shared<StreamInfo::FilterStateImpl>(StreamInfo::FilterState::LifeSpan::Request));
627 | 1.77k | } else {
628 | 1.77k | upstream_info.setUpstreamFilterState(filter_state);
629 | 1.77k | }
630 | 1.77k | upstream_info.setUpstreamLocalAddress(address_provider.localAddress());
631 | 1.77k | upstream_info.setUpstreamRemoteAddress(address_provider.remoteAddress());
632 | 1.77k | upstream_info.setUpstreamSslConnection(info.downstreamAddressProvider().sslConnection());
633 | |
634 | 1.77k | if (info.downstreamAddressProvider().connectionID().has_value()) {
635 | 1.76k | upstream_info.setUpstreamConnectionId(info.downstreamAddressProvider().connectionID().value());
636 | 1.76k | }
637 | |
638 | 1.77k | if (info.downstreamAddressProvider().interfaceName().has_value()) {
639 | 0 | upstream_info.setUpstreamInterfaceName(
640 | 0 | info.downstreamAddressProvider().interfaceName().value());
641 | 0 | }
642 | |
643 | 1.77k | stream_info_.setUpstreamBytesMeter(upstream_->bytesMeter());
644 | 1.77k | StreamInfo::StreamInfo::syncUpstreamAndDownstreamBytesMeter(parent_.callbacks()->streamInfo(),
645 | 1.77k | stream_info_);
646 | 1.77k | if (protocol) {
647 | 1.77k | upstream_info.setUpstreamProtocol(protocol.value());
648 | 1.77k | }
649 | |
 | | // Only start the per-try timers once the full downstream request has been
 | | // received; otherwise defer timer creation to request completion.
650 | 1.77k | if (parent_.downstreamEndStream()) {
651 | 444 | setupPerTryTimeout();
652 | 1.32k | } else {
653 | 1.32k | create_per_try_timeout_on_request_complete_ = true;
654 | 1.32k | }
655 | |
656 | | // Make sure the connection manager will inform the downstream watermark manager when the
657 | | // downstream buffers are overrun. This may result in immediate watermark callbacks referencing
658 | | // the encoder.
659 | 1.77k | parent_.callbacks()->addDownstreamWatermarkCallbacks(downstream_watermark_manager_);
660 | |
 | | // Dynamic (per-request) max stream duration takes precedence over the
 | | // cluster's common HTTP protocol option.
661 | 1.77k | absl::optional<std::chrono::milliseconds> max_stream_duration;
662 | 1.77k | if (parent_.dynamicMaxStreamDuration().has_value()) {
663 | 0 | max_stream_duration = parent_.dynamicMaxStreamDuration().value();
664 | 1.77k | } else if (upstream_host_->cluster().commonHttpProtocolOptions().has_max_stream_duration()) {
665 | 0 | max_stream_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
666 | 0 | upstream_host_->cluster().commonHttpProtocolOptions().max_stream_duration()));
667 | 0 | }
668 | 1.77k | if (max_stream_duration.has_value() && max_stream_duration->count()) {
669 | 0 | max_stream_duration_timer_ = parent_.callbacks()->dispatcher().createTimer(
670 | 0 | [this]() -> void { onStreamMaxDurationReached(); });
671 | 0 | max_stream_duration_timer_->enableTimer(*max_stream_duration);
672 | 0 | }
673 | |
 | | // Rewrite :authority to the selected host's name when the route enables
 | | // auto host rewrite (optionally appending x-forwarded-host).
674 | 1.77k | const auto* route_entry = route().routeEntry();
675 | 1.77k | if (route_entry->autoHostRewrite() && !host->hostname().empty()) {
676 | 0 | Http::Utility::updateAuthority(*parent_.downstreamHeaders(), host->hostname(),
677 | 0 | route_entry->appendXfh());
678 | 0 | }
679 | |
680 | 1.77k | stream_info_.setRequestHeaders(*parent_.downstreamHeaders());
681 | |
682 | 1.77k | if (parent_.config().flush_upstream_log_on_upstream_stream_) {
683 | 0 | upstreamLog(AccessLog::AccessLogType::UpstreamPoolReady);
684 | 0 | }
685 | |
686 | 1.77k | if (address_provider.connectionID() && stream_info_.downstreamAddressProvider().connectionID()) {
687 | 0 | ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
688 | 0 | address_provider.connectionID().value(),
689 | 0 | stream_info_.downstreamAddressProvider().connectionID().value());
690 | 0 | }
691 | |
692 | 1.77k | for (auto* callback : upstream_callbacks_) {
693 | 1.77k | callback->onUpstreamConnectionEstablished();
694 | 1.77k | }
695 | 1.77k | }
696 | | |
697 | 5.39k | UpstreamToDownstream& UpstreamRequest::upstreamToDownstream() { return *upstream_interface_; } |
698 | | |
 | | // Fired when the max stream duration timer expires: bump the cluster stat
 | | // and let the parent router decide whether to retry or finish the stream.
699 | 0 | void UpstreamRequest::onStreamMaxDurationReached() {
700 | 0 | upstream_host_->cluster().trafficStats()->upstream_rq_max_duration_reached_.inc();
701 | |
702 | | // The upstream had closed then try to retry along with retry policy.
703 | 0 | parent_.onStreamMaxDurationReached(*this);
704 | 0 | }
705 | | |
 | | // Drop the upstream stream/encoder. If one exists, first unsubscribe the
 | | // downstream watermark callbacks so they cannot fire on a dead encoder.
706 | 2.78k | void UpstreamRequest::clearRequestEncoder() {
707 | | // Before clearing the encoder, unsubscribe from callbacks.
708 | 2.78k | if (upstream_) {
709 | 1.77k | parent_.callbacks()->removeDownstreamWatermarkCallbacks(downstream_watermark_manager_);
710 | 1.77k | }
711 | 2.78k | upstream_.reset();
712 | 2.78k | }
713 | | |
 | | // Flow control toward the upstream: disable (or re-enable) reading from the
 | | // upstream stream in response to downstream watermark events. If the
 | | // downstream response has not started yet, the disable is deferred by
 | | // counting instead of acting on the upstream (see
 | | // deferred_read_disabling_count_ for the rationale).
714 | 1.30k | void UpstreamRequest::readDisableOrDefer(bool disable) {
715 | 1.30k | if (disable) {
716 | | // See comments on deferred_read_disabling_count_ for when we do and don't defer.
717 | 655 | if (parent_.downstreamResponseStarted()) {
718 | | // The downstream connection is overrun. Pause reads from upstream.
719 | | // If there are multiple calls to readDisable either the codec (H2) or the
720 | | // underlying Network::Connection (H1) will handle reference counting.
721 | 655 | parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
722 | 655 | upstream_->readDisable(disable);
723 | 655 | } else {
724 | 0 | ++deferred_read_disabling_count_;
725 | 0 | }
726 | 655 | return;
727 | 655 | }
728 | |
729 | | // One source of connection blockage has buffer available.
730 | 651 | if (deferred_read_disabling_count_ > 0) {
731 | 0 | ASSERT(!parent_.downstreamResponseStarted());
732 | | // Cancel out an existing deferred read disabling.
733 | 0 | --deferred_read_disabling_count_;
734 | 0 | return;
735 | 0 | }
736 | 651 | ASSERT(parent_.downstreamResponseStarted());
737 | | // Pass this on to the stream, which
738 | | // will resume reads if this was the last remaining high watermark.
739 | 651 | parent_.cluster()->trafficStats()->upstream_flow_control_resumed_reading_total_.inc();
740 | 651 | upstream_->readDisable(disable);
741 | 651 | }
742 | | |
 | | // Downstream buffers are over the high watermark: pause (or defer pausing)
 | | // reads from the upstream. Requires a live upstream stream.
743 | 655 | void UpstreamRequest::DownstreamWatermarkManager::onAboveWriteBufferHighWatermark() {
744 | 655 | ASSERT(parent_.upstream_);
745 | 655 | parent_.readDisableOrDefer(true);
746 | 655 | }
747 | | |
 | | // Downstream buffers drained below the low watermark: resume (or cancel a
 | | // deferred pause of) reads from the upstream.
748 | 651 | void UpstreamRequest::DownstreamWatermarkManager::onBelowWriteBufferLowWatermark() {
749 | 651 | ASSERT(parent_.upstream_);
750 | 651 | parent_.readDisableOrDefer(false);
751 | 651 | }
752 | | |
 | | // Upstream cannot accept more request data: signal the decoder filter chain
 | | // to stop sending, and count the disable so enables can be balanced.
753 | 0 | void UpstreamRequest::disableDataFromDownstreamForFlowControl() {
754 | 0 | parent_.cluster()->trafficStats()->upstream_flow_control_backed_up_total_.inc();
755 | 0 | parent_.callbacks()->onDecoderFilterAboveWriteBufferHighWatermark();
756 | 0 | ++downstream_data_disabled_;
757 | 0 | }
758 | | |
 | | // Upstream has drained: signal the decoder filter chain to resume sending
 | | // request data. The counter is asserted non-zero and guarded against
 | | // underflow before decrementing.
759 | 0 | void UpstreamRequest::enableDataFromDownstreamForFlowControl() {
760 | 0 | parent_.cluster()->trafficStats()->upstream_flow_control_drained_total_.inc();
761 | 0 | parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark();
762 | 0 | ASSERT(downstream_data_disabled_ != 0);
763 | 0 | if (downstream_data_disabled_ > 0) {
764 | 0 | --downstream_data_disabled_;
765 | 0 | }
766 | 0 | }
767 | | |
 | | // Expose the downstream request headers to the upstream filter manager.
768 | 4.54k | Http::RequestHeaderMapOptRef UpstreamRequestFilterManagerCallbacks::requestHeaders() {
769 | 4.54k | return {*upstream_request_.parent_.downstreamHeaders()};
770 | 4.54k | }
771 | | |
 | | // Expose request trailers to the upstream filter manager: prefer the
 | | // downstream-supplied trailers, fall back to locally created trailers,
 | | // otherwise return an empty optional.
772 | 8.79k | Http::RequestTrailerMapOptRef UpstreamRequestFilterManagerCallbacks::requestTrailers() {
773 | 8.79k | if (upstream_request_.parent_.downstreamTrailers()) {
774 | 0 | return {*upstream_request_.parent_.downstreamTrailers()};
775 | 0 | }
776 | 8.79k | if (trailers_) {
777 | 0 | return {*trailers_};
778 | 0 | }
779 | 8.79k | return {};
780 | 8.79k | }
781 | | |
 | | // Delegate scope tracking to the downstream stream's callbacks.
782 | 0 | const ScopeTrackedObject& UpstreamRequestFilterManagerCallbacks::scope() {
783 | 0 | return upstream_request_.parent_.callbacks()->scope();
784 | 0 | }
785 | | |
 | | // Delegate tracing configuration lookup to the downstream stream's callbacks.
786 | 0 | OptRef<const Tracing::Config> UpstreamRequestFilterManagerCallbacks::tracingConfig() const {
787 | 0 | return upstream_request_.parent_.callbacks()->tracingConfig();
788 | 0 | }
789 | | |
 | | // Delegate to the downstream stream's active tracing span.
790 | 0 | Tracing::Span& UpstreamRequestFilterManagerCallbacks::activeSpan() {
791 | 0 | return upstream_request_.parent_.callbacks()->activeSpan();
792 | 0 | }
793 | | |
 | | // Reset entry point for the upstream filter manager. A filter-driven local
 | | // reset force-resets the whole downstream stream; a codec-driven reset
 | | // (detected via "codec_error" in the transport failure reason) is routed to
 | | // UpstreamRequest::onResetStream so the router can retry or send a local
 | | // reply.
794 | | void UpstreamRequestFilterManagerCallbacks::resetStream(
795 | 121 | Http::StreamResetReason reset_reason, absl::string_view transport_failure_reason) {
796 | | // The filter manager needs to disambiguate between a filter-driven reset,
797 | | // which should force reset the stream, and a codec driven reset, which should
798 | | // tell the router the stream reset, and let the router make the decision to
799 | | // send a local reply, or retry the stream.
800 | 121 | bool is_codec_error;
 | | // With the runtime feature enabled, the reason string may carry an appended
 | | // error code, so a substring match is used instead of exact equality.
801 | 121 | if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.report_stream_reset_error_code")) {
802 | 121 | is_codec_error = absl::StrContains(transport_failure_reason, "codec_error");
803 | 121 | } else {
804 | 0 | is_codec_error = transport_failure_reason == "codec_error";
805 | 0 | }
806 | 121 | if (reset_reason == Http::StreamResetReason::LocalReset && !is_codec_error) {
807 | 0 | upstream_request_.parent_.callbacks()->resetStream();
808 | 0 | return;
809 | 0 | }
810 | 121 | return upstream_request_.onResetStream(reset_reason, transport_failure_reason);
811 | 121 | }
812 | | |
 | | // Delegate cluster info lookup to the downstream stream's callbacks.
813 | 0 | Upstream::ClusterInfoConstSharedPtr UpstreamRequestFilterManagerCallbacks::clusterInfo() {
814 | 0 | return upstream_request_.parent_.callbacks()->clusterInfo();
815 | 0 | }
816 | | |
 | | // Delegate HTTP/1 stream encoder option access to the downstream stream's
 | | // callbacks.
817 | | Http::Http1StreamEncoderOptionsOptRef
818 | 0 | UpstreamRequestFilterManagerCallbacks::http1StreamEncoderOptions() {
819 | 0 | return upstream_request_.parent_.callbacks()->http1StreamEncoderOptions();
820 | 0 | }
821 | | |
822 | | } // namespace Router |
823 | | } // namespace Envoy |