Line data Source code
1 : #include "source/common/router/upstream_request.h"
2 :
3 : #include <chrono>
4 : #include <cstdint>
5 : #include <functional>
6 : #include <memory>
7 : #include <string>
8 :
9 : #include "envoy/event/dispatcher.h"
10 : #include "envoy/event/timer.h"
11 : #include "envoy/grpc/status.h"
12 : #include "envoy/http/conn_pool.h"
13 : #include "envoy/http/header_map.h"
14 : #include "envoy/runtime/runtime.h"
15 : #include "envoy/upstream/cluster_manager.h"
16 : #include "envoy/upstream/upstream.h"
17 :
18 : #include "source/common/common/assert.h"
19 : #include "source/common/common/dump_state_utils.h"
20 : #include "source/common/common/empty_string.h"
21 : #include "source/common/common/enum_to_int.h"
22 : #include "source/common/common/scope_tracker.h"
23 : #include "source/common/common/utility.h"
24 : #include "source/common/grpc/common.h"
25 : #include "source/common/http/codes.h"
26 : #include "source/common/http/header_map_impl.h"
27 : #include "source/common/http/headers.h"
28 : #include "source/common/http/message_impl.h"
29 : #include "source/common/http/utility.h"
30 : #include "source/common/network/application_protocol.h"
31 : #include "source/common/network/transport_socket_options_impl.h"
32 : #include "source/common/network/upstream_server_name.h"
33 : #include "source/common/network/upstream_subject_alt_names.h"
34 : #include "source/common/router/config_impl.h"
35 : #include "source/common/router/debug_config.h"
36 : #include "source/common/router/router.h"
37 : #include "source/common/stream_info/uint32_accessor_impl.h"
38 : #include "source/common/tracing/http_tracer_impl.h"
39 : #include "source/extensions/common/proxy_protocol/proxy_protocol_header.h"
40 :
41 : namespace Envoy {
42 : namespace Router {
43 :
44 : // The upstream HTTP filter manager class.
45 : class UpstreamFilterManager : public Http::FilterManager {
46 : public:
47 : UpstreamFilterManager(Http::FilterManagerCallbacks& filter_manager_callbacks,
48 : Event::Dispatcher& dispatcher, OptRef<const Network::Connection> connection,
49 : uint64_t stream_id, Buffer::BufferMemoryAccountSharedPtr account,
50 : bool proxy_100_continue, uint32_t buffer_limit,
51 : const Http::FilterChainFactory& filter_chain_factory,
52 : UpstreamRequest& request)
53 : : FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account,
54 : proxy_100_continue, buffer_limit, filter_chain_factory),
55 251 : upstream_request_(request) {}
56 :
57 392 : StreamInfo::StreamInfo& streamInfo() override {
58 392 : return upstream_request_.parent_.callbacks()->streamInfo();
59 392 : }
60 0 : const StreamInfo::StreamInfo& streamInfo() const override {
61 0 : return upstream_request_.parent_.callbacks()->streamInfo();
62 0 : }
63 : // Send local replies via the downstream HTTP filter manager.
64 : // Local replies will not be seen by upstream HTTP filters.
65 : void sendLocalReply(Http::Code code, absl::string_view body,
66 : const std::function<void(Http::ResponseHeaderMap& headers)>& modify_headers,
67 : const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
68 0 : absl::string_view details) override {
69 0 : state().decoder_filter_chain_aborted_ = true;
70 0 : state().encoder_filter_chain_aborted_ = true;
71 0 : state().remote_encode_complete_ = true;
72 0 : state().local_complete_ = true;
73 : // TODO(alyssawilk) this should be done through the router to play well with hedging.
74 0 : upstream_request_.parent_.callbacks()->sendLocalReply(code, body, modify_headers, grpc_status,
75 0 : details);
76 0 : }
77 0 : void executeLocalReplyIfPrepared() override {}
78 : UpstreamRequest& upstream_request_;
79 : };
80 :
81 : UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
82 : std::unique_ptr<GenericConnPool>&& conn_pool,
83 : bool can_send_early_data, bool can_use_http3)
84 : : parent_(parent), conn_pool_(std::move(conn_pool)),
85 : stream_info_(parent_.callbacks()->dispatcher().timeSource(), nullptr),
86 : start_time_(parent_.callbacks()->dispatcher().timeSource().monotonicTime()),
87 : calling_encode_headers_(false), upstream_canary_(false), router_sent_end_stream_(false),
88 : encode_trailers_(false), retried_(false), awaiting_headers_(true),
89 : outlier_detection_timeout_recorded_(false),
90 : create_per_try_timeout_on_request_complete_(false), paused_for_connect_(false),
91 : reset_stream_(false),
92 : record_timeout_budget_(parent_.cluster()->timeoutBudgetStats().has_value()),
93 : cleaned_up_(false), had_upstream_(false),
94 : stream_options_({can_send_early_data, can_use_http3}), grpc_rq_success_deferred_(false),
95 : upstream_wait_for_response_headers_before_disabling_read_(Runtime::runtimeFeatureEnabled(
96 251 : "envoy.reloadable_features.upstream_wait_for_response_headers_before_disabling_read")) {
97 251 : if (auto tracing_config = parent_.callbacks()->tracingConfig(); tracing_config.has_value()) {
98 68 : if (tracing_config->spawnUpstreamSpan() || parent_.config().start_child_span_) {
99 0 : span_ = parent_.callbacks()->activeSpan().spawnChild(
100 0 : tracing_config.value().get(),
101 0 : absl::StrCat("router ", parent.cluster()->observabilityName(), " egress"),
102 0 : parent_.callbacks()->dispatcher().timeSource().systemTime());
103 0 : if (parent.attemptCount() != 1) {
104 : // This is a retry request, add this metadata to span.
105 0 : span_->setTag(Tracing::Tags::get().RetryCount, std::to_string(parent.attemptCount() - 1));
106 0 : }
107 0 : }
108 68 : }
109 :
110 : // The router checks that the connection pool is non-null before creating the upstream request.
111 251 : auto upstream_host = conn_pool_->host();
112 251 : Tracing::HttpTraceContext trace_context(*parent_.downstreamHeaders());
113 251 : if (span_ != nullptr) {
114 0 : span_->injectContext(trace_context, upstream_host);
115 251 : } else {
116 : // No independent child span for current upstream request then inject the parent span's tracing
117 : // context into the request headers.
118 : // The injectContext() of the parent span may be called repeatedly when the request is retried.
119 251 : parent_.callbacks()->activeSpan().injectContext(trace_context, upstream_host);
120 251 : }
121 :
122 251 : stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
123 251 : stream_info_.route_ = parent_.callbacks()->route();
124 251 : parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo());
125 :
126 251 : stream_info_.healthCheck(parent_.callbacks()->streamInfo().healthCheck());
127 251 : stream_info_.setIsShadow(parent_.callbacks()->streamInfo().isShadow());
128 251 : absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info =
129 251 : parent_.callbacks()->streamInfo().upstreamClusterInfo();
130 251 : if (cluster_info.has_value()) {
131 251 : stream_info_.setUpstreamClusterInfo(*cluster_info);
132 251 : }
133 :
134 : // Set up the upstream HTTP filter manager.
135 251 : filter_manager_callbacks_ = std::make_unique<UpstreamRequestFilterManagerCallbacks>(*this);
136 251 : filter_manager_ = std::make_unique<UpstreamFilterManager>(
137 251 : *filter_manager_callbacks_, parent_.callbacks()->dispatcher(), connection(),
138 251 : parent_.callbacks()->streamId(), parent_.callbacks()->account(), true,
139 251 : parent_.callbacks()->decoderBufferLimit(), *parent_.cluster(), *this);
140 : // Attempt to create custom cluster-specified filter chain
141 251 : bool created = parent_.cluster()->createFilterChain(*filter_manager_,
142 251 : /*only_create_if_configured=*/true);
143 251 : if (!created) {
144 : // Attempt to create custom router-specified filter chain.
145 251 : created = parent_.config().createFilterChain(*filter_manager_);
146 251 : }
147 251 : if (!created) {
148 : // Neither cluster nor router have a custom filter chain; add the default
149 : // cluster filter chain, which only consists of the codec filter.
150 251 : created = parent_.cluster()->createFilterChain(*filter_manager_, false);
151 251 : }
152 : // There will always be a codec filter present, which sets the upstream
153 : // interface. Fast-fail any tests that don't set up mocks correctly.
154 251 : ASSERT(created && upstream_interface_.has_value());
155 251 : }
156 :
157 251 : UpstreamRequest::~UpstreamRequest() { cleanUp(); }
158 :
159 502 : void UpstreamRequest::cleanUp() {
160 502 : if (cleaned_up_) {
161 251 : return;
162 251 : }
163 251 : cleaned_up_ = true;
164 :
165 251 : filter_manager_->destroyFilters();
166 :
167 251 : if (span_ != nullptr) {
168 0 : auto tracing_config = parent_.callbacks()->tracingConfig();
169 0 : ASSERT(tracing_config.has_value());
170 0 : Tracing::HttpTracerUtility::finalizeUpstreamSpan(*span_, stream_info_,
171 0 : tracing_config.value().get());
172 0 : }
173 :
174 251 : if (per_try_timeout_ != nullptr) {
175 : // Allows for testing.
176 0 : per_try_timeout_->disableTimer();
177 0 : }
178 :
179 251 : if (per_try_idle_timeout_ != nullptr) {
180 : // Allows for testing.
181 0 : per_try_idle_timeout_->disableTimer();
182 0 : }
183 :
184 251 : if (max_stream_duration_timer_ != nullptr) {
185 0 : max_stream_duration_timer_->disableTimer();
186 0 : }
187 :
188 251 : if (upstream_log_flush_timer_ != nullptr) {
189 0 : upstream_log_flush_timer_->disableTimer();
190 0 : }
191 :
192 251 : clearRequestEncoder();
193 :
194 : // If desired, fire the per-try histogram when the UpstreamRequest
195 : // completes.
196 251 : if (record_timeout_budget_) {
197 0 : Event::Dispatcher& dispatcher = parent_.callbacks()->dispatcher();
198 0 : const MonotonicTime end_time = dispatcher.timeSource().monotonicTime();
199 0 : const std::chrono::milliseconds response_time =
200 0 : std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time_);
201 0 : Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = parent_.cluster()->timeoutBudgetStats();
202 0 : tb_stats->get().upstream_rq_timeout_budget_per_try_percent_used_.recordValue(
203 0 : FilterUtility::percentageOfTimeout(response_time, parent_.timeout().per_try_timeout_));
204 0 : }
205 :
206 : // Ditto for request/response size histograms.
207 251 : Upstream::ClusterRequestResponseSizeStatsOptRef req_resp_stats_opt =
208 251 : parent_.cluster()->requestResponseSizeStats();
209 251 : if (req_resp_stats_opt.has_value() && parent_.downstreamHeaders()) {
210 0 : auto& req_resp_stats = req_resp_stats_opt->get();
211 0 : req_resp_stats.upstream_rq_headers_size_.recordValue(parent_.downstreamHeaders()->byteSize());
212 0 : req_resp_stats.upstream_rq_body_size_.recordValue(stream_info_.bytesSent());
213 :
214 0 : if (response_headers_size_.has_value()) {
215 0 : req_resp_stats.upstream_rs_headers_size_.recordValue(response_headers_size_.value());
216 0 : req_resp_stats.upstream_rs_body_size_.recordValue(stream_info_.bytesReceived());
217 0 : }
218 0 : }
219 :
220 251 : stream_info_.onRequestComplete();
221 251 : upstreamLog(AccessLog::AccessLogType::UpstreamEnd);
222 :
223 251 : while (downstream_data_disabled_ != 0) {
224 0 : parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark();
225 0 : parent_.cluster()->trafficStats()->upstream_flow_control_drained_total_.inc();
226 0 : --downstream_data_disabled_;
227 0 : }
228 : // The upstream HTTP filter chain callbacks own headers/trailers while they are traversing the
229 : // filter chain. Make sure to not delete them immediately when the stream ends, as the stream
230 : // often ends during filter chain processing and it causes use-after-free violations.
231 251 : parent_.callbacks()->dispatcher().deferredDelete(std::move(filter_manager_callbacks_));
232 251 : }
233 :
234 251 : void UpstreamRequest::upstreamLog(AccessLog::AccessLogType access_log_type) {
235 251 : const Formatter::HttpFormatterContext log_context{parent_.downstreamHeaders(),
236 251 : upstream_headers_.get(),
237 251 : upstream_trailers_.get(),
238 251 : {},
239 251 : access_log_type};
240 :
241 251 : for (const auto& upstream_log : parent_.config().upstream_logs_) {
242 0 : upstream_log->log(log_context, stream_info_);
243 0 : }
244 251 : }
245 :
246 : // This is called by the FilterManager when all filters have processed 1xx headers. Forward them
247 : // on to the router.
248 0 : void UpstreamRequest::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) {
249 0 : ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
250 :
251 0 : ASSERT(Http::HeaderUtility::isSpecial1xx(*headers));
252 0 : addResponseHeadersSize(headers->byteSize());
253 0 : maybeHandleDeferredReadDisable();
254 0 : parent_.onUpstream1xxHeaders(std::move(headers), *this);
255 0 : }
256 :
257 : // This is called by the FilterManager when all filters have processed headers. Forward them
258 : // on to the router.
259 141 : void UpstreamRequest::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
260 141 : ASSERT(headers.get());
261 141 : ENVOY_STREAM_LOG(trace, "upstream response headers:\n{}", *parent_.callbacks(), *headers);
262 141 : ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
263 :
264 141 : resetPerTryIdleTimer();
265 :
266 141 : addResponseHeadersSize(headers->byteSize());
267 :
268 : // We drop unsupported 1xx on the floor here. 101 upgrade headers need to be passed to the client
269 : // as part of the final response. Most 1xx headers are handled in onUpstream1xxHeaders.
270 : //
271 : // We could in principle handle other headers here, but this might result in the double invocation
272 : // of decodeHeaders() (once for informational, again for non-informational), which is likely an
273 : // easy to miss corner case in the filter and HCM contract.
274 : //
275 : // This filtering is done early in upstream request, unlike 100 coalescing which is performed in
276 : // the router filter, since the filtering only depends on the state of a single upstream, and we
277 : // don't want to confuse accounting such as onFirstUpstreamRxByteReceived() with informational
278 : // headers.
279 141 : const uint64_t response_code = Http::Utility::getResponseStatus(*headers);
280 141 : if (Http::CodeUtility::is1xx(response_code) &&
281 141 : response_code != enumToInt(Http::Code::SwitchingProtocols)) {
282 0 : return;
283 0 : }
284 :
285 141 : awaiting_headers_ = false;
286 141 : if (span_ != nullptr) {
287 0 : Tracing::HttpTracerUtility::onUpstreamResponseHeaders(*span_, headers.get());
288 0 : }
289 141 : if (!parent_.config().upstream_logs_.empty()) {
290 0 : upstream_headers_ = Http::createHeaderMap<Http::ResponseHeaderMapImpl>(*headers);
291 0 : }
292 141 : stream_info_.setResponseCode(static_cast<uint32_t>(response_code));
293 :
294 141 : maybeHandleDeferredReadDisable();
295 141 : ASSERT(headers.get());
296 :
297 141 : parent_.onUpstreamHeaders(response_code, std::move(headers), *this, end_stream);
298 141 : }
299 :
300 141 : void UpstreamRequest::maybeHandleDeferredReadDisable() {
301 141 : for (; deferred_read_disabling_count_ > 0; --deferred_read_disabling_count_) {
302 : // If the deferred read disabling count hasn't been cancelled out by read
303 : // enabling count so far, stop the upstream from reading the rest response.
304 : // Because readDisable keeps track of how many time it is called with
305 : // "true" or "false", here it has to be called with "true" the same number
306 : // of times as it would be called with "false" in the future.
307 0 : parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
308 0 : upstream_->readDisable(true);
309 0 : }
310 141 : }
311 :
312 395 : void UpstreamRequest::decodeData(Buffer::Instance& data, bool end_stream) {
313 395 : ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
314 :
315 395 : resetPerTryIdleTimer();
316 395 : stream_info_.addBytesReceived(data.length());
317 395 : parent_.onUpstreamData(data, *this, end_stream);
318 395 : }
319 :
320 6 : void UpstreamRequest::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) {
321 6 : ENVOY_STREAM_LOG(trace, "upstream response trailers:\n{}", *parent_.callbacks(), *trailers);
322 6 : ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
323 :
324 6 : if (span_ != nullptr) {
325 0 : Tracing::HttpTracerUtility::onUpstreamResponseTrailers(*span_, trailers.get());
326 0 : }
327 6 : if (!parent_.config().upstream_logs_.empty()) {
328 0 : upstream_trailers_ = Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*trailers);
329 0 : }
330 6 : parent_.onUpstreamTrailers(std::move(trailers), *this);
331 6 : }
332 :
333 0 : void UpstreamRequest::dumpState(std::ostream& os, int indent_level) const {
334 0 : const char* spaces = spacesForLevel(indent_level);
335 0 : os << spaces << "UpstreamRequest " << this << "\n";
336 0 : if (connection()) {
337 0 : const auto addressProvider = connection()->connectionInfoProviderSharedPtr();
338 0 : DUMP_DETAILS(addressProvider);
339 0 : }
340 0 : const Http::RequestHeaderMap* request_headers = parent_.downstreamHeaders();
341 0 : DUMP_DETAILS(request_headers);
342 0 : if (filter_manager_) {
343 0 : filter_manager_->dumpState(os, indent_level);
344 0 : }
345 0 : }
346 :
347 202 : const Route& UpstreamRequest::route() const { return *parent_.callbacks()->route(); }
348 :
349 251 : OptRef<const Network::Connection> UpstreamRequest::connection() const {
350 251 : return parent_.callbacks()->connection();
351 251 : }
352 :
353 2 : void UpstreamRequest::decodeMetadata(Http::MetadataMapPtr&& metadata_map) {
354 2 : parent_.onUpstreamMetadata(std::move(metadata_map));
355 2 : }
356 :
357 0 : void UpstreamRequest::maybeEndDecode(bool end_stream) {
358 0 : if (end_stream) {
359 0 : upstreamTiming().onLastUpstreamRxByteReceived(parent_.callbacks()->dispatcher().timeSource());
360 0 : }
361 0 : }
362 :
363 : void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
364 208 : bool pool_success) {
365 208 : StreamInfo::UpstreamInfo& upstream_info = *streamInfo().upstreamInfo();
366 208 : upstream_info.setUpstreamHost(host);
367 208 : upstream_host_ = host;
368 208 : parent_.onUpstreamHostSelected(host, pool_success);
369 208 : }
370 :
371 251 : void UpstreamRequest::acceptHeadersFromRouter(bool end_stream) {
372 251 : ASSERT(!router_sent_end_stream_);
373 251 : router_sent_end_stream_ = end_stream;
374 :
375 : // Make sure that when we are forwarding CONNECT payload we do not do so until
376 : // the upstream has accepted the CONNECT request.
377 : // This must be done before conn_pool->newStream, as onPoolReady un-pauses for CONNECT
378 : // termination.
379 251 : auto* headers = parent_.downstreamHeaders();
380 251 : if (headers->getMethodValue() == Http::Headers::get().MethodValues.Connect) {
381 0 : paused_for_connect_ = true;
382 0 : }
383 :
384 : // Kick off creation of the upstream connection immediately upon receiving headers.
385 : // In future it may be possible for upstream HTTP filters to delay this, or influence connection
386 : // creation but for now optimize for minimal latency and fetch the connection
387 : // as soon as possible.
388 251 : conn_pool_->newStream(this);
389 :
390 251 : if (parent_.config().upstream_log_flush_interval_.has_value()) {
391 0 : upstream_log_flush_timer_ = parent_.callbacks()->dispatcher().createTimer([this]() -> void {
392 : // If the request is complete, we've already done the stream-end upstream log, and shouldn't
393 : // do the periodic log.
394 0 : if (!streamInfo().requestComplete().has_value()) {
395 0 : upstreamLog(AccessLog::AccessLogType::UpstreamPeriodic);
396 0 : resetUpstreamLogFlushTimer();
397 0 : }
398 : // Both downstream and upstream bytes meters may not be initialized when
399 : // the timer goes off, e.g. if it takes longer than the interval for a
400 : // connection to be initialized; check for nullptr.
401 0 : auto& downstream_bytes_meter = stream_info_.getDownstreamBytesMeter();
402 0 : auto& upstream_bytes_meter = stream_info_.getUpstreamBytesMeter();
403 0 : const SystemTime now = parent_.callbacks()->dispatcher().timeSource().systemTime();
404 0 : if (downstream_bytes_meter) {
405 0 : downstream_bytes_meter->takeUpstreamPeriodicLoggingSnapshot(now);
406 0 : }
407 0 : if (upstream_bytes_meter) {
408 0 : upstream_bytes_meter->takeUpstreamPeriodicLoggingSnapshot(now);
409 0 : }
410 0 : });
411 :
412 0 : resetUpstreamLogFlushTimer();
413 0 : }
414 :
415 251 : filter_manager_->requestHeadersInitialized();
416 251 : filter_manager_->streamInfo().setRequestHeaders(*parent_.downstreamHeaders());
417 251 : filter_manager_->decodeHeaders(*parent_.downstreamHeaders(), end_stream);
418 251 : }
419 :
420 596 : void UpstreamRequest::acceptDataFromRouter(Buffer::Instance& data, bool end_stream) {
421 596 : ASSERT(!router_sent_end_stream_);
422 596 : router_sent_end_stream_ = end_stream;
423 :
424 596 : filter_manager_->decodeData(data, end_stream);
425 596 : }
426 :
427 0 : void UpstreamRequest::acceptTrailersFromRouter(Http::RequestTrailerMap& trailers) {
428 0 : ASSERT(!router_sent_end_stream_);
429 0 : router_sent_end_stream_ = true;
430 0 : encode_trailers_ = true;
431 :
432 0 : filter_manager_->decodeTrailers(trailers);
433 0 : }
434 :
435 0 : void UpstreamRequest::acceptMetadataFromRouter(Http::MetadataMapPtr&& metadata_map_ptr) {
436 0 : filter_manager_->decodeMetadata(*metadata_map_ptr);
437 0 : }
438 :
439 : void UpstreamRequest::onResetStream(Http::StreamResetReason reason,
440 86 : absl::string_view transport_failure_reason) {
441 86 : ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
442 :
443 86 : if (span_ != nullptr) {
444 : // Add tags about reset.
445 0 : span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
446 0 : span_->setTag(Tracing::Tags::get().ErrorReason, Http::Utility::resetReasonToString(reason));
447 0 : }
448 86 : clearRequestEncoder();
449 86 : awaiting_headers_ = false;
450 86 : if (!calling_encode_headers_) {
451 86 : stream_info_.setResponseFlag(Filter::streamResetReasonToResponseFlag(reason));
452 86 : parent_.onUpstreamReset(reason, transport_failure_reason, *this);
453 86 : } else {
454 0 : deferred_reset_reason_ = reason;
455 0 : }
456 86 : }
457 :
458 92 : void UpstreamRequest::resetStream() {
459 92 : if (conn_pool_->cancelAnyPendingStream()) {
460 43 : ENVOY_STREAM_LOG(debug, "canceled pool request", *parent_.callbacks());
461 43 : ASSERT(!upstream_);
462 43 : }
463 :
464 : // Don't reset the stream if we're already done with it.
465 92 : if (upstreamTiming().last_upstream_tx_byte_sent_.has_value() &&
466 92 : upstreamTiming().last_upstream_rx_byte_received_.has_value()) {
467 0 : return;
468 0 : }
469 :
470 92 : if (span_ != nullptr) {
471 : // Add tags about the cancellation.
472 0 : span_->setTag(Tracing::Tags::get().Canceled, Tracing::Tags::get().True);
473 0 : }
474 :
475 92 : if (upstream_) {
476 49 : ENVOY_STREAM_LOG(debug, "resetting pool request", *parent_.callbacks());
477 49 : upstream_->resetStream();
478 49 : clearRequestEncoder();
479 49 : }
480 92 : reset_stream_ = true;
481 92 : }
482 :
483 536 : void UpstreamRequest::resetPerTryIdleTimer() {
484 536 : if (per_try_idle_timeout_ != nullptr) {
485 0 : per_try_idle_timeout_->enableTimer(parent_.timeout().per_try_idle_timeout_);
486 0 : }
487 536 : }
488 :
489 0 : void UpstreamRequest::resetUpstreamLogFlushTimer() {
490 0 : if (upstream_log_flush_timer_ != nullptr) {
491 0 : upstream_log_flush_timer_->enableTimer(parent_.config().upstream_log_flush_interval_.value());
492 0 : }
493 0 : }
494 :
495 147 : void UpstreamRequest::setupPerTryTimeout() {
496 147 : ASSERT(!per_try_timeout_);
497 147 : if (parent_.timeout().per_try_timeout_.count() > 0) {
498 0 : per_try_timeout_ =
499 0 : parent_.callbacks()->dispatcher().createTimer([this]() -> void { onPerTryTimeout(); });
500 0 : per_try_timeout_->enableTimer(parent_.timeout().per_try_timeout_);
501 0 : }
502 :
503 147 : ASSERT(!per_try_idle_timeout_);
504 147 : if (parent_.timeout().per_try_idle_timeout_.count() > 0) {
505 0 : per_try_idle_timeout_ =
506 0 : parent_.callbacks()->dispatcher().createTimer([this]() -> void { onPerTryIdleTimeout(); });
507 0 : resetPerTryIdleTimer();
508 0 : }
509 147 : }
510 :
511 0 : void UpstreamRequest::onPerTryIdleTimeout() {
512 0 : ENVOY_STREAM_LOG(debug, "upstream per try idle timeout", *parent_.callbacks());
513 0 : stream_info_.setResponseFlag(StreamInfo::ResponseFlag::StreamIdleTimeout);
514 0 : parent_.onPerTryIdleTimeout(*this);
515 0 : }
516 :
517 0 : void UpstreamRequest::onPerTryTimeout() {
518 : // If we've sent anything downstream, ignore the per try timeout and let the response continue
519 : // up to the global timeout
520 0 : if (!parent_.downstreamResponseStarted()) {
521 0 : ENVOY_STREAM_LOG(debug, "upstream per try timeout", *parent_.callbacks());
522 :
523 0 : stream_info_.setResponseFlag(StreamInfo::ResponseFlag::UpstreamRequestTimeout);
524 0 : parent_.onPerTryTimeout(*this);
525 0 : } else {
526 0 : ENVOY_STREAM_LOG(debug,
527 0 : "ignored upstream per try timeout due to already started downstream response",
528 0 : *parent_.callbacks());
529 0 : }
530 0 : }
531 :
532 208 : void UpstreamRequest::recordConnectionPoolCallbackLatency() {
533 208 : upstreamTiming().recordConnectionPoolCallbackLatency(
534 208 : start_time_, parent_.callbacks()->dispatcher().timeSource());
535 208 : }
536 :
537 : void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason,
538 : absl::string_view transport_failure_reason,
539 6 : Upstream::HostDescriptionConstSharedPtr host) {
540 6 : recordConnectionPoolCallbackLatency();
541 6 : Http::StreamResetReason reset_reason = [](ConnectionPool::PoolFailureReason reason) {
542 6 : switch (reason) {
543 0 : case ConnectionPool::PoolFailureReason::Overflow:
544 0 : return Http::StreamResetReason::Overflow;
545 6 : case ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
546 6 : return Http::StreamResetReason::RemoteConnectionFailure;
547 0 : case ConnectionPool::PoolFailureReason::LocalConnectionFailure:
548 0 : return Http::StreamResetReason::LocalConnectionFailure;
549 0 : case ConnectionPool::PoolFailureReason::Timeout:
550 0 : return Http::StreamResetReason::ConnectionTimeout;
551 6 : }
552 0 : PANIC_DUE_TO_CORRUPT_ENUM;
553 0 : }(reason);
554 :
555 6 : stream_info_.upstreamInfo()->setUpstreamTransportFailureReason(transport_failure_reason);
556 :
557 : // Mimic an upstream reset.
558 6 : onUpstreamHostSelected(host, false);
559 6 : onResetStream(reset_reason, transport_failure_reason);
560 6 : }
561 :
562 : void UpstreamRequest::onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
563 : Upstream::HostDescriptionConstSharedPtr host,
564 : const Network::ConnectionInfoProvider& address_provider,
565 : StreamInfo::StreamInfo& info,
566 202 : absl::optional<Http::Protocol> protocol) {
567 : // This may be called under an existing ScopeTrackerScopeState but it will unwind correctly.
568 202 : ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
569 202 : ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks());
570 202 : recordConnectionPoolCallbackLatency();
571 202 : upstream_ = std::move(upstream);
572 202 : had_upstream_ = true;
573 : // Have the upstream use the account of the downstream.
574 202 : upstream_->setAccount(parent_.callbacks()->account());
575 :
576 202 : host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess);
577 :
578 202 : onUpstreamHostSelected(host, true);
579 :
580 202 : if (protocol) {
581 202 : stream_info_.protocol(protocol.value());
582 202 : } else {
583 : // We only pause for CONNECT for HTTP upstreams. If this is a TCP upstream, unpause.
584 0 : paused_for_connect_ = false;
585 0 : }
586 :
587 202 : StreamInfo::UpstreamInfo& upstream_info = *stream_info_.upstreamInfo();
588 202 : if (info.upstreamInfo()) {
589 202 : auto& upstream_timing = info.upstreamInfo()->upstreamTiming();
590 202 : upstreamTiming().upstream_connect_start_ = upstream_timing.upstream_connect_start_;
591 202 : upstreamTiming().upstream_connect_complete_ = upstream_timing.upstream_connect_complete_;
592 202 : upstreamTiming().upstream_handshake_complete_ = upstream_timing.upstream_handshake_complete_;
593 202 : upstream_info.setUpstreamNumStreams(info.upstreamInfo()->upstreamNumStreams());
594 202 : }
595 :
596 : // Upstream HTTP filters might have already created/set a filter state.
597 202 : const StreamInfo::FilterStateSharedPtr& filter_state = info.filterState();
598 202 : if (!filter_state) {
599 0 : upstream_info.setUpstreamFilterState(
600 0 : std::make_shared<StreamInfo::FilterStateImpl>(StreamInfo::FilterState::LifeSpan::Request));
601 202 : } else {
602 202 : upstream_info.setUpstreamFilterState(filter_state);
603 202 : }
604 202 : upstream_info.setUpstreamLocalAddress(address_provider.localAddress());
605 202 : upstream_info.setUpstreamRemoteAddress(address_provider.remoteAddress());
606 202 : upstream_info.setUpstreamSslConnection(info.downstreamAddressProvider().sslConnection());
607 :
608 202 : if (info.downstreamAddressProvider().connectionID().has_value()) {
609 202 : upstream_info.setUpstreamConnectionId(info.downstreamAddressProvider().connectionID().value());
610 202 : }
611 :
612 202 : if (info.downstreamAddressProvider().interfaceName().has_value()) {
613 0 : upstream_info.setUpstreamInterfaceName(
614 0 : info.downstreamAddressProvider().interfaceName().value());
615 0 : }
616 :
617 202 : stream_info_.setUpstreamBytesMeter(upstream_->bytesMeter());
618 202 : StreamInfo::StreamInfo::syncUpstreamAndDownstreamBytesMeter(parent_.callbacks()->streamInfo(),
619 202 : stream_info_);
620 202 : if (protocol) {
621 202 : upstream_info.setUpstreamProtocol(protocol.value());
622 202 : }
623 :
624 202 : if (parent_.downstreamEndStream()) {
625 89 : setupPerTryTimeout();
626 185 : } else {
627 113 : create_per_try_timeout_on_request_complete_ = true;
628 113 : }
629 :
630 : // Make sure the connection manager will inform the downstream watermark manager when the
631 : // downstream buffers are overrun. This may result in immediate watermark callbacks referencing
632 : // the encoder.
633 202 : parent_.callbacks()->addDownstreamWatermarkCallbacks(downstream_watermark_manager_);
634 :
635 202 : absl::optional<std::chrono::milliseconds> max_stream_duration;
636 202 : if (parent_.dynamicMaxStreamDuration().has_value()) {
637 0 : max_stream_duration = parent_.dynamicMaxStreamDuration().value();
638 202 : } else if (upstream_host_->cluster().commonHttpProtocolOptions().has_max_stream_duration()) {
639 0 : max_stream_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
640 0 : upstream_host_->cluster().commonHttpProtocolOptions().max_stream_duration()));
641 0 : }
642 202 : if (max_stream_duration.has_value() && max_stream_duration->count()) {
643 0 : max_stream_duration_timer_ = parent_.callbacks()->dispatcher().createTimer(
644 0 : [this]() -> void { onStreamMaxDurationReached(); });
645 0 : max_stream_duration_timer_->enableTimer(*max_stream_duration);
646 0 : }
647 :
648 202 : const auto* route_entry = route().routeEntry();
649 202 : if (route_entry->autoHostRewrite() && !host->hostname().empty()) {
650 0 : Http::Utility::updateAuthority(*parent_.downstreamHeaders(), host->hostname(),
651 0 : route_entry->appendXfh());
652 0 : }
653 :
654 202 : stream_info_.setRequestHeaders(*parent_.downstreamHeaders());
655 :
656 202 : if (parent_.config().flush_upstream_log_on_upstream_stream_) {
657 0 : upstreamLog(AccessLog::AccessLogType::UpstreamPoolReady);
658 0 : }
659 :
660 202 : if (address_provider.connectionID() && stream_info_.downstreamAddressProvider().connectionID()) {
661 0 : ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
662 0 : address_provider.connectionID().value(),
663 0 : stream_info_.downstreamAddressProvider().connectionID().value());
664 0 : }
665 :
666 202 : for (auto* callback : upstream_callbacks_) {
667 202 : callback->onUpstreamConnectionEstablished();
668 202 : }
669 202 : }
670 :
671 704 : UpstreamToDownstream& UpstreamRequest::upstreamToDownstream() { return *upstream_interface_; }
672 :
673 0 : void UpstreamRequest::onStreamMaxDurationReached() {
674 0 : upstream_host_->cluster().trafficStats()->upstream_rq_max_duration_reached_.inc();
675 :
676 : // The upstream had closed then try to retry along with retry policy.
677 0 : parent_.onStreamMaxDurationReached(*this);
678 0 : }
679 :
680 386 : void UpstreamRequest::clearRequestEncoder() {
681 : // Before clearing the encoder, unsubscribe from callbacks.
682 386 : if (upstream_) {
683 202 : parent_.callbacks()->removeDownstreamWatermarkCallbacks(downstream_watermark_manager_);
684 202 : }
685 386 : upstream_.reset();
686 386 : }
687 :
688 32 : void UpstreamRequest::readDisableOrDefer(bool disable) {
689 32 : if (!upstream_wait_for_response_headers_before_disabling_read_) {
690 0 : if (disable) {
691 0 : parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
692 0 : upstream_->readDisable(true);
693 0 : } else {
694 0 : parent_.cluster()->trafficStats()->upstream_flow_control_resumed_reading_total_.inc();
695 0 : upstream_->readDisable(false);
696 0 : }
697 0 : return;
698 0 : }
699 :
700 32 : if (disable) {
701 : // See comments on deferred_read_disabling_count_ for when we do and don't defer.
702 16 : if (parent_.downstreamResponseStarted()) {
703 : // The downstream connection is overrun. Pause reads from upstream.
704 : // If there are multiple calls to readDisable either the codec (H2) or the
705 : // underlying Network::Connection (H1) will handle reference counting.
706 16 : parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
707 16 : upstream_->readDisable(disable);
708 16 : } else {
709 0 : ++deferred_read_disabling_count_;
710 0 : }
711 16 : return;
712 16 : }
713 :
714 : // One source of connection blockage has buffer available.
715 16 : if (deferred_read_disabling_count_ > 0) {
716 0 : ASSERT(!parent_.downstreamResponseStarted());
717 : // Cancel out an existing deferred read disabling.
718 0 : --deferred_read_disabling_count_;
719 0 : return;
720 0 : }
721 16 : ASSERT(parent_.downstreamResponseStarted());
722 : // Pass this on to the stream, which
723 : // will resume reads if this was the last remaining high watermark.
724 16 : parent_.cluster()->trafficStats()->upstream_flow_control_resumed_reading_total_.inc();
725 16 : upstream_->readDisable(disable);
726 16 : }
727 :
728 16 : void UpstreamRequest::DownstreamWatermarkManager::onAboveWriteBufferHighWatermark() {
729 16 : ASSERT(parent_.upstream_);
730 16 : parent_.readDisableOrDefer(true);
731 16 : }
732 :
733 16 : void UpstreamRequest::DownstreamWatermarkManager::onBelowWriteBufferLowWatermark() {
734 16 : ASSERT(parent_.upstream_);
735 16 : parent_.readDisableOrDefer(false);
736 16 : }
737 :
738 0 : void UpstreamRequest::disableDataFromDownstreamForFlowControl() {
739 0 : parent_.cluster()->trafficStats()->upstream_flow_control_backed_up_total_.inc();
740 0 : parent_.callbacks()->onDecoderFilterAboveWriteBufferHighWatermark();
741 0 : ++downstream_data_disabled_;
742 0 : }
743 :
744 0 : void UpstreamRequest::enableDataFromDownstreamForFlowControl() {
745 0 : parent_.cluster()->trafficStats()->upstream_flow_control_drained_total_.inc();
746 0 : parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark();
747 0 : ASSERT(downstream_data_disabled_ != 0);
748 0 : if (downstream_data_disabled_ > 0) {
749 0 : --downstream_data_disabled_;
750 0 : }
751 0 : }
752 :
753 626 : Http::RequestHeaderMapOptRef UpstreamRequestFilterManagerCallbacks::requestHeaders() {
754 626 : return {*upstream_request_.parent_.downstreamHeaders()};
755 626 : }
756 :
757 1436 : Http::RequestTrailerMapOptRef UpstreamRequestFilterManagerCallbacks::requestTrailers() {
758 1436 : if (upstream_request_.parent_.downstreamTrailers()) {
759 0 : return {*upstream_request_.parent_.downstreamTrailers()};
760 0 : }
761 1436 : if (trailers_) {
762 0 : return {*trailers_};
763 0 : }
764 1436 : return {};
765 1436 : }
766 :
767 0 : const ScopeTrackedObject& UpstreamRequestFilterManagerCallbacks::scope() {
768 0 : return upstream_request_.parent_.callbacks()->scope();
769 0 : }
770 :
771 0 : OptRef<const Tracing::Config> UpstreamRequestFilterManagerCallbacks::tracingConfig() const {
772 0 : return upstream_request_.parent_.callbacks()->tracingConfig();
773 0 : }
774 :
775 0 : Tracing::Span& UpstreamRequestFilterManagerCallbacks::activeSpan() {
776 0 : return upstream_request_.parent_.callbacks()->activeSpan();
777 0 : }
778 :
779 : void UpstreamRequestFilterManagerCallbacks::resetStream(
780 80 : Http::StreamResetReason reset_reason, absl::string_view transport_failure_reason) {
781 : // The filter manager needs to disambiguate between a filter-driven reset,
782 : // which should force reset the stream, and a codec driven reset, which should
783 : // tell the router the stream reset, and let the router make the decision to
784 : // send a local reply, or retry the stream.
785 80 : if (reset_reason == Http::StreamResetReason::LocalReset &&
786 80 : transport_failure_reason != "codec_error") {
787 0 : upstream_request_.parent_.callbacks()->resetStream();
788 0 : return;
789 0 : }
790 80 : return upstream_request_.onResetStream(reset_reason, transport_failure_reason);
791 80 : }
792 :
793 0 : Upstream::ClusterInfoConstSharedPtr UpstreamRequestFilterManagerCallbacks::clusterInfo() {
794 0 : return upstream_request_.parent_.callbacks()->clusterInfo();
795 0 : }
796 :
797 : Http::Http1StreamEncoderOptionsOptRef
798 0 : UpstreamRequestFilterManagerCallbacks::http1StreamEncoderOptions() {
799 0 : return upstream_request_.parent_.callbacks()->http1StreamEncoderOptions();
800 0 : }
801 :
802 : } // namespace Router
803 : } // namespace Envoy
|