1
#include "source/common/router/upstream_request.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7
#include <string>
8

            
9
#include "envoy/event/dispatcher.h"
10
#include "envoy/event/timer.h"
11
#include "envoy/grpc/status.h"
12
#include "envoy/http/conn_pool.h"
13
#include "envoy/http/header_map.h"
14
#include "envoy/runtime/runtime.h"
15
#include "envoy/upstream/cluster_manager.h"
16
#include "envoy/upstream/upstream.h"
17

            
18
#include "source/common/common/assert.h"
19
#include "source/common/common/dump_state_utils.h"
20
#include "source/common/common/empty_string.h"
21
#include "source/common/common/enum_to_int.h"
22
#include "source/common/common/scope_tracker.h"
23
#include "source/common/common/utility.h"
24
#include "source/common/grpc/common.h"
25
#include "source/common/http/codes.h"
26
#include "source/common/http/header_map_impl.h"
27
#include "source/common/http/headers.h"
28
#include "source/common/http/message_impl.h"
29
#include "source/common/http/utility.h"
30
#include "source/common/network/application_protocol.h"
31
#include "source/common/network/transport_socket_options_impl.h"
32
#include "source/common/network/upstream_server_name.h"
33
#include "source/common/network/upstream_subject_alt_names.h"
34
#include "source/common/router/config_impl.h"
35
#include "source/common/router/debug_config.h"
36
#include "source/common/router/router.h"
37
#include "source/common/router/upstream_codec_filter.h"
38
#include "source/common/stream_info/uint32_accessor_impl.h"
39
#include "source/common/tracing/http_tracer_impl.h"
40
#include "source/extensions/common/proxy_protocol/proxy_protocol_header.h"
41

            
42
namespace Envoy {
43
namespace Router {
44

            
45
// The upstream HTTP filter manager class.
46
class UpstreamFilterManager : public Http::FilterManager {
47
public:
48
  UpstreamFilterManager(Http::FilterManagerCallbacks& filter_manager_callbacks,
49
                        Event::Dispatcher& dispatcher, OptRef<const Network::Connection> connection,
50
                        uint64_t stream_id, Buffer::BufferMemoryAccountSharedPtr account,
51
                        bool proxy_100_continue, uint32_t buffer_limit, UpstreamRequest& request)
52
47497
      : FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account,
53
47497
                      proxy_100_continue, buffer_limit),
54
47497
        upstream_request_(request) {}
55

            
56
230985
  StreamInfo::StreamInfo& streamInfo() override {
57
230985
    return upstream_request_.parent_.callbacks()->streamInfo();
58
230985
  }
59
4
  const StreamInfo::StreamInfo& streamInfo() const override {
60
4
    return upstream_request_.parent_.callbacks()->streamInfo();
61
4
  }
62
  // Send local replies via the downstream HTTP filter manager.
63
  // Local replies will not be seen by upstream HTTP filters.
64
  void sendLocalReply(Http::Code code, absl::string_view body,
65
                      const std::function<void(Http::ResponseHeaderMap& headers)>& modify_headers,
66
                      const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
67
717
                      absl::string_view details) override {
68
717
    state().decoder_filter_chain_aborted_ = true;
69
717
    state().encoder_filter_chain_aborted_ = true;
70
717
    state().encoder_filter_chain_complete_ = true;
71
717
    state().observed_encode_end_stream_ = true;
72
    // TODO(alyssawilk) this should be done through the router to play well with hedging.
73
717
    upstream_request_.parent_.callbacks()->sendLocalReply(code, body, modify_headers, grpc_status,
74
717
                                                          details);
75
717
  }
76
675
  void executeLocalReplyIfPrepared() override {}
77
  UpstreamRequest& upstream_request_;
78
};
79

            
80
UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
81
                                 std::unique_ptr<GenericConnPool>&& conn_pool,
82
                                 bool can_send_early_data, bool can_use_http3,
83
                                 bool enable_half_close)
84
47497
    : parent_(parent), conn_pool_(std::move(conn_pool)),
85
47497
      stream_info_(parent_.callbacks()->dispatcher().timeSource(),
86
47497
                   parent_.callbacks()->connection().has_value()
87
47497
                       ? parent_.callbacks()->connection()->connectionInfoProviderSharedPtr()
88
47497
                       : nullptr,
89
47497
                   StreamInfo::FilterState::LifeSpan::FilterChain),
90
47497
      start_time_(parent_.callbacks()->dispatcher().timeSource().monotonicTime()),
91
47497
      upstream_canary_(false), router_sent_end_stream_(false), encode_trailers_(false),
92
47497
      retried_(false), awaiting_headers_(true), outlier_detection_timeout_recorded_(false),
93
47497
      create_per_try_timeout_on_request_complete_(false), paused_for_connect_(false),
94
47497
      paused_for_websocket_(false), reset_stream_(false),
95
47497
      record_timeout_budget_(parent_.cluster()->timeoutBudgetStats().has_value()),
96
47497
      cleaned_up_(false), had_upstream_(false),
97
47497
      stream_options_({can_send_early_data, can_use_http3}), grpc_rq_success_deferred_(false),
98
47497
      enable_half_close_(enable_half_close) {
99
  // Get tracing config once and reuse it.
100
47497
  auto tracing_config = parent_.callbacks()->tracingConfig();
101

            
102
47497
  if (tracing_config.has_value()) {
103
3873
    if (tracing_config->spawnUpstreamSpan() || parent_.config().start_child_span_) {
104
22
      span_ = parent_.callbacks()->activeSpan().spawnChild(
105
22
          tracing_config.value().get(),
106
22
          absl::StrCat("router ", parent.cluster()->observabilityName(), " egress"),
107
22
          parent_.callbacks()->dispatcher().timeSource().systemTime());
108
22
      if (parent.attemptCount() != 1) {
109
        // This is a retry request, add this metadata to span.
110
3
        span_->setTag(Tracing::Tags::get().RetryCount, std::to_string(parent.attemptCount() - 1));
111
3
      }
112
22
    }
113
3873
  }
114

            
115
  // The router checks that the connection pool is non-null before creating the upstream request.
116
47497
  auto upstream_host = conn_pool_->host();
117

            
118
  // Only inject trace context if propagation is not disabled.
119
  // When noContextPropagation is true, spans are still reported but trace context
120
  // headers (e.g., traceparent, X-B3-*) are not injected into upstream requests.
121
47497
  const bool no_context_propagation =
122
47497
      tracing_config.has_value() && tracing_config->noContextPropagation();
123

            
124
47497
  if (!no_context_propagation) {
125
47495
    Tracing::HttpTraceContext trace_context(*parent_.downstreamHeaders());
126
47495
    Tracing::UpstreamContext upstream_context(upstream_host.get(),           // host_
127
47495
                                              &upstream_host->cluster(),     // cluster_
128
47495
                                              Tracing::ServiceType::Unknown, // service_type_
129
47495
                                              false                          // async_client_span_
130
47495
    );
131

            
132
47495
    if (span_ != nullptr) {
133
21
      span_->injectContext(trace_context, upstream_context);
134
47476
    } else {
135
      // No independent child span for current upstream request then inject the parent span's
136
      // tracing context into the request headers.
137
      // The injectContext() of the parent span may be called repeatedly when the request is
138
      // retried.
139
47474
      parent_.callbacks()->activeSpan().injectContext(trace_context, upstream_context);
140
47474
    }
141
47495
  }
142

            
143
47497
  stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
144
47497
  stream_info_.route_ = parent_.callbacks()->route();
145
47497
  stream_info_.upstreamInfo()->setUpstreamHost(upstream_host);
146
47497
  parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo());
147

            
148
47497
  stream_info_.healthCheck(parent_.callbacks()->streamInfo().healthCheck());
149
47497
  stream_info_.setIsShadow(parent_.callbacks()->streamInfo().isShadow());
150
47497
  absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info =
151
47497
      parent_.callbacks()->streamInfo().upstreamClusterInfo();
152
47497
  if (cluster_info.has_value()) {
153
47220
    stream_info_.setUpstreamClusterInfo(*cluster_info);
154
47220
  }
155

            
156
  // Set up the upstream HTTP filter manager.
157
47497
  filter_manager_callbacks_ = std::make_unique<UpstreamRequestFilterManagerCallbacks>(*this);
158
47497
  filter_manager_ = std::make_unique<UpstreamFilterManager>(
159
47497
      *filter_manager_callbacks_, parent_.callbacks()->dispatcher(), UpstreamRequest::connection(),
160
47497
      parent_.callbacks()->streamId(), parent_.callbacks()->account(), true,
161
47497
      parent_.callbacks()->bufferLimit(), *this);
162
  // Attempt to create custom cluster-specified filter chain
163
47497
  bool created = filter_manager_->createFilterChain(*parent_.cluster()).created();
164

            
165
47497
  if (!created) {
166
    // Attempt to create custom router-specified filter chain.
167
46110
    created = filter_manager_->createFilterChain(parent_.config()).created();
168
46110
  }
169
47497
  if (!created) {
170
    // Neither cluster nor router have a custom filter chain; add the default
171
    // cluster filter chain, which only consists of the codec filter.
172
46071
    created = filter_manager_->createFilterChain(defaultUpstreamHttpFilterChainFactory()).created();
173
46071
  }
174
  // There will always be a codec filter present, which sets the upstream
175
  // interface. Fast-fail any tests that don't set up mocks correctly.
176
47497
  ASSERT(created && upstream_interface_.has_value());
177
47497
}
178

            
179
47497
UpstreamRequest::~UpstreamRequest() { cleanUp(); }
180

            
181
94292
void UpstreamRequest::cleanUp() {
182
94292
  if (cleaned_up_) {
183
46795
    return;
184
46795
  }
185
47497
  cleaned_up_ = true;
186

            
187
47497
  filter_manager_->destroyFilters();
188

            
189
47497
  if (span_ != nullptr) {
190
22
    auto tracing_config = parent_.callbacks()->tracingConfig();
191
22
    ASSERT(tracing_config.has_value());
192
22
    Tracing::HttpTracerUtility::finalizeUpstreamSpan(*span_, stream_info_,
193
22
                                                     tracing_config.value().get());
194
22
  }
195

            
196
47497
  if (per_try_timeout_ != nullptr) {
197
    // Allows for testing.
198
2318
    per_try_timeout_->disableTimer();
199
2318
  }
200

            
201
47497
  if (per_try_idle_timeout_ != nullptr) {
202
    // Allows for testing.
203
15
    per_try_idle_timeout_->disableTimer();
204
15
  }
205

            
206
47497
  if (max_stream_duration_timer_ != nullptr) {
207
74
    max_stream_duration_timer_->disableTimer();
208
74
  }
209

            
210
47497
  if (upstream_log_flush_timer_ != nullptr) {
211
2
    upstream_log_flush_timer_->disableTimer();
212
2
  }
213

            
214
47497
  clearRequestEncoder();
215

            
216
  // If desired, fire the per-try histogram when the UpstreamRequest
217
  // completes.
218
47497
  if (record_timeout_budget_) {
219
374
    Event::Dispatcher& dispatcher = parent_.callbacks()->dispatcher();
220
374
    const MonotonicTime end_time = dispatcher.timeSource().monotonicTime();
221
374
    const std::chrono::milliseconds response_time =
222
374
        std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time_);
223
374
    Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = parent_.cluster()->timeoutBudgetStats();
224
374
    tb_stats->get().upstream_rq_timeout_budget_per_try_percent_used_.recordValue(
225
374
        FilterUtility::percentageOfTimeout(response_time, parent_.timeout().per_try_timeout_));
226
374
  }
227

            
228
  // Ditto for request/response size histograms.
229
47497
  Upstream::ClusterRequestResponseSizeStatsOptRef req_resp_stats_opt =
230
47497
      parent_.cluster()->requestResponseSizeStats();
231
47497
  if (req_resp_stats_opt.has_value() && parent_.downstreamHeaders()) {
232
442
    auto& req_resp_stats = req_resp_stats_opt->get();
233
442
    req_resp_stats.upstream_rq_headers_size_.recordValue(parent_.downstreamHeaders()->byteSize());
234
442
    req_resp_stats.upstream_rq_headers_count_.recordValue(parent_.downstreamHeaders()->size());
235
442
    req_resp_stats.upstream_rq_body_size_.recordValue(stream_info_.bytesSent());
236

            
237
442
    if (response_headers_size_.has_value()) {
238
295
      req_resp_stats.upstream_rs_headers_size_.recordValue(response_headers_size_.value());
239
295
      req_resp_stats.upstream_rs_headers_count_.recordValue(response_headers_count_.value());
240
295
      req_resp_stats.upstream_rs_body_size_.recordValue(stream_info_.bytesReceived());
241
295
    }
242
442
  }
243

            
244
47497
  stream_info_.onRequestComplete();
245
47497
  upstreamLog(AccessLog::AccessLogType::UpstreamEnd);
246

            
247
47526
  while (downstream_data_disabled_ != 0) {
248
29
    parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark();
249
29
    parent_.cluster()->trafficStats()->upstream_flow_control_drained_total_.inc();
250
29
    --downstream_data_disabled_;
251
29
  }
252
  // The upstream HTTP filter chain callbacks own headers/trailers while they are traversing the
253
  // filter chain. Make sure to not delete them immediately when the stream ends, as the stream
254
  // often ends during filter chain processing and it causes use-after-free violations.
255
47497
  parent_.callbacks()->dispatcher().deferredDelete(std::move(filter_manager_callbacks_));
256
47497
}
257

            
258
47504
void UpstreamRequest::upstreamLog(AccessLog::AccessLogType access_log_type) {
259
47504
  const Formatter::Context log_context{parent_.downstreamHeaders(),
260
47504
                                       upstream_headers_.get(),
261
47504
                                       upstream_trailers_.get(),
262
47504
                                       {},
263
47504
                                       access_log_type};
264

            
265
47515
  for (const auto& upstream_log : parent_.config().upstream_logs_) {
266
46
    upstream_log->log(log_context, stream_info_);
267
46
  }
268
47504
}
269

            
270
// This is called by the FilterManager when all filters have processed 1xx headers. Forward them
271
// on to the router.
272
150
void UpstreamRequest::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) {
273
150
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
274

            
275
150
  ASSERT(Http::HeaderUtility::isSpecial1xx(*headers));
276
150
  addResponseHeadersStat(headers->byteSize(), headers->size());
277
150
  maybeHandleDeferredReadDisable();
278
150
  parent_.onUpstream1xxHeaders(std::move(headers), *this);
279
150
}
280

            
281
// This is called by the FilterManager when all filters have processed headers. Forward them
282
// on to the router.
283
40156
void UpstreamRequest::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
284
40156
  ASSERT(headers.get());
285
40156
  ENVOY_STREAM_LOG(trace, "end_stream: {}, upstream response headers:\n{}", *parent_.callbacks(),
286
40156
                   end_stream, *headers);
287
40156
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
288

            
289
40156
  resetPerTryIdleTimer();
290

            
291
40156
  addResponseHeadersStat(headers->byteSize(), headers->size());
292

            
293
  // We drop unsupported 1xx on the floor here. 101 upgrade headers need to be passed to the client
294
  // as part of the final response. Most 1xx headers are handled in onUpstream1xxHeaders.
295
  //
296
  // We could in principle handle other headers here, but this might result in the double invocation
297
  // of decodeHeaders() (once for informational, again for non-informational), which is likely an
298
  // easy to miss corner case in the filter and HCM contract.
299
  //
300
  // This filtering is done early in upstream request, unlike 100 coalescing which is performed in
301
  // the router filter, since the filtering only depends on the state of a single upstream, and we
302
  // don't want to confuse accounting such as onFirstUpstreamRxByteReceived() with informational
303
  // headers.
304
40156
  const uint64_t response_code = Http::Utility::getResponseStatus(*headers);
305
40156
  if (Http::CodeUtility::is1xx(response_code) &&
306
40156
      response_code != enumToInt(Http::Code::SwitchingProtocols)) {
307
1
    return;
308
1
  }
309

            
310
40155
  awaiting_headers_ = false;
311
40155
  if (span_ != nullptr) {
312
20
    Tracing::HttpTracerUtility::onUpstreamResponseHeaders(*span_, headers.get());
313
20
  }
314
40155
  if (!parent_.config().upstream_logs_.empty()) {
315
26
    upstream_headers_ = Http::createHeaderMap<Http::ResponseHeaderMapImpl>(*headers);
316
26
  }
317
40155
  stream_info_.setResponseCode(static_cast<uint32_t>(response_code));
318

            
319
40155
  maybeHandleDeferredReadDisable();
320
40155
  ASSERT(headers.get());
321

            
322
40155
  parent_.onUpstreamHeaders(response_code, std::move(headers), *this, end_stream);
323
40155
}
324

            
325
40305
void UpstreamRequest::maybeHandleDeferredReadDisable() {
326
40361
  for (; deferred_read_disabling_count_ > 0; --deferred_read_disabling_count_) {
327
    // If the deferred read disabling count hasn't been cancelled out by read
328
    // enabling count so far, stop the upstream from reading the rest response.
329
    // Because readDisable keeps track of how many time it is called with
330
    // "true" or "false", here it has to be called with "true" the same number
331
    // of times as it would be called with "false" in the future.
332
56
    parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
333
56
    upstream_->readDisable(true);
334
56
  }
335
40305
}
336

            
337
384028
void UpstreamRequest::decodeData(Buffer::Instance& data, bool end_stream) {
338
384028
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
339

            
340
384028
  resetPerTryIdleTimer();
341
384028
  stream_info_.addBytesReceived(data.length());
342
384028
  parent_.onUpstreamData(data, *this, end_stream);
343
384028
}
344

            
345
780
void UpstreamRequest::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) {
346
780
  ENVOY_STREAM_LOG(trace, "upstream response trailers:\n{}", *parent_.callbacks(), *trailers);
347
780
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
348

            
349
780
  if (span_ != nullptr) {
350
    Tracing::HttpTracerUtility::onUpstreamResponseTrailers(*span_, trailers.get());
351
  }
352
780
  if (!parent_.config().upstream_logs_.empty()) {
353
10
    upstream_trailers_ = Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*trailers);
354
10
  }
355
780
  parent_.onUpstreamTrailers(std::move(trailers), *this);
356
780
}
357

            
358
2
void UpstreamRequest::dumpState(std::ostream& os, int indent_level) const {
359
2
  const char* spaces = spacesForLevel(indent_level);
360
2
  os << spaces << "UpstreamRequest " << this << "\n";
361
2
  if (connection()) {
362
2
    const auto addressProvider = connection()->connectionInfoProviderSharedPtr();
363
2
    DUMP_DETAILS(addressProvider);
364
2
  }
365
2
  const Http::RequestHeaderMap* request_headers = parent_.downstreamHeaders();
366
2
  DUMP_DETAILS(request_headers);
367
2
  if (filter_manager_) {
368
2
    filter_manager_->dumpState(os, indent_level);
369
2
  }
370
2
}
371

            
372
46680
const Route& UpstreamRequest::route() const { return *parent_.callbacks()->route(); }
373

            
374
47505
OptRef<const Network::Connection> UpstreamRequest::connection() const {
375
47505
  return parent_.callbacks()->connection();
376
47505
}
377

            
378
1111
void UpstreamRequest::decodeMetadata(Http::MetadataMapPtr&& metadata_map) {
379
1111
  parent_.onUpstreamMetadata(std::move(metadata_map));
380
1111
}
381

            
382
void UpstreamRequest::maybeEndDecode(bool end_stream) {
383
  if (end_stream) {
384
    upstreamTiming().onLastUpstreamRxByteReceived(parent_.callbacks()->dispatcher().timeSource());
385
  }
386
}
387

            
388
void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
389
46966
                                             bool pool_success) {
390
46966
  StreamInfo::UpstreamInfo& upstream_info = *streamInfo().upstreamInfo();
391
46966
  upstream_info.setUpstreamHost(host);
392
46966
  upstream_host_ = host;
393
46966
  parent_.onUpstreamHostSelected(host, pool_success);
394
46966
}
395

            
396
47467
void UpstreamRequest::acceptHeadersFromRouter(bool end_stream) {
397
47467
  ASSERT(!router_sent_end_stream_);
398
47467
  router_sent_end_stream_ = end_stream;
399

            
400
  // Make sure that when we are forwarding CONNECT payload we do not do so until
401
  // the upstream has accepted the CONNECT request.
402
  // This must be done before conn_pool->newStream, as onPoolReady un-pauses for CONNECT
403
  // termination.
404
47467
  auto* headers = parent_.downstreamHeaders();
405
47467
  if (headers->getMethodValue() == Http::Headers::get().MethodValues.Connect) {
406
842
    paused_for_connect_ = true;
407
    // If this is a websocket upgrade request, pause the request until the upstream sends
408
    // the 101 Switching Protocols response code. Using the else logic here to obey CONNECT
409
    // method which is expecting 2xx response.
410
47403
  } else if (Http::Utility::isWebSocketUpgradeRequest(*headers)) {
411
100
    paused_for_websocket_ = true;
412

            
413
100
    if (Runtime::runtimeFeatureEnabled(
414
100
            "envoy.reloadable_features.websocket_enable_timeout_on_upgrade_response")) {
415
      // For websocket upgrades, we need to set up timeouts immediately
416
      // because the upstream request will be paused waiting for the upgrade response.
417
100
      if (!per_try_timeout_) {
418
100
        setupPerTryTimeout();
419
100
      }
420
100
      parent_.setupRouteTimeoutForWebsocketUpgrade();
421
100
    }
422
100
  }
423

            
424
  // Kick off creation of the upstream connection immediately upon receiving headers.
425
  // In future it may be possible for upstream HTTP filters to delay this, or influence connection
426
  // creation but for now optimize for minimal latency and fetch the connection
427
  // as soon as possible.
428
47467
  conn_pool_->newStream(this);
429

            
430
47467
  if (parent_.config().upstream_log_flush_interval_.has_value()) {
431
3
    upstream_log_flush_timer_ = parent_.callbacks()->dispatcher().createTimer([this]() -> void {
432
      // If the request is complete, we've already done the stream-end upstream log, and shouldn't
433
      // do the periodic log.
434
3
      if (!streamInfo().requestComplete().has_value()) {
435
3
        upstreamLog(AccessLog::AccessLogType::UpstreamPeriodic);
436
3
        resetUpstreamLogFlushTimer();
437
3
      }
438
      // Both downstream and upstream bytes meters may not be initialized when
439
      // the timer goes off, e.g. if it takes longer than the interval for a
440
      // connection to be initialized; check for nullptr.
441
3
      auto& downstream_bytes_meter = stream_info_.getDownstreamBytesMeter();
442
3
      auto& upstream_bytes_meter = stream_info_.getUpstreamBytesMeter();
443
3
      const SystemTime now = parent_.callbacks()->dispatcher().timeSource().systemTime();
444
3
      if (downstream_bytes_meter) {
445
3
        downstream_bytes_meter->takeUpstreamPeriodicLoggingSnapshot(now);
446
3
      }
447
3
      if (upstream_bytes_meter) {
448
3
        upstream_bytes_meter->takeUpstreamPeriodicLoggingSnapshot(now);
449
3
      }
450
3
    });
451

            
452
2
    resetUpstreamLogFlushTimer();
453
2
  }
454

            
455
47467
  filter_manager_->requestHeadersInitialized();
456
47467
  filter_manager_->streamInfo().setRequestHeaders(*parent_.downstreamHeaders());
457
47467
  filter_manager_->decodeHeaders(*parent_.downstreamHeaders(), end_stream);
458
47467
}
459

            
460
194260
void UpstreamRequest::acceptDataFromRouter(Buffer::Instance& data, bool end_stream) {
461
194260
  ASSERT(!router_sent_end_stream_);
462
194260
  router_sent_end_stream_ = end_stream;
463

            
464
194260
  filter_manager_->decodeData(data, end_stream);
465
194260
}
466

            
467
463
void UpstreamRequest::acceptTrailersFromRouter(Http::RequestTrailerMap& trailers) {
468
463
  ASSERT(!router_sent_end_stream_);
469
463
  router_sent_end_stream_ = true;
470
463
  encode_trailers_ = true;
471

            
472
463
  filter_manager_->decodeTrailers(trailers);
473
463
}
474

            
475
1281
void UpstreamRequest::acceptMetadataFromRouter(Http::MetadataMapPtr&& metadata_map_ptr) {
476
1281
  filter_manager_->decodeMetadata(*metadata_map_ptr);
477
1281
}
478

            
479
void UpstreamRequest::onResetStream(Http::StreamResetReason reason,
480
2794
                                    absl::string_view transport_failure_reason) {
481
2794
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
482

            
483
2794
  if (span_ != nullptr) {
484
    // Add tags about reset.
485
2
    span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
486
2
    span_->setTag(Tracing::Tags::get().ErrorReason, Http::Utility::resetReasonToString(reason));
487
2
  }
488
2794
  clearRequestEncoder();
489
2794
  awaiting_headers_ = false;
490

            
491
2794
  stream_info_.setResponseFlag(Filter::streamResetReasonToResponseFlag(reason));
492
2794
  parent_.onUpstreamReset(reason, transport_failure_reason, *this);
493
2794
}
494

            
495
7913
void UpstreamRequest::resetStream() {
496
7913
  if (conn_pool_->cancelAnyPendingStream()) {
497
490
    ENVOY_STREAM_LOG(debug, "canceled pool request", *parent_.callbacks());
498
490
    ASSERT(!upstream_);
499
490
  }
500

            
501
  // Don't reset the stream if we're already done with it.
502
7913
  if (upstreamTiming().last_upstream_tx_byte_sent_.has_value() &&
503
7913
      upstreamTiming().last_upstream_rx_byte_received_.has_value()) {
504
346
    return;
505
346
  }
506

            
507
7567
  if (span_ != nullptr) {
508
    // Add tags about the cancellation.
509
1
    span_->setTag(Tracing::Tags::get().Canceled, Tracing::Tags::get().True);
510
1
  }
511

            
512
7567
  if (upstream_) {
513
6963
    ENVOY_STREAM_LOG(debug, "resetting pool request", *parent_.callbacks());
514
6963
    upstream_->resetStream();
515
6963
    clearRequestEncoder();
516
6963
  }
517
7567
  reset_stream_ = true;
518
7567
}
519

            
520
424199
void UpstreamRequest::resetPerTryIdleTimer() {
521
424199
  if (per_try_idle_timeout_ != nullptr) {
522
30
    per_try_idle_timeout_->enableTimer(parent_.timeout().per_try_idle_timeout_);
523
30
  }
524
424199
}
525

            
526
5
void UpstreamRequest::resetUpstreamLogFlushTimer() {
527
5
  if (upstream_log_flush_timer_ != nullptr) {
528
5
    upstream_log_flush_timer_->enableTimer(parent_.config().upstream_log_flush_interval_.value());
529
5
  }
530
5
}
531

            
532
39154
void UpstreamRequest::setupPerTryTimeout() {
533
39154
  ASSERT(!per_try_timeout_);
534
39154
  if (parent_.timeout().per_try_timeout_.count() > 0) {
535
2318
    per_try_timeout_ =
536
2318
        parent_.callbacks()->dispatcher().createTimer([this]() -> void { onPerTryTimeout(); });
537
2318
    per_try_timeout_->enableTimer(parent_.timeout().per_try_timeout_);
538
2318
  }
539

            
540
39154
  ASSERT(!per_try_idle_timeout_);
541
39154
  if (parent_.timeout().per_try_idle_timeout_.count() > 0) {
542
15
    per_try_idle_timeout_ =
543
15
        parent_.callbacks()->dispatcher().createTimer([this]() -> void { onPerTryIdleTimeout(); });
544
15
    resetPerTryIdleTimer();
545
15
  }
546
39154
}
547

            
548
13
void UpstreamRequest::onPerTryIdleTimeout() {
549
13
  ENVOY_STREAM_LOG(debug, "upstream per try idle timeout", *parent_.callbacks());
550
13
  if (per_try_timeout_) {
551
    // Disable the per try idle timer, so it does not trigger further retries
552
1
    per_try_timeout_->disableTimer();
553
1
  }
554
13
  stream_info_.setResponseFlag(StreamInfo::CoreResponseFlag::StreamIdleTimeout);
555
13
  parent_.onPerTryIdleTimeout(*this);
556
13
}
557

            
558
42
void UpstreamRequest::onPerTryTimeout() {
559
42
  if (per_try_idle_timeout_) {
560
    // Delete the per try idle timer, so it does not trigger further retries.
561
    // The timer has to be deleted to prevent data flow from re-arming it.
562
    per_try_idle_timeout_.reset();
563
  }
564
  // If we've sent anything downstream, ignore the per try timeout and let the response continue
565
  // up to the global timeout
566
42
  if (!parent_.downstreamResponseStarted()) {
567
37
    ENVOY_STREAM_LOG(debug, "upstream per try timeout", *parent_.callbacks());
568

            
569
37
    stream_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamRequestTimeout);
570
37
    parent_.onPerTryTimeout(*this);
571
41
  } else {
572
5
    ENVOY_STREAM_LOG(debug,
573
5
                     "ignored upstream per try timeout due to already started downstream response",
574
5
                     *parent_.callbacks());
575
5
  }
576
42
}
577

            
578
46966
void UpstreamRequest::recordConnectionPoolCallbackLatency() {
579
46966
  upstreamTiming().recordConnectionPoolCallbackLatency(
580
46966
      start_time_, parent_.callbacks()->dispatcher().timeSource());
581
46966
}
582

            
583
void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason,
584
                                    absl::string_view transport_failure_reason,
585
291
                                    Upstream::HostDescriptionConstSharedPtr host) {
586
291
  recordConnectionPoolCallbackLatency();
587
291
  Http::StreamResetReason reset_reason = [](ConnectionPool::PoolFailureReason reason) {
588
291
    switch (reason) {
589
34
    case ConnectionPool::PoolFailureReason::Overflow:
590
34
      return Http::StreamResetReason::Overflow;
591
208
    case ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
592
208
      return Http::StreamResetReason::RemoteConnectionFailure;
593
46
    case ConnectionPool::PoolFailureReason::LocalConnectionFailure:
594
46
      return Http::StreamResetReason::LocalConnectionFailure;
595
3
    case ConnectionPool::PoolFailureReason::Timeout:
596
3
      return Http::StreamResetReason::ConnectionTimeout;
597
291
    }
598
    PANIC_DUE_TO_CORRUPT_ENUM;
599
  }(reason);
600

            
601
291
  stream_info_.upstreamInfo()->setUpstreamTransportFailureReason(transport_failure_reason);
602

            
603
  // Mimic an upstream reset.
604
291
  onUpstreamHostSelected(host, false);
605
291
  onResetStream(reset_reason, transport_failure_reason);
606
291
}
607

            
608
void UpstreamRequest::onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
609
                                  Upstream::HostDescriptionConstSharedPtr host,
610
                                  const Network::ConnectionInfoProvider& address_provider,
611
                                  StreamInfo::StreamInfo& info,
612
46675
                                  absl::optional<Http::Protocol> protocol) {
613
  // This may be called under an existing ScopeTrackerScopeState but it will unwind correctly.
614
46675
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
615
46675
  ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks());
616
46675
  recordConnectionPoolCallbackLatency();
617
46675
  upstream_ = std::move(upstream);
618
46675
  had_upstream_ = true;
619
  // Have the upstream use the account of the downstream.
620
46675
  upstream_->setAccount(parent_.callbacks()->account());
621

            
622
46675
  host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess);
623

            
624
46675
  if (protocol) {
625
46362
    stream_info_.protocol(protocol.value());
626
46564
  } else {
627
    // We only pause for CONNECT and WebSocket for HTTP upstreams. If this is a TCP upstream,
628
    // unpause.
629
313
    paused_for_connect_ = false;
630
313
    paused_for_websocket_ = false;
631
313
  }
632

            
633
46675
  StreamInfo::UpstreamInfo& upstream_info = *stream_info_.upstreamInfo();
634
46675
  if (info.upstreamInfo()) {
635
46643
    auto& upstream_timing = info.upstreamInfo()->upstreamTiming();
636
46643
    upstreamTiming().upstream_connect_start_ = upstream_timing.upstream_connect_start_;
637
46643
    upstreamTiming().upstream_connect_complete_ = upstream_timing.upstream_connect_complete_;
638
46643
    upstreamTiming().upstream_handshake_complete_ = upstream_timing.upstream_handshake_complete_;
639
46643
    upstream_info.setUpstreamNumStreams(info.upstreamInfo()->upstreamNumStreams());
640
46643
  }
641

            
642
  // Upstream HTTP filters might have already created/set a filter state.
643
46675
  const StreamInfo::FilterStateSharedPtr& filter_state = info.filterState();
644
46675
  if (!filter_state) {
645
    upstream_info.setUpstreamFilterState(
646
        std::make_shared<StreamInfo::FilterStateImpl>(StreamInfo::FilterState::LifeSpan::Request));
647
46675
  } else {
648
46675
    upstream_info.setUpstreamFilterState(filter_state);
649
46675
  }
650
46675
  upstream_info.setUpstreamLocalAddress(address_provider.localAddress());
651
46675
  upstream_info.setUpstreamRemoteAddress(address_provider.remoteAddress());
652
46675
  upstream_info.setUpstreamSslConnection(info.downstreamAddressProvider().sslConnection());
653

            
654
  // Invoke the onUpstreamHostSelected after setting ssl_connection_info_ in upstream_info.
655
  // This is because the onUpstreamHostSelected callback may need to access the ssl_connection_info
656
  // to determine the scheme of the upstream connection.
657
46675
  onUpstreamHostSelected(host, true);
658

            
659
46675
  if (info.downstreamAddressProvider().connectionID().has_value()) {
660
46372
    uint64_t connection_id = info.downstreamAddressProvider().connectionID().value();
661
46372
    upstream_info.setUpstreamConnectionId(connection_id);
662
46372
  }
663

            
664
46675
  if (info.downstreamAddressProvider().interfaceName().has_value()) {
665
9
    upstream_info.setUpstreamInterfaceName(
666
9
        info.downstreamAddressProvider().interfaceName().value());
667
9
  }
668

            
669
46675
  stream_info_.setUpstreamBytesMeter(upstream_->bytesMeter());
670
46675
  StreamInfo::StreamInfo::syncUpstreamAndDownstreamBytesMeter(parent_.callbacks()->streamInfo(),
671
46675
                                                              stream_info_);
672
46675
  if (protocol) {
673
46362
    upstream_info.setUpstreamProtocol(protocol.value());
674
46362
  }
675

            
676
46675
  if (parent_.downstreamEndStream() && !per_try_timeout_) {
677
24547
    setupPerTryTimeout();
678
41902
  } else {
679
22128
    create_per_try_timeout_on_request_complete_ = true;
680
22128
  }
681

            
682
  // Make sure the connection manager will inform the downstream watermark manager when the
683
  // downstream buffers are overrun. This may result in immediate watermark callbacks referencing
684
  // the encoder.
685
46675
  parent_.callbacks()->addDownstreamWatermarkCallbacks(downstream_watermark_manager_);
686

            
687
46675
  absl::optional<std::chrono::milliseconds> max_stream_duration;
688
46675
  if (parent_.dynamicMaxStreamDuration().has_value()) {
689
14
    max_stream_duration = parent_.dynamicMaxStreamDuration().value();
690
46661
  } else if (upstream_host_->cluster()
691
46661
                 .httpProtocolOptions()
692
46661
                 .commonHttpProtocolOptions()
693
46661
                 .has_max_stream_duration()) {
694
62
    max_stream_duration = std::chrono::milliseconds(
695
62
        DurationUtil::durationToMilliseconds(upstream_host_->cluster()
696
62
                                                 .httpProtocolOptions()
697
62
                                                 .commonHttpProtocolOptions()
698
62
                                                 .max_stream_duration()));
699
62
  }
700
46675
  if (max_stream_duration.has_value() && max_stream_duration->count()) {
701
74
    max_stream_duration_timer_ = parent_.callbacks()->dispatcher().createTimer(
702
74
        [this]() -> void { onStreamMaxDurationReached(); });
703
74
    max_stream_duration_timer_->enableTimer(*max_stream_duration);
704
74
  }
705

            
706
46675
  const auto* route_entry = route().routeEntry();
707
46675
  if (route_entry->autoHostRewrite() && !host->hostname().empty()) {
708
15
    Http::Utility::updateAuthority(*parent_.downstreamHeaders(), host->hostname(),
709
15
                                   route_entry->appendXfh(),
710
15
                                   !parent_.config().suppress_envoy_headers_);
711
15
  }
712

            
713
46675
  stream_info_.setRequestHeaders(*parent_.downstreamHeaders());
714

            
715
46675
  if (parent_.config().flush_upstream_log_on_upstream_stream_) {
716
4
    upstreamLog(AccessLog::AccessLogType::UpstreamPoolReady);
717
4
  }
718

            
719
46675
  if (address_provider.connectionID() && stream_info_.downstreamAddressProvider().connectionID()) {
720
43514
    ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
721
43514
              address_provider.connectionID().value(),
722
43514
              stream_info_.downstreamAddressProvider().connectionID().value());
723
43514
  }
724

            
725
46677
  for (auto* callback : upstream_callbacks_) {
726
46677
    callback->onUpstreamConnectionEstablished();
727
46677
  }
728
46675
}
729

            
730
140799
UpstreamToDownstream& UpstreamRequest::upstreamToDownstream() { return *upstream_interface_; }
731

            
732
66
void UpstreamRequest::onStreamMaxDurationReached() {
733
66
  upstream_host_->cluster().trafficStats()->upstream_rq_max_duration_reached_.inc();
734

            
735
  // The upstream had closed then try to retry along with retry policy.
736
66
  parent_.onStreamMaxDurationReached(*this);
737
66
}
738

            
739
57254
void UpstreamRequest::clearRequestEncoder() {
740
  // Before clearing the encoder, unsubscribe from callbacks.
741
57254
  if (upstream_) {
742
46675
    parent_.callbacks()->removeDownstreamWatermarkCallbacks(downstream_watermark_manager_);
743
46675
  }
744
57254
  upstream_.reset();
745
57254
}
746

            
747
46994
void UpstreamRequest::readDisableOrDefer(bool disable) {
748
46994
  if (disable) {
749
    // See comments on deferred_read_disabling_count_ for when we do and don't defer.
750
23564
    if (parent_.downstreamResponseStarted()) {
751
      // The downstream connection is overrun. Pause reads from upstream.
752
      // If there are multiple calls to readDisable either the codec (H2) or the
753
      // underlying Network::Connection (H1) will handle reference counting.
754
13575
      parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
755
13575
      upstream_->readDisable(disable);
756
18207
    } else {
757
9989
      ++deferred_read_disabling_count_;
758
9989
    }
759
23564
    return;
760
23564
  }
761

            
762
  // One source of connection blockage has buffer available.
763
23430
  if (deferred_read_disabling_count_ > 0) {
764
9864
    ASSERT(!parent_.downstreamResponseStarted());
765
    // Cancel out an existing deferred read disabling.
766
9864
    --deferred_read_disabling_count_;
767
9864
    return;
768
9864
  }
769
13566
  ASSERT(parent_.downstreamResponseStarted());
770
  // Pass this on to the stream, which
771
  // will resume reads if this was the last remaining high watermark.
772
13566
  parent_.cluster()->trafficStats()->upstream_flow_control_resumed_reading_total_.inc();
773
13566
  upstream_->readDisable(disable);
774
13566
}
775

            
776
19577
void UpstreamRequest::DownstreamWatermarkManager::onAboveWriteBufferHighWatermark() {
777
19577
  ASSERT(parent_.upstream_);
778
19577
  parent_.readDisableOrDefer(true);
779
19577
}
780

            
781
19451
void UpstreamRequest::DownstreamWatermarkManager::onBelowWriteBufferLowWatermark() {
782
19451
  ASSERT(parent_.upstream_);
783
19451
  parent_.readDisableOrDefer(false);
784
19451
}
785

            
786
229252
void UpstreamRequest::disableDataFromDownstreamForFlowControl() {
787
229252
  parent_.cluster()->trafficStats()->upstream_flow_control_backed_up_total_.inc();
788
229252
  parent_.callbacks()->onDecoderFilterAboveWriteBufferHighWatermark();
789
229252
  ++downstream_data_disabled_;
790
229252
}
791

            
792
229223
void UpstreamRequest::enableDataFromDownstreamForFlowControl() {
793
229223
  parent_.cluster()->trafficStats()->upstream_flow_control_drained_total_.inc();
794
229223
  parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark();
795
229223
  ASSERT(downstream_data_disabled_ != 0);
796
229223
  if (downstream_data_disabled_ > 0) {
797
229223
    --downstream_data_disabled_;
798
229223
  }
799
229223
}
800

            
801
123535
Http::RequestHeaderMapOptRef UpstreamRequestFilterManagerCallbacks::requestHeaders() {
802
123535
  return {*upstream_request_.parent_.downstreamHeaders()};
803
123535
}
804

            
805
447598
Http::RequestTrailerMapOptRef UpstreamRequestFilterManagerCallbacks::requestTrailers() {
806
447598
  if (upstream_request_.parent_.downstreamTrailers()) {
807
579
    return {*upstream_request_.parent_.downstreamTrailers()};
808
579
  }
809
447019
  if (trailers_) {
810
32
    return {*trailers_};
811
32
  }
812
446987
  return {};
813
447019
}
814

            
815
158
const ScopeTrackedObject& UpstreamRequestFilterManagerCallbacks::scope() {
816
158
  return upstream_request_.parent_.callbacks()->scope();
817
158
}
818

            
819
1
OptRef<const Tracing::Config> UpstreamRequestFilterManagerCallbacks::tracingConfig() const {
820
1
  return upstream_request_.parent_.callbacks()->tracingConfig();
821
1
}
822

            
823
59
Tracing::Span& UpstreamRequestFilterManagerCallbacks::activeSpan() {
824
59
  return upstream_request_.parent_.callbacks()->activeSpan();
825
59
}
826

            
827
void UpstreamRequestFilterManagerCallbacks::resetStream(
828
2509
    Http::StreamResetReason reset_reason, absl::string_view transport_failure_reason) {
829
  // The filter manager needs to disambiguate between a filter-driven reset,
830
  // which should force reset the stream, and a codec driven reset, which should
831
  // tell the router the stream reset, and let the router make the decision to
832
  // send a local reply, or retry the stream.
833
2509
  bool is_codec_error = absl::StrContains(transport_failure_reason, "codec_error");
834
2509
  if (reset_reason == Http::StreamResetReason::LocalReset && !is_codec_error) {
835
7
    upstream_request_.parent_.callbacks()->resetStream();
836
7
    return;
837
7
  }
838
2502
  return upstream_request_.onResetStream(reset_reason, transport_failure_reason);
839
2509
}
840

            
841
23
Upstream::ClusterInfoConstSharedPtr UpstreamRequestFilterManagerCallbacks::clusterInfo() {
842
23
  return upstream_request_.parent_.callbacks()->clusterInfo();
843
23
}
844

            
845
Http::Http1StreamEncoderOptionsOptRef
846
1
UpstreamRequestFilterManagerCallbacks::http1StreamEncoderOptions() {
847
1
  return upstream_request_.parent_.callbacks()->http1StreamEncoderOptions();
848
1
}
849

            
850
56
void UpstreamRequestFilterManagerCallbacks::disableRouteTimeoutForWebsocketUpgrade() {
851
56
  upstream_request_.parent_.disableRouteTimeoutForWebsocketUpgrade();
852
56
}
853

            
854
56
void UpstreamRequestFilterManagerCallbacks::disablePerTryTimeoutForWebsocketUpgrade() {
855
  // Disable the per-try timeout and idle timeout timers once websocket upgrade succeeds.
856
  // This mirrors the behavior for route timeout disabling in upgrades.
857
56
  upstream_request_.disablePerTryTimeoutForWebsocketUpgrade();
858
56
}
859

            
860
56
void UpstreamRequest::disablePerTryTimeoutForWebsocketUpgrade() {
861
  // Disable and clear per-try timers so they do not fire after websocket upgrade.
862
56
  if (per_try_timeout_ != nullptr) {
863
    per_try_timeout_->disableTimer();
864
    per_try_timeout_.reset();
865
  }
866
56
  if (per_try_idle_timeout_ != nullptr) {
867
    per_try_idle_timeout_->disableTimer();
868
    per_try_idle_timeout_.reset();
869
  }
870
56
}
871

            
872
} // namespace Router
873
} // namespace Envoy