Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/router/upstream_request.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/router/upstream_request.h"
2
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7
#include <string>
8
9
#include "envoy/event/dispatcher.h"
10
#include "envoy/event/timer.h"
11
#include "envoy/grpc/status.h"
12
#include "envoy/http/conn_pool.h"
13
#include "envoy/http/header_map.h"
14
#include "envoy/runtime/runtime.h"
15
#include "envoy/upstream/cluster_manager.h"
16
#include "envoy/upstream/upstream.h"
17
18
#include "source/common/common/assert.h"
19
#include "source/common/common/dump_state_utils.h"
20
#include "source/common/common/empty_string.h"
21
#include "source/common/common/enum_to_int.h"
22
#include "source/common/common/scope_tracker.h"
23
#include "source/common/common/utility.h"
24
#include "source/common/grpc/common.h"
25
#include "source/common/http/codes.h"
26
#include "source/common/http/header_map_impl.h"
27
#include "source/common/http/headers.h"
28
#include "source/common/http/message_impl.h"
29
#include "source/common/http/utility.h"
30
#include "source/common/network/application_protocol.h"
31
#include "source/common/network/transport_socket_options_impl.h"
32
#include "source/common/network/upstream_server_name.h"
33
#include "source/common/network/upstream_subject_alt_names.h"
34
#include "source/common/router/config_impl.h"
35
#include "source/common/router/debug_config.h"
36
#include "source/common/router/router.h"
37
#include "source/common/stream_info/uint32_accessor_impl.h"
38
#include "source/common/tracing/http_tracer_impl.h"
39
#include "source/extensions/common/proxy_protocol/proxy_protocol_header.h"
40
41
namespace Envoy {
42
namespace Router {
43
44
// The upstream HTTP filter manager class.
45
class UpstreamFilterManager : public Http::FilterManager {
46
public:
47
  UpstreamFilterManager(Http::FilterManagerCallbacks& filter_manager_callbacks,
48
                        Event::Dispatcher& dispatcher, OptRef<const Network::Connection> connection,
49
                        uint64_t stream_id, Buffer::BufferMemoryAccountSharedPtr account,
50
                        bool proxy_100_continue, uint32_t buffer_limit,
51
                        const Http::FilterChainFactory& filter_chain_factory,
52
                        UpstreamRequest& request)
53
      : FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account,
54
                      proxy_100_continue, buffer_limit, filter_chain_factory),
55
2.40k
        upstream_request_(request) {}
56
57
4.62k
  StreamInfo::StreamInfo& streamInfo() override {
58
4.62k
    return upstream_request_.parent_.callbacks()->streamInfo();
59
4.62k
  }
60
0
  const StreamInfo::StreamInfo& streamInfo() const override {
61
0
    return upstream_request_.parent_.callbacks()->streamInfo();
62
0
  }
63
  // Send local replies via the downstream HTTP filter manager.
64
  // Local replies will not be seen by upstream HTTP filters.
65
  void sendLocalReply(Http::Code code, absl::string_view body,
66
                      const std::function<void(Http::ResponseHeaderMap& headers)>& modify_headers,
67
                      const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
68
0
                      absl::string_view details) override {
69
0
    state().decoder_filter_chain_aborted_ = true;
70
0
    state().encoder_filter_chain_aborted_ = true;
71
0
    state().remote_encode_complete_ = true;
72
0
    state().local_complete_ = true;
73
    // TODO(alyssawilk) this should be done through the router to play well with hedging.
74
0
    upstream_request_.parent_.callbacks()->sendLocalReply(code, body, modify_headers, grpc_status,
75
0
                                                          details);
76
0
  }
77
0
  void executeLocalReplyIfPrepared() override {}
78
  UpstreamRequest& upstream_request_;
79
};
80
81
UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
82
                                 std::unique_ptr<GenericConnPool>&& conn_pool,
83
                                 bool can_send_early_data, bool can_use_http3)
84
    : parent_(parent), conn_pool_(std::move(conn_pool)),
85
      stream_info_(parent_.callbacks()->dispatcher().timeSource(), nullptr),
86
      start_time_(parent_.callbacks()->dispatcher().timeSource().monotonicTime()),
87
      calling_encode_headers_(false), upstream_canary_(false), router_sent_end_stream_(false),
88
      encode_trailers_(false), retried_(false), awaiting_headers_(true),
89
      outlier_detection_timeout_recorded_(false),
90
      create_per_try_timeout_on_request_complete_(false), paused_for_connect_(false),
91
      reset_stream_(false),
92
      record_timeout_budget_(parent_.cluster()->timeoutBudgetStats().has_value()),
93
      cleaned_up_(false), had_upstream_(false),
94
      stream_options_({can_send_early_data, can_use_http3}), grpc_rq_success_deferred_(false),
95
      upstream_wait_for_response_headers_before_disabling_read_(Runtime::runtimeFeatureEnabled(
96
2.40k
          "envoy.reloadable_features.upstream_wait_for_response_headers_before_disabling_read")) {
97
2.40k
  if (auto tracing_config = parent_.callbacks()->tracingConfig(); tracing_config.has_value()) {
98
1.12k
    if (tracing_config->spawnUpstreamSpan() || parent_.config().start_child_span_) {
99
0
      span_ = parent_.callbacks()->activeSpan().spawnChild(
100
0
          tracing_config.value().get(),
101
0
          absl::StrCat("router ", parent.cluster()->observabilityName(), " egress"),
102
0
          parent_.callbacks()->dispatcher().timeSource().systemTime());
103
0
      if (parent.attemptCount() != 1) {
104
        // This is a retry request, add this metadata to span.
105
0
        span_->setTag(Tracing::Tags::get().RetryCount, std::to_string(parent.attemptCount() - 1));
106
0
      }
107
0
    }
108
1.12k
  }
109
110
  // The router checks that the connection pool is non-null before creating the upstream request.
111
2.40k
  auto upstream_host = conn_pool_->host();
112
2.40k
  if (span_ != nullptr) {
113
0
    span_->injectContext(*parent_.downstreamHeaders(), upstream_host);
114
2.40k
  } else {
115
    // No independent child span for current upstream request then inject the parent span's tracing
116
    // context into the request headers.
117
    // The injectContext() of the parent span may be called repeatedly when the request is retried.
118
2.40k
    parent_.callbacks()->activeSpan().injectContext(*parent_.downstreamHeaders(), upstream_host);
119
2.40k
  }
120
121
2.40k
  stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
122
2.40k
  stream_info_.route_ = parent_.callbacks()->route();
123
2.40k
  parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo());
124
125
2.40k
  stream_info_.healthCheck(parent_.callbacks()->streamInfo().healthCheck());
126
2.40k
  stream_info_.setIsShadow(parent_.callbacks()->streamInfo().isShadow());
127
2.40k
  absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info =
128
2.40k
      parent_.callbacks()->streamInfo().upstreamClusterInfo();
129
2.40k
  if (cluster_info.has_value()) {
130
2.40k
    stream_info_.setUpstreamClusterInfo(*cluster_info);
131
2.40k
  }
132
133
  // Set up the upstream HTTP filter manager.
134
2.40k
  filter_manager_callbacks_ = std::make_unique<UpstreamRequestFilterManagerCallbacks>(*this);
135
2.40k
  filter_manager_ = std::make_unique<UpstreamFilterManager>(
136
2.40k
      *filter_manager_callbacks_, parent_.callbacks()->dispatcher(), connection(),
137
2.40k
      parent_.callbacks()->streamId(), parent_.callbacks()->account(), true,
138
2.40k
      parent_.callbacks()->decoderBufferLimit(), *parent_.cluster(), *this);
139
  // Attempt to create custom cluster-specified filter chain
140
2.40k
  bool created = parent_.cluster()->createFilterChain(*filter_manager_,
141
2.40k
                                                      /*only_create_if_configured=*/true);
142
2.40k
  if (!created) {
143
    // Attempt to create custom router-specified filter chain.
144
2.40k
    created = parent_.config().createFilterChain(*filter_manager_);
145
2.40k
  }
146
2.40k
  if (!created) {
147
    // Neither cluster nor router have a custom filter chain; add the default
148
    // cluster filter chain, which only consists of the codec filter.
149
2.40k
    created = parent_.cluster()->createFilterChain(*filter_manager_, false);
150
2.40k
  }
151
  // There will always be a codec filter present, which sets the upstream
152
  // interface. Fast-fail any tests that don't set up mocks correctly.
153
2.40k
  ASSERT(created && upstream_interface_.has_value());
154
2.40k
}
Unexecuted instantiation: Envoy::Router::UpstreamRequest::UpstreamRequest(Envoy::Router::RouterFilterInterface&, std::__1::unique_ptr<Envoy::Router::GenericConnPool, std::__1::default_delete<Envoy::Router::GenericConnPool> >&&, bool, bool)
Envoy::Router::UpstreamRequest::UpstreamRequest(Envoy::Router::RouterFilterInterface&, std::__1::unique_ptr<Envoy::Router::GenericConnPool, std::__1::default_delete<Envoy::Router::GenericConnPool> >&&, bool, bool)
Line
Count
Source
96
2.40k
          "envoy.reloadable_features.upstream_wait_for_response_headers_before_disabling_read")) {
97
2.40k
  if (auto tracing_config = parent_.callbacks()->tracingConfig(); tracing_config.has_value()) {
98
1.12k
    if (tracing_config->spawnUpstreamSpan() || parent_.config().start_child_span_) {
99
0
      span_ = parent_.callbacks()->activeSpan().spawnChild(
100
0
          tracing_config.value().get(),
101
0
          absl::StrCat("router ", parent.cluster()->observabilityName(), " egress"),
102
0
          parent_.callbacks()->dispatcher().timeSource().systemTime());
103
0
      if (parent.attemptCount() != 1) {
104
        // This is a retry request, add this metadata to span.
105
0
        span_->setTag(Tracing::Tags::get().RetryCount, std::to_string(parent.attemptCount() - 1));
106
0
      }
107
0
    }
108
1.12k
  }
109
110
  // The router checks that the connection pool is non-null before creating the upstream request.
111
2.40k
  auto upstream_host = conn_pool_->host();
112
2.40k
  if (span_ != nullptr) {
113
0
    span_->injectContext(*parent_.downstreamHeaders(), upstream_host);
114
2.40k
  } else {
115
    // No independent child span for current upstream request then inject the parent span's tracing
116
    // context into the request headers.
117
    // The injectContext() of the parent span may be called repeatedly when the request is retried.
118
2.40k
    parent_.callbacks()->activeSpan().injectContext(*parent_.downstreamHeaders(), upstream_host);
119
2.40k
  }
120
121
2.40k
  stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
122
2.40k
  stream_info_.route_ = parent_.callbacks()->route();
123
2.40k
  parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo());
124
125
2.40k
  stream_info_.healthCheck(parent_.callbacks()->streamInfo().healthCheck());
126
2.40k
  stream_info_.setIsShadow(parent_.callbacks()->streamInfo().isShadow());
127
2.40k
  absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info =
128
2.40k
      parent_.callbacks()->streamInfo().upstreamClusterInfo();
129
2.40k
  if (cluster_info.has_value()) {
130
2.40k
    stream_info_.setUpstreamClusterInfo(*cluster_info);
131
2.40k
  }
132
133
  // Set up the upstream HTTP filter manager.
134
2.40k
  filter_manager_callbacks_ = std::make_unique<UpstreamRequestFilterManagerCallbacks>(*this);
135
2.40k
  filter_manager_ = std::make_unique<UpstreamFilterManager>(
136
2.40k
      *filter_manager_callbacks_, parent_.callbacks()->dispatcher(), connection(),
137
2.40k
      parent_.callbacks()->streamId(), parent_.callbacks()->account(), true,
138
2.40k
      parent_.callbacks()->decoderBufferLimit(), *parent_.cluster(), *this);
139
  // Attempt to create custom cluster-specified filter chain
140
2.40k
  bool created = parent_.cluster()->createFilterChain(*filter_manager_,
141
2.40k
                                                      /*only_create_if_configured=*/true);
142
2.40k
  if (!created) {
143
    // Attempt to create custom router-specified filter chain.
144
2.40k
    created = parent_.config().createFilterChain(*filter_manager_);
145
2.40k
  }
146
2.40k
  if (!created) {
147
    // Neither cluster nor router have a custom filter chain; add the default
148
    // cluster filter chain, which only consists of the codec filter.
149
2.40k
    created = parent_.cluster()->createFilterChain(*filter_manager_, false);
150
2.40k
  }
151
  // There will always be a codec filter present, which sets the upstream
152
  // interface. Fast-fail any tests that don't set up mocks correctly.
153
2.40k
  ASSERT(created && upstream_interface_.has_value());
154
2.40k
}
155
156
2.40k
UpstreamRequest::~UpstreamRequest() { cleanUp(); }
157
158
4.80k
void UpstreamRequest::cleanUp() {
159
4.80k
  if (cleaned_up_) {
160
2.40k
    return;
161
2.40k
  }
162
2.40k
  cleaned_up_ = true;
163
164
2.40k
  filter_manager_->destroyFilters();
165
166
2.40k
  if (span_ != nullptr) {
167
0
    auto tracing_config = parent_.callbacks()->tracingConfig();
168
0
    ASSERT(tracing_config.has_value());
169
0
    Tracing::HttpTracerUtility::finalizeUpstreamSpan(*span_, stream_info_,
170
0
                                                     tracing_config.value().get());
171
0
  }
172
173
2.40k
  if (per_try_timeout_ != nullptr) {
174
    // Allows for testing.
175
0
    per_try_timeout_->disableTimer();
176
0
  }
177
178
2.40k
  if (per_try_idle_timeout_ != nullptr) {
179
    // Allows for testing.
180
0
    per_try_idle_timeout_->disableTimer();
181
0
  }
182
183
2.40k
  if (max_stream_duration_timer_ != nullptr) {
184
0
    max_stream_duration_timer_->disableTimer();
185
0
  }
186
187
2.40k
  if (upstream_log_flush_timer_ != nullptr) {
188
0
    upstream_log_flush_timer_->disableTimer();
189
0
  }
190
191
2.40k
  clearRequestEncoder();
192
193
  // If desired, fire the per-try histogram when the UpstreamRequest
194
  // completes.
195
2.40k
  if (record_timeout_budget_) {
196
6
    Event::Dispatcher& dispatcher = parent_.callbacks()->dispatcher();
197
6
    const MonotonicTime end_time = dispatcher.timeSource().monotonicTime();
198
6
    const std::chrono::milliseconds response_time =
199
6
        std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time_);
200
6
    Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = parent_.cluster()->timeoutBudgetStats();
201
6
    tb_stats->get().upstream_rq_timeout_budget_per_try_percent_used_.recordValue(
202
6
        FilterUtility::percentageOfTimeout(response_time, parent_.timeout().per_try_timeout_));
203
6
  }
204
205
  // Ditto for request/response size histograms.
206
2.40k
  Upstream::ClusterRequestResponseSizeStatsOptRef req_resp_stats_opt =
207
2.40k
      parent_.cluster()->requestResponseSizeStats();
208
2.40k
  if (req_resp_stats_opt.has_value() && parent_.downstreamHeaders()) {
209
6
    auto& req_resp_stats = req_resp_stats_opt->get();
210
6
    req_resp_stats.upstream_rq_headers_size_.recordValue(parent_.downstreamHeaders()->byteSize());
211
6
    req_resp_stats.upstream_rq_body_size_.recordValue(stream_info_.bytesSent());
212
213
6
    if (response_headers_size_.has_value()) {
214
0
      req_resp_stats.upstream_rs_headers_size_.recordValue(response_headers_size_.value());
215
0
      req_resp_stats.upstream_rs_body_size_.recordValue(stream_info_.bytesReceived());
216
0
    }
217
6
  }
218
219
2.40k
  stream_info_.onRequestComplete();
220
2.40k
  upstreamLog(AccessLog::AccessLogType::UpstreamEnd);
221
222
2.40k
  while (downstream_data_disabled_ != 0) {
223
0
    parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark();
224
0
    parent_.cluster()->trafficStats()->upstream_flow_control_drained_total_.inc();
225
0
    --downstream_data_disabled_;
226
0
  }
227
  // The upstream HTTP filter chain callbacks own headers/trailers while they are traversing the
228
  // filter chain. Make sure to not delete them immediately when the stream ends, as the stream
229
  // often ends during filter chain processing and it causes use-after-free violations.
230
2.40k
  parent_.callbacks()->dispatcher().deferredDelete(std::move(filter_manager_callbacks_));
231
2.40k
}
232
233
2.40k
void UpstreamRequest::upstreamLog(AccessLog::AccessLogType access_log_type) {
234
2.40k
  const Formatter::HttpFormatterContext log_context{parent_.downstreamHeaders(),
235
2.40k
                                                    upstream_headers_.get(),
236
2.40k
                                                    upstream_trailers_.get(),
237
2.40k
                                                    {},
238
2.40k
                                                    access_log_type};
239
240
2.40k
  for (const auto& upstream_log : parent_.config().upstream_logs_) {
241
0
    upstream_log->log(log_context, stream_info_);
242
0
  }
243
2.40k
}
244
245
// This is called by the FilterManager when all filters have processed 1xx headers. Forward them
246
// on to the router.
247
1
void UpstreamRequest::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) {
248
1
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
249
250
1
  ASSERT(Http::HeaderUtility::isSpecial1xx(*headers));
251
1
  addResponseHeadersSize(headers->byteSize());
252
1
  maybeHandleDeferredReadDisable();
253
1
  parent_.onUpstream1xxHeaders(std::move(headers), *this);
254
1
}
255
256
// This is called by the FilterManager when all filters have processed headers. Forward them
257
// on to the router.
258
2.22k
void UpstreamRequest::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
259
2.22k
  ASSERT(headers.get());
260
2.22k
  ENVOY_STREAM_LOG(trace, "upstream response headers:\n{}", *parent_.callbacks(), *headers);
261
2.22k
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
262
263
2.22k
  resetPerTryIdleTimer();
264
265
2.22k
  addResponseHeadersSize(headers->byteSize());
266
267
  // We drop unsupported 1xx on the floor here. 101 upgrade headers need to be passed to the client
268
  // as part of the final response. Most 1xx headers are handled in onUpstream1xxHeaders.
269
  //
270
  // We could in principle handle other headers here, but this might result in the double invocation
271
  // of decodeHeaders() (once for informational, again for non-informational), which is likely an
272
  // easy to miss corner case in the filter and HCM contract.
273
  //
274
  // This filtering is done early in upstream request, unlike 100 coalescing which is performed in
275
  // the router filter, since the filtering only depends on the state of a single upstream, and we
276
  // don't want to confuse accounting such as onFirstUpstreamRxByteReceived() with informational
277
  // headers.
278
2.22k
  const uint64_t response_code = Http::Utility::getResponseStatus(*headers);
279
2.22k
  if (Http::CodeUtility::is1xx(response_code) &&
280
2.22k
      response_code != enumToInt(Http::Code::SwitchingProtocols)) {
281
0
    return;
282
0
  }
283
284
2.22k
  awaiting_headers_ = false;
285
2.22k
  if (span_ != nullptr) {
286
0
    Tracing::HttpTracerUtility::onUpstreamResponseHeaders(*span_, headers.get());
287
0
  }
288
2.22k
  if (!parent_.config().upstream_logs_.empty()) {
289
0
    upstream_headers_ = Http::createHeaderMap<Http::ResponseHeaderMapImpl>(*headers);
290
0
  }
291
2.22k
  stream_info_.setResponseCode(static_cast<uint32_t>(response_code));
292
293
2.22k
  maybeHandleDeferredReadDisable();
294
2.22k
  ASSERT(headers.get());
295
296
2.22k
  parent_.onUpstreamHeaders(response_code, std::move(headers), *this, end_stream);
297
2.22k
}
298
299
2.22k
void UpstreamRequest::maybeHandleDeferredReadDisable() {
300
2.22k
  for (; deferred_read_disabling_count_ > 0; --deferred_read_disabling_count_) {
301
    // If the deferred read disabling count hasn't been cancelled out by read
302
    // enabling count so far, stop the upstream from reading the rest response.
303
    // Because readDisable keeps track of how many time it is called with
304
    // "true" or "false", here it has to be called with "true" the same number
305
    // of times as it would be called with "false" in the future.
306
0
    parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
307
0
    upstream_->readDisable(true);
308
0
  }
309
2.22k
}
310
311
4.28k
void UpstreamRequest::decodeData(Buffer::Instance& data, bool end_stream) {
312
4.28k
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
313
314
4.28k
  resetPerTryIdleTimer();
315
4.28k
  stream_info_.addBytesReceived(data.length());
316
4.28k
  parent_.onUpstreamData(data, *this, end_stream);
317
4.28k
}
318
319
3
void UpstreamRequest::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) {
320
3
  ENVOY_STREAM_LOG(trace, "upstream response trailers:\n{}", *parent_.callbacks(), *trailers);
321
3
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
322
323
3
  if (span_ != nullptr) {
324
0
    Tracing::HttpTracerUtility::onUpstreamResponseTrailers(*span_, trailers.get());
325
0
  }
326
3
  if (!parent_.config().upstream_logs_.empty()) {
327
0
    upstream_trailers_ = Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*trailers);
328
0
  }
329
3
  parent_.onUpstreamTrailers(std::move(trailers), *this);
330
3
}
331
332
0
void UpstreamRequest::dumpState(std::ostream& os, int indent_level) const {
333
0
  const char* spaces = spacesForLevel(indent_level);
334
0
  os << spaces << "UpstreamRequest " << this << "\n";
335
0
  if (connection()) {
336
0
    const auto addressProvider = connection()->connectionInfoProviderSharedPtr();
337
0
    DUMP_DETAILS(addressProvider);
338
0
  }
339
0
  const Http::RequestHeaderMap* request_headers = parent_.downstreamHeaders();
340
0
  DUMP_DETAILS(request_headers);
341
0
  if (filter_manager_) {
342
0
    filter_manager_->dumpState(os, indent_level);
343
0
  }
344
0
}
345
346
2.32k
const Route& UpstreamRequest::route() const { return *parent_.callbacks()->route(); }
347
348
2.40k
OptRef<const Network::Connection> UpstreamRequest::connection() const {
349
2.40k
  return parent_.callbacks()->connection();
350
2.40k
}
351
352
181
void UpstreamRequest::decodeMetadata(Http::MetadataMapPtr&& metadata_map) {
353
181
  parent_.onUpstreamMetadata(std::move(metadata_map));
354
181
}
355
356
0
void UpstreamRequest::maybeEndDecode(bool end_stream) {
357
0
  if (end_stream) {
358
0
    upstreamTiming().onLastUpstreamRxByteReceived(parent_.callbacks()->dispatcher().timeSource());
359
0
  }
360
0
}
361
362
void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
363
2.32k
                                             bool pool_success) {
364
2.32k
  StreamInfo::UpstreamInfo& upstream_info = *streamInfo().upstreamInfo();
365
2.32k
  upstream_info.setUpstreamHost(host);
366
2.32k
  upstream_host_ = host;
367
2.32k
  parent_.onUpstreamHostSelected(host, pool_success);
368
2.32k
}
369
370
2.40k
void UpstreamRequest::acceptHeadersFromRouter(bool end_stream) {
371
2.40k
  ASSERT(!router_sent_end_stream_);
372
2.40k
  router_sent_end_stream_ = end_stream;
373
374
  // Make sure that when we are forwarding CONNECT payload we do not do so until
375
  // the upstream has accepted the CONNECT request.
376
  // This must be done before conn_pool->newStream, as onPoolReady un-pauses for CONNECT
377
  // termination.
378
2.40k
  auto* headers = parent_.downstreamHeaders();
379
2.40k
  if (headers->getMethodValue() == Http::Headers::get().MethodValues.Connect) {
380
0
    paused_for_connect_ = true;
381
0
  }
382
383
  // Kick off creation of the upstream connection immediately upon receiving headers.
384
  // In future it may be possible for upstream HTTP filters to delay this, or influence connection
385
  // creation but for now optimize for minimal latency and fetch the connection
386
  // as soon as possible.
387
2.40k
  conn_pool_->newStream(this);
388
389
2.40k
  if (parent_.config().upstream_log_flush_interval_.has_value()) {
390
0
    upstream_log_flush_timer_ = parent_.callbacks()->dispatcher().createTimer([this]() -> void {
391
      // If the request is complete, we've already done the stream-end upstream log, and shouldn't
392
      // do the periodic log.
393
0
      if (!streamInfo().requestComplete().has_value()) {
394
0
        upstreamLog(AccessLog::AccessLogType::UpstreamPeriodic);
395
0
        resetUpstreamLogFlushTimer();
396
0
      }
397
      // Both downstream and upstream bytes meters may not be initialized when
398
      // the timer goes off, e.g. if it takes longer than the interval for a
399
      // connection to be initialized; check for nullptr.
400
0
      auto& downstream_bytes_meter = stream_info_.getDownstreamBytesMeter();
401
0
      auto& upstream_bytes_meter = stream_info_.getUpstreamBytesMeter();
402
0
      const SystemTime now = parent_.callbacks()->dispatcher().timeSource().systemTime();
403
0
      if (downstream_bytes_meter) {
404
0
        downstream_bytes_meter->takeUpstreamPeriodicLoggingSnapshot(now);
405
0
      }
406
0
      if (upstream_bytes_meter) {
407
0
        upstream_bytes_meter->takeUpstreamPeriodicLoggingSnapshot(now);
408
0
      }
409
0
    });
410
411
0
    resetUpstreamLogFlushTimer();
412
0
  }
413
414
2.40k
  filter_manager_->requestHeadersInitialized();
415
2.40k
  filter_manager_->streamInfo().setRequestHeaders(*parent_.downstreamHeaders());
416
2.40k
  filter_manager_->decodeHeaders(*parent_.downstreamHeaders(), end_stream);
417
2.40k
}
418
419
4.66k
void UpstreamRequest::acceptDataFromRouter(Buffer::Instance& data, bool end_stream) {
420
4.66k
  ASSERT(!router_sent_end_stream_);
421
4.66k
  router_sent_end_stream_ = end_stream;
422
423
4.66k
  filter_manager_->decodeData(data, end_stream);
424
4.66k
}
425
426
0
void UpstreamRequest::acceptTrailersFromRouter(Http::RequestTrailerMap& trailers) {
427
0
  ASSERT(!router_sent_end_stream_);
428
0
  router_sent_end_stream_ = true;
429
0
  encode_trailers_ = true;
430
431
0
  filter_manager_->decodeTrailers(trailers);
432
0
}
433
434
6
void UpstreamRequest::acceptMetadataFromRouter(Http::MetadataMapPtr&& metadata_map_ptr) {
435
6
  filter_manager_->decodeMetadata(*metadata_map_ptr);
436
6
}
437
438
void UpstreamRequest::onResetStream(Http::StreamResetReason reason,
439
74
                                    absl::string_view transport_failure_reason) {
440
74
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
441
442
74
  if (span_ != nullptr) {
443
    // Add tags about reset.
444
0
    span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
445
0
    span_->setTag(Tracing::Tags::get().ErrorReason, Http::Utility::resetReasonToString(reason));
446
0
  }
447
74
  clearRequestEncoder();
448
74
  awaiting_headers_ = false;
449
74
  if (!calling_encode_headers_) {
450
74
    stream_info_.setResponseFlag(Filter::streamResetReasonToResponseFlag(reason));
451
74
    parent_.onUpstreamReset(reason, transport_failure_reason, *this);
452
74
  } else {
453
0
    deferred_reset_reason_ = reason;
454
0
  }
455
74
}
456
457
1.24k
void UpstreamRequest::resetStream() {
458
1.24k
  if (conn_pool_->cancelAnyPendingStream()) {
459
71
    ENVOY_STREAM_LOG(debug, "canceled pool request", *parent_.callbacks());
460
71
    ASSERT(!upstream_);
461
71
  }
462
463
  // Don't reset the stream if we're already done with it.
464
1.24k
  if (upstreamTiming().last_upstream_tx_byte_sent_.has_value() &&
465
1.24k
      upstreamTiming().last_upstream_rx_byte_received_.has_value()) {
466
0
    return;
467
0
  }
468
469
1.24k
  if (span_ != nullptr) {
470
    // Add tags about the cancellation.
471
0
    span_->setTag(Tracing::Tags::get().Canceled, Tracing::Tags::get().True);
472
0
  }
473
474
1.24k
  if (upstream_) {
475
1.17k
    ENVOY_STREAM_LOG(debug, "resetting pool request", *parent_.callbacks());
476
1.17k
    upstream_->resetStream();
477
1.17k
    clearRequestEncoder();
478
1.17k
  }
479
1.24k
  reset_stream_ = true;
480
1.24k
}
481
482
6.50k
void UpstreamRequest::resetPerTryIdleTimer() {
483
6.50k
  if (per_try_idle_timeout_ != nullptr) {
484
0
    per_try_idle_timeout_->enableTimer(parent_.timeout().per_try_idle_timeout_);
485
0
  }
486
6.50k
}
487
488
0
void UpstreamRequest::resetUpstreamLogFlushTimer() {
489
0
  if (upstream_log_flush_timer_ != nullptr) {
490
0
    upstream_log_flush_timer_->enableTimer(parent_.config().upstream_log_flush_interval_.value());
491
0
  }
492
0
}
493
494
2.15k
void UpstreamRequest::setupPerTryTimeout() {
495
2.15k
  ASSERT(!per_try_timeout_);
496
2.15k
  if (parent_.timeout().per_try_timeout_.count() > 0) {
497
0
    per_try_timeout_ =
498
0
        parent_.callbacks()->dispatcher().createTimer([this]() -> void { onPerTryTimeout(); });
499
0
    per_try_timeout_->enableTimer(parent_.timeout().per_try_timeout_);
500
0
  }
501
502
2.15k
  ASSERT(!per_try_idle_timeout_);
503
2.15k
  if (parent_.timeout().per_try_idle_timeout_.count() > 0) {
504
0
    per_try_idle_timeout_ =
505
0
        parent_.callbacks()->dispatcher().createTimer([this]() -> void { onPerTryIdleTimeout(); });
506
0
    resetPerTryIdleTimer();
507
0
  }
508
2.15k
}
509
510
0
void UpstreamRequest::onPerTryIdleTimeout() {
511
0
  ENVOY_STREAM_LOG(debug, "upstream per try idle timeout", *parent_.callbacks());
512
0
  stream_info_.setResponseFlag(StreamInfo::ResponseFlag::StreamIdleTimeout);
513
0
  parent_.onPerTryIdleTimeout(*this);
514
0
}
515
516
0
void UpstreamRequest::onPerTryTimeout() {
517
  // If we've sent anything downstream, ignore the per try timeout and let the response continue
518
  // up to the global timeout
519
0
  if (!parent_.downstreamResponseStarted()) {
520
0
    ENVOY_STREAM_LOG(debug, "upstream per try timeout", *parent_.callbacks());
521
522
0
    stream_info_.setResponseFlag(StreamInfo::ResponseFlag::UpstreamRequestTimeout);
523
0
    parent_.onPerTryTimeout(*this);
524
0
  } else {
525
0
    ENVOY_STREAM_LOG(debug,
526
0
                     "ignored upstream per try timeout due to already started downstream response",
527
0
                     *parent_.callbacks());
528
0
  }
529
0
}
530
531
2.32k
void UpstreamRequest::recordConnectionPoolCallbackLatency() {
532
2.32k
  upstreamTiming().recordConnectionPoolCallbackLatency(
533
2.32k
      start_time_, parent_.callbacks()->dispatcher().timeSource());
534
2.32k
}
535
536
void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason,
537
                                    absl::string_view transport_failure_reason,
538
0
                                    Upstream::HostDescriptionConstSharedPtr host) {
539
0
  recordConnectionPoolCallbackLatency();
540
0
  Http::StreamResetReason reset_reason = [](ConnectionPool::PoolFailureReason reason) {
541
0
    switch (reason) {
542
0
    case ConnectionPool::PoolFailureReason::Overflow:
543
0
      return Http::StreamResetReason::Overflow;
544
0
    case ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
545
0
      return Http::StreamResetReason::RemoteConnectionFailure;
546
0
    case ConnectionPool::PoolFailureReason::LocalConnectionFailure:
547
0
      return Http::StreamResetReason::LocalConnectionFailure;
548
0
    case ConnectionPool::PoolFailureReason::Timeout:
549
0
      return Http::StreamResetReason::ConnectionTimeout;
550
0
    }
551
0
    PANIC_DUE_TO_CORRUPT_ENUM;
552
0
  }(reason);
553
554
0
  stream_info_.upstreamInfo()->setUpstreamTransportFailureReason(transport_failure_reason);
555
556
  // Mimic an upstream reset.
557
0
  onUpstreamHostSelected(host, false);
558
0
  onResetStream(reset_reason, transport_failure_reason);
559
0
}
560
561
void UpstreamRequest::onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
562
                                  Upstream::HostDescriptionConstSharedPtr host,
563
                                  const Network::ConnectionInfoProvider& address_provider,
564
                                  StreamInfo::StreamInfo& info,
565
2.32k
                                  absl::optional<Http::Protocol> protocol) {
566
  // This may be called under an existing ScopeTrackerScopeState but it will unwind correctly.
567
2.32k
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
568
2.32k
  ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks());
569
2.32k
  recordConnectionPoolCallbackLatency();
570
2.32k
  upstream_ = std::move(upstream);
571
2.32k
  had_upstream_ = true;
572
  // Have the upstream use the account of the downstream.
573
2.32k
  upstream_->setAccount(parent_.callbacks()->account());
574
575
2.32k
  host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess);
576
577
2.32k
  onUpstreamHostSelected(host, true);
578
579
2.32k
  if (protocol) {
580
2.32k
    stream_info_.protocol(protocol.value());
581
2.32k
  } else {
582
    // We only pause for CONNECT for HTTP upstreams. If this is a TCP upstream, unpause.
583
0
    paused_for_connect_ = false;
584
0
  }
585
586
2.32k
  StreamInfo::UpstreamInfo& upstream_info = *stream_info_.upstreamInfo();
587
2.32k
  if (info.upstreamInfo()) {
588
2.32k
    auto& upstream_timing = info.upstreamInfo()->upstreamTiming();
589
2.32k
    upstreamTiming().upstream_connect_start_ = upstream_timing.upstream_connect_start_;
590
2.32k
    upstreamTiming().upstream_connect_complete_ = upstream_timing.upstream_connect_complete_;
591
2.32k
    upstreamTiming().upstream_handshake_complete_ = upstream_timing.upstream_handshake_complete_;
592
2.32k
    upstream_info.setUpstreamNumStreams(info.upstreamInfo()->upstreamNumStreams());
593
2.32k
  }
594
595
  // Upstream HTTP filters might have already created/set a filter state.
596
2.32k
  const StreamInfo::FilterStateSharedPtr& filter_state = info.filterState();
597
2.32k
  if (!filter_state) {
598
0
    upstream_info.setUpstreamFilterState(
599
0
        std::make_shared<StreamInfo::FilterStateImpl>(StreamInfo::FilterState::LifeSpan::Request));
600
2.32k
  } else {
601
2.32k
    upstream_info.setUpstreamFilterState(filter_state);
602
2.32k
  }
603
2.32k
  upstream_info.setUpstreamLocalAddress(address_provider.localAddress());
604
2.32k
  upstream_info.setUpstreamRemoteAddress(address_provider.remoteAddress());
605
2.32k
  upstream_info.setUpstreamSslConnection(info.downstreamAddressProvider().sslConnection());
606
607
2.32k
  if (info.downstreamAddressProvider().connectionID().has_value()) {
608
2.32k
    upstream_info.setUpstreamConnectionId(info.downstreamAddressProvider().connectionID().value());
609
2.32k
  }
610
611
2.32k
  if (info.downstreamAddressProvider().interfaceName().has_value()) {
612
0
    upstream_info.setUpstreamInterfaceName(
613
0
        info.downstreamAddressProvider().interfaceName().value());
614
0
  }
615
616
2.32k
  stream_info_.setUpstreamBytesMeter(upstream_->bytesMeter());
617
2.32k
  StreamInfo::StreamInfo::syncUpstreamAndDownstreamBytesMeter(parent_.callbacks()->streamInfo(),
618
2.32k
                                                              stream_info_);
619
2.32k
  if (protocol) {
620
2.32k
    upstream_info.setUpstreamProtocol(protocol.value());
621
2.32k
  }
622
623
2.32k
  if (parent_.downstreamEndStream()) {
624
615
    setupPerTryTimeout();
625
1.71k
  } else {
626
1.71k
    create_per_try_timeout_on_request_complete_ = true;
627
1.71k
  }
628
629
  // Make sure the connection manager will inform the downstream watermark manager when the
630
  // downstream buffers are overrun. This may result in immediate watermark callbacks referencing
631
  // the encoder.
632
2.32k
  parent_.callbacks()->addDownstreamWatermarkCallbacks(downstream_watermark_manager_);
633
634
2.32k
  absl::optional<std::chrono::milliseconds> max_stream_duration;
635
2.32k
  if (parent_.dynamicMaxStreamDuration().has_value()) {
636
0
    max_stream_duration = parent_.dynamicMaxStreamDuration().value();
637
2.32k
  } else if (upstream_host_->cluster().commonHttpProtocolOptions().has_max_stream_duration()) {
638
0
    max_stream_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
639
0
        upstream_host_->cluster().commonHttpProtocolOptions().max_stream_duration()));
640
0
  }
641
2.32k
  if (max_stream_duration.has_value() && max_stream_duration->count()) {
642
0
    max_stream_duration_timer_ = parent_.callbacks()->dispatcher().createTimer(
643
0
        [this]() -> void { onStreamMaxDurationReached(); });
644
0
    max_stream_duration_timer_->enableTimer(*max_stream_duration);
645
0
  }
646
647
2.32k
  const auto* route_entry = route().routeEntry();
648
2.32k
  if (route_entry->autoHostRewrite() && !host->hostname().empty()) {
649
0
    Http::Utility::updateAuthority(*parent_.downstreamHeaders(), host->hostname(),
650
0
                                   route_entry->appendXfh());
651
0
  }
652
653
2.32k
  stream_info_.setRequestHeaders(*parent_.downstreamHeaders());
654
655
2.32k
  if (parent_.config().flush_upstream_log_on_upstream_stream_) {
656
0
    upstreamLog(AccessLog::AccessLogType::UpstreamPoolReady);
657
0
  }
658
659
2.32k
  if (address_provider.connectionID() && stream_info_.downstreamAddressProvider().connectionID()) {
660
0
    ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
661
0
              address_provider.connectionID().value(),
662
0
              stream_info_.downstreamAddressProvider().connectionID().value());
663
0
  }
664
665
2.32k
  for (auto* callback : upstream_callbacks_) {
666
2.32k
    callback->onUpstreamConnectionEstablished();
667
2.32k
  }
668
2.32k
}
669
670
7.12k
UpstreamToDownstream& UpstreamRequest::upstreamToDownstream() { return *upstream_interface_; }
671
672
0
void UpstreamRequest::onStreamMaxDurationReached() {
673
0
  upstream_host_->cluster().trafficStats()->upstream_rq_max_duration_reached_.inc();
674
675
  // The upstream had closed then try to retry along with retry policy.
676
0
  parent_.onStreamMaxDurationReached(*this);
677
0
}
678
679
3.64k
void UpstreamRequest::clearRequestEncoder() {
680
  // Before clearing the encoder, unsubscribe from callbacks.
681
3.64k
  if (upstream_) {
682
2.32k
    parent_.callbacks()->removeDownstreamWatermarkCallbacks(downstream_watermark_manager_);
683
2.32k
  }
684
3.64k
  upstream_.reset();
685
3.64k
}
686
687
1.89k
void UpstreamRequest::readDisableOrDefer(bool disable) {
688
1.89k
  if (!upstream_wait_for_response_headers_before_disabling_read_) {
689
0
    if (disable) {
690
0
      parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
691
0
      upstream_->readDisable(true);
692
0
    } else {
693
0
      parent_.cluster()->trafficStats()->upstream_flow_control_resumed_reading_total_.inc();
694
0
      upstream_->readDisable(false);
695
0
    }
696
0
    return;
697
0
  }
698
699
1.89k
  if (disable) {
700
    // See comments on deferred_read_disabling_count_ for when we do and don't defer.
701
954
    if (parent_.downstreamResponseStarted()) {
702
      // The downstream connection is overrun. Pause reads from upstream.
703
      // If there are multiple calls to readDisable either the codec (H2) or the
704
      // underlying Network::Connection (H1) will handle reference counting.
705
954
      parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
706
954
      upstream_->readDisable(disable);
707
954
    } else {
708
0
      ++deferred_read_disabling_count_;
709
0
    }
710
954
    return;
711
954
  }
712
713
  // One source of connection blockage has buffer available.
714
942
  if (deferred_read_disabling_count_ > 0) {
715
0
    ASSERT(!parent_.downstreamResponseStarted());
716
    // Cancel out an existing deferred read disabling.
717
0
    --deferred_read_disabling_count_;
718
0
    return;
719
0
  }
720
942
  ASSERT(parent_.downstreamResponseStarted());
721
  // Pass this on to the stream, which
722
  // will resume reads if this was the last remaining high watermark.
723
942
  parent_.cluster()->trafficStats()->upstream_flow_control_resumed_reading_total_.inc();
724
942
  upstream_->readDisable(disable);
725
942
}
726
727
954
void UpstreamRequest::DownstreamWatermarkManager::onAboveWriteBufferHighWatermark() {
728
954
  ASSERT(parent_.upstream_);
729
954
  parent_.readDisableOrDefer(true);
730
954
}
731
732
942
void UpstreamRequest::DownstreamWatermarkManager::onBelowWriteBufferLowWatermark() {
733
942
  ASSERT(parent_.upstream_);
734
942
  parent_.readDisableOrDefer(false);
735
942
}
736
737
0
void UpstreamRequest::disableDataFromDownstreamForFlowControl() {
738
0
  parent_.cluster()->trafficStats()->upstream_flow_control_backed_up_total_.inc();
739
0
  parent_.callbacks()->onDecoderFilterAboveWriteBufferHighWatermark();
740
0
  ++downstream_data_disabled_;
741
0
}
742
743
0
void UpstreamRequest::enableDataFromDownstreamForFlowControl() {
744
0
  parent_.cluster()->trafficStats()->upstream_flow_control_drained_total_.inc();
745
0
  parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark();
746
0
  ASSERT(downstream_data_disabled_ != 0);
747
0
  if (downstream_data_disabled_ > 0) {
748
0
    --downstream_data_disabled_;
749
0
  }
750
0
}
751
752
6.08k
Http::RequestHeaderMapOptRef UpstreamRequestFilterManagerCallbacks::requestHeaders() {
753
6.08k
  return {*upstream_request_.parent_.downstreamHeaders()};
754
6.08k
}
755
756
12.9k
Http::RequestTrailerMapOptRef UpstreamRequestFilterManagerCallbacks::requestTrailers() {
757
12.9k
  if (upstream_request_.parent_.downstreamTrailers()) {
758
0
    return {*upstream_request_.parent_.downstreamTrailers()};
759
0
  }
760
12.9k
  if (trailers_) {
761
0
    return {*trailers_};
762
0
  }
763
12.9k
  return {};
764
12.9k
}
765
766
0
const ScopeTrackedObject& UpstreamRequestFilterManagerCallbacks::scope() {
767
0
  return upstream_request_.parent_.callbacks()->scope();
768
0
}
769
770
0
OptRef<const Tracing::Config> UpstreamRequestFilterManagerCallbacks::tracingConfig() const {
771
0
  return upstream_request_.parent_.callbacks()->tracingConfig();
772
0
}
773
774
0
Tracing::Span& UpstreamRequestFilterManagerCallbacks::activeSpan() {
775
0
  return upstream_request_.parent_.callbacks()->activeSpan();
776
0
}
777
778
void UpstreamRequestFilterManagerCallbacks::resetStream(
779
74
    Http::StreamResetReason reset_reason, absl::string_view transport_failure_reason) {
780
  // The filter manager needs to disambiguate between a filter-driven reset,
781
  // which should force reset the stream, and a codec driven reset, which should
782
  // tell the router the stream reset, and let the router make the decision to
783
  // send a local reply, or retry the stream.
784
74
  if (reset_reason == Http::StreamResetReason::LocalReset &&
785
74
      transport_failure_reason != "codec_error") {
786
0
    upstream_request_.parent_.callbacks()->resetStream();
787
0
    return;
788
0
  }
789
74
  return upstream_request_.onResetStream(reset_reason, transport_failure_reason);
790
74
}
791
792
0
Upstream::ClusterInfoConstSharedPtr UpstreamRequestFilterManagerCallbacks::clusterInfo() {
793
0
  return upstream_request_.parent_.callbacks()->clusterInfo();
794
0
}
795
796
Http::Http1StreamEncoderOptionsOptRef
797
0
UpstreamRequestFilterManagerCallbacks::http1StreamEncoderOptions() {
798
0
  return upstream_request_.parent_.callbacks()->http1StreamEncoderOptions();
799
0
}
800
801
} // namespace Router
802
} // namespace Envoy