Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/router/upstream_request.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/router/upstream_request.h"
2
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7
#include <string>
8
9
#include "envoy/event/dispatcher.h"
10
#include "envoy/event/timer.h"
11
#include "envoy/grpc/status.h"
12
#include "envoy/http/conn_pool.h"
13
#include "envoy/http/header_map.h"
14
#include "envoy/runtime/runtime.h"
15
#include "envoy/upstream/cluster_manager.h"
16
#include "envoy/upstream/upstream.h"
17
18
#include "source/common/common/assert.h"
19
#include "source/common/common/dump_state_utils.h"
20
#include "source/common/common/empty_string.h"
21
#include "source/common/common/enum_to_int.h"
22
#include "source/common/common/scope_tracker.h"
23
#include "source/common/common/utility.h"
24
#include "source/common/grpc/common.h"
25
#include "source/common/http/codes.h"
26
#include "source/common/http/header_map_impl.h"
27
#include "source/common/http/headers.h"
28
#include "source/common/http/message_impl.h"
29
#include "source/common/http/utility.h"
30
#include "source/common/network/application_protocol.h"
31
#include "source/common/network/transport_socket_options_impl.h"
32
#include "source/common/network/upstream_server_name.h"
33
#include "source/common/network/upstream_subject_alt_names.h"
34
#include "source/common/router/config_impl.h"
35
#include "source/common/router/debug_config.h"
36
#include "source/common/router/router.h"
37
#include "source/common/stream_info/uint32_accessor_impl.h"
38
#include "source/common/tracing/http_tracer_impl.h"
39
#include "source/extensions/common/proxy_protocol/proxy_protocol_header.h"
40
41
namespace Envoy {
42
namespace Router {
43
44
// The upstream HTTP filter manager class.
45
class UpstreamFilterManager : public Http::FilterManager {
46
public:
47
  UpstreamFilterManager(Http::FilterManagerCallbacks& filter_manager_callbacks,
48
                        Event::Dispatcher& dispatcher, OptRef<const Network::Connection> connection,
49
                        uint64_t stream_id, Buffer::BufferMemoryAccountSharedPtr account,
50
                        bool proxy_100_continue, uint32_t buffer_limit,
51
                        const Http::FilterChainFactory& filter_chain_factory,
52
                        UpstreamRequest& request)
53
      : FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account,
54
                      proxy_100_continue, buffer_limit, filter_chain_factory),
55
1.81k
        upstream_request_(request) {}
56
57
3.45k
  StreamInfo::StreamInfo& streamInfo() override {
58
3.45k
    return upstream_request_.parent_.callbacks()->streamInfo();
59
3.45k
  }
60
0
  const StreamInfo::StreamInfo& streamInfo() const override {
61
0
    return upstream_request_.parent_.callbacks()->streamInfo();
62
0
  }
63
  // Send local replies via the downstream HTTP filter manager.
64
  // Local replies will not be seen by upstream HTTP filters.
65
  void sendLocalReply(Http::Code code, absl::string_view body,
66
                      const std::function<void(Http::ResponseHeaderMap& headers)>& modify_headers,
67
                      const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
68
0
                      absl::string_view details) override {
69
0
    state().decoder_filter_chain_aborted_ = true;
70
0
    state().encoder_filter_chain_aborted_ = true;
71
0
    state().encoder_filter_chain_complete_ = true;
72
0
    state().observed_encode_end_stream_ = true;
73
    // TODO(alyssawilk) this should be done through the router to play well with hedging.
74
0
    upstream_request_.parent_.callbacks()->sendLocalReply(code, body, modify_headers, grpc_status,
75
0
                                                          details);
76
0
  }
77
0
  void executeLocalReplyIfPrepared() override {}
78
  UpstreamRequest& upstream_request_;
79
};
80
81
// Builds an upstream request: optionally spawns a child tracing span, injects the tracing
// context into the downstream request headers, wires up the upstream stream info and creates
// the upstream HTTP filter manager together with its filter chain.
UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
                                 std::unique_ptr<GenericConnPool>&& conn_pool,
                                 bool can_send_early_data, bool can_use_http3,
                                 bool enable_half_close)
    : parent_(parent), conn_pool_(std::move(conn_pool)),
      stream_info_(parent_.callbacks()->dispatcher().timeSource(), nullptr,
                   StreamInfo::FilterState::LifeSpan::FilterChain),
      start_time_(parent_.callbacks()->dispatcher().timeSource().monotonicTime()),
      calling_encode_headers_(false), upstream_canary_(false), router_sent_end_stream_(false),
      encode_trailers_(false), retried_(false), awaiting_headers_(true),
      outlier_detection_timeout_recorded_(false),
      create_per_try_timeout_on_request_complete_(false), paused_for_connect_(false),
      paused_for_websocket_(false), reset_stream_(false),
      record_timeout_budget_(parent_.cluster()->timeoutBudgetStats().has_value()),
      cleaned_up_(false), had_upstream_(false),
      stream_options_({can_send_early_data, can_use_http3}), grpc_rq_success_deferred_(false),
      enable_half_close_(enable_half_close) {
  // A dedicated child span is spawned only when a tracing config exists and either the config
  // or the router configuration asks for one.
  if (auto tracing_config = parent_.callbacks()->tracingConfig();
      tracing_config.has_value() &&
      (tracing_config->spawnUpstreamSpan() || parent_.config().start_child_span_)) {
    span_ = parent_.callbacks()->activeSpan().spawnChild(
        tracing_config.value().get(),
        absl::StrCat("router ", parent_.cluster()->observabilityName(), " egress"),
        parent_.callbacks()->dispatcher().timeSource().systemTime());
    // Any attempt other than the first is a retry; tag the span with the retry count.
    if (parent_.attemptCount() != 1) {
      span_->setTag(Tracing::Tags::get().RetryCount, std::to_string(parent_.attemptCount() - 1));
    }
  }

  // The router checks that the connection pool is non-null before creating the upstream request.
  auto pool_host = conn_pool_->host();
  Tracing::HttpTraceContext trace_context(*parent_.downstreamHeaders());
  Tracing::UpstreamContext upstream_context(pool_host.get(),               // host_
                                            &pool_host->cluster(),         // cluster_
                                            Tracing::ServiceType::Unknown, // service_type_
                                            false                          // async_client_span_
  );

  // Without an independent child span the parent span's tracing context is injected into the
  // request headers instead. The parent span's injectContext() may therefore be called
  // repeatedly when the request is retried.
  Tracing::Span& injecting_span =
      (span_ != nullptr) ? *span_ : parent_.callbacks()->activeSpan();
  injecting_span.injectContext(trace_context, upstream_context);

  // Share a fresh upstream info object between this request's stream info and the parent's.
  stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
  stream_info_.route_ = parent_.callbacks()->route();
  parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo());

  // Mirror the health-check and shadow flags, and the upstream cluster info when one is set.
  stream_info_.healthCheck(parent_.callbacks()->streamInfo().healthCheck());
  stream_info_.setIsShadow(parent_.callbacks()->streamInfo().isShadow());
  if (absl::optional<Upstream::ClusterInfoConstSharedPtr> parent_cluster_info =
          parent_.callbacks()->streamInfo().upstreamClusterInfo();
      parent_cluster_info.has_value()) {
    stream_info_.setUpstreamClusterInfo(*parent_cluster_info);
  }

  // Set up the upstream HTTP filter manager.
  filter_manager_callbacks_ = std::make_unique<UpstreamRequestFilterManagerCallbacks>(*this);
  filter_manager_ = std::make_unique<UpstreamFilterManager>(
      *filter_manager_callbacks_, parent_.callbacks()->dispatcher(), UpstreamRequest::connection(),
      parent_.callbacks()->streamId(), parent_.callbacks()->account(), true,
      parent_.callbacks()->decoderBufferLimit(), *parent_.cluster(), *this);

  // Filter chain selection, in priority order:
  //   1. a custom cluster-specified chain (only if one is configured),
  //   2. a custom router-specified chain,
  //   3. the default cluster chain, which only consists of the codec filter.
  bool created = parent_.cluster()->createFilterChain(*filter_manager_,
                                                      /*only_create_if_configured=*/true);
  if (!created) {
    created = parent_.config().createFilterChain(*filter_manager_);
    if (!created) {
      created = parent_.cluster()->createFilterChain(*filter_manager_, false);
    }
  }

  // There will always be a codec filter present, which sets the upstream
  // interface. Fast-fail any tests that don't set up mocks correctly.
  ASSERT(created && upstream_interface_.has_value());
}
Unexecuted instantiation: Envoy::Router::UpstreamRequest::UpstreamRequest(Envoy::Router::RouterFilterInterface&, std::__1::unique_ptr<Envoy::Router::GenericConnPool, std::__1::default_delete<Envoy::Router::GenericConnPool> >&&, bool, bool, bool)
Envoy::Router::UpstreamRequest::UpstreamRequest(Envoy::Router::RouterFilterInterface&, std::__1::unique_ptr<Envoy::Router::GenericConnPool, std::__1::default_delete<Envoy::Router::GenericConnPool> >&&, bool, bool, bool)
Line
Count
Source
97
1.81k
      enable_half_close_(enable_half_close) {
98
1.81k
  if (auto tracing_config = parent_.callbacks()->tracingConfig(); tracing_config.has_value()) {
99
829
    if (tracing_config->spawnUpstreamSpan() || parent_.config().start_child_span_) {
100
0
      span_ = parent_.callbacks()->activeSpan().spawnChild(
101
0
          tracing_config.value().get(),
102
0
          absl::StrCat("router ", parent.cluster()->observabilityName(), " egress"),
103
0
          parent_.callbacks()->dispatcher().timeSource().systemTime());
104
0
      if (parent.attemptCount() != 1) {
105
        // This is a retry request, add this metadata to span.
106
0
        span_->setTag(Tracing::Tags::get().RetryCount, std::to_string(parent.attemptCount() - 1));
107
0
      }
108
0
    }
109
829
  }
110
111
  // The router checks that the connection pool is non-null before creating the upstream request.
112
1.81k
  auto upstream_host = conn_pool_->host();
113
1.81k
  Tracing::HttpTraceContext trace_context(*parent_.downstreamHeaders());
114
1.81k
  Tracing::UpstreamContext upstream_context(upstream_host.get(),           // host_
115
1.81k
                                            &upstream_host->cluster(),     // cluster_
116
1.81k
                                            Tracing::ServiceType::Unknown, // service_type_
117
1.81k
                                            false                          // async_client_span_
118
1.81k
  );
119
120
1.81k
  if (span_ != nullptr) {
121
0
    span_->injectContext(trace_context, upstream_context);
122
1.81k
  } else {
123
    // No independent child span for current upstream request then inject the parent span's tracing
124
    // context into the request headers.
125
    // The injectContext() of the parent span may be called repeatedly when the request is retried.
126
1.81k
    parent_.callbacks()->activeSpan().injectContext(trace_context, upstream_context);
127
1.81k
  }
128
129
1.81k
  stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
130
1.81k
  stream_info_.route_ = parent_.callbacks()->route();
131
1.81k
  parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo());
132
133
1.81k
  stream_info_.healthCheck(parent_.callbacks()->streamInfo().healthCheck());
134
1.81k
  stream_info_.setIsShadow(parent_.callbacks()->streamInfo().isShadow());
135
1.81k
  absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info =
136
1.81k
      parent_.callbacks()->streamInfo().upstreamClusterInfo();
137
1.81k
  if (cluster_info.has_value()) {
138
1.81k
    stream_info_.setUpstreamClusterInfo(*cluster_info);
139
1.81k
  }
140
141
  // Set up the upstream HTTP filter manager.
142
1.81k
  filter_manager_callbacks_ = std::make_unique<UpstreamRequestFilterManagerCallbacks>(*this);
143
1.81k
  filter_manager_ = std::make_unique<UpstreamFilterManager>(
144
1.81k
      *filter_manager_callbacks_, parent_.callbacks()->dispatcher(), UpstreamRequest::connection(),
145
1.81k
      parent_.callbacks()->streamId(), parent_.callbacks()->account(), true,
146
1.81k
      parent_.callbacks()->decoderBufferLimit(), *parent_.cluster(), *this);
147
  // Attempt to create custom cluster-specified filter chain
148
1.81k
  bool created = parent_.cluster()->createFilterChain(*filter_manager_,
149
1.81k
                                                      /*only_create_if_configured=*/true);
150
1.81k
  if (!created) {
151
    // Attempt to create custom router-specified filter chain.
152
1.81k
    created = parent_.config().createFilterChain(*filter_manager_);
153
1.81k
  }
154
1.81k
  if (!created) {
155
    // Neither cluster nor router have a custom filter chain; add the default
156
    // cluster filter chain, which only consists of the codec filter.
157
1.81k
    created = parent_.cluster()->createFilterChain(*filter_manager_, false);
158
1.81k
  }
159
  // There will always be a codec filter present, which sets the upstream
160
  // interface. Fast-fail any tests that don't set up mocks correctly.
161
1.81k
  ASSERT(created && upstream_interface_.has_value());
162
1.81k
}
163
164
1.81k
// Destruction runs cleanUp(), which returns immediately if cleanup already happened.
UpstreamRequest::~UpstreamRequest() {
  cleanUp();
}
165
166
3.63k
void UpstreamRequest::cleanUp() {
167
3.63k
  if (cleaned_up_) {
168
1.81k
    return;
169
1.81k
  }
170
1.81k
  cleaned_up_ = true;
171
172
1.81k
  filter_manager_->destroyFilters();
173
174
1.81k
  if (span_ != nullptr) {
175
0
    auto tracing_config = parent_.callbacks()->tracingConfig();
176
0
    ASSERT(tracing_config.has_value());
177
0
    Tracing::HttpTracerUtility::finalizeUpstreamSpan(*span_, stream_info_,
178
0
                                                     tracing_config.value().get());
179
0
  }
180
181
1.81k
  if (per_try_timeout_ != nullptr) {
182
    // Allows for testing.
183
0
    per_try_timeout_->disableTimer();
184
0
  }
185
186
1.81k
  if (per_try_idle_timeout_ != nullptr) {
187
    // Allows for testing.
188
0
    per_try_idle_timeout_->disableTimer();
189
0
  }
190
191
1.81k
  if (max_stream_duration_timer_ != nullptr) {
192
0
    max_stream_duration_timer_->disableTimer();
193
0
  }
194
195
1.81k
  if (upstream_log_flush_timer_ != nullptr) {
196
0
    upstream_log_flush_timer_->disableTimer();
197
0
  }
198
199
1.81k
  clearRequestEncoder();
200
201
  // If desired, fire the per-try histogram when the UpstreamRequest
202
  // completes.
203
1.81k
  if (record_timeout_budget_) {
204
5
    Event::Dispatcher& dispatcher = parent_.callbacks()->dispatcher();
205
5
    const MonotonicTime end_time = dispatcher.timeSource().monotonicTime();
206
5
    const std::chrono::milliseconds response_time =
207
5
        std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time_);
208
5
    Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = parent_.cluster()->timeoutBudgetStats();
209
5
    tb_stats->get().upstream_rq_timeout_budget_per_try_percent_used_.recordValue(
210
5
        FilterUtility::percentageOfTimeout(response_time, parent_.timeout().per_try_timeout_));
211
5
  }
212
213
  // Ditto for request/response size histograms.
214
1.81k
  Upstream::ClusterRequestResponseSizeStatsOptRef req_resp_stats_opt =
215
1.81k
      parent_.cluster()->requestResponseSizeStats();
216
1.81k
  if (req_resp_stats_opt.has_value() && parent_.downstreamHeaders()) {
217
5
    auto& req_resp_stats = req_resp_stats_opt->get();
218
5
    req_resp_stats.upstream_rq_headers_size_.recordValue(parent_.downstreamHeaders()->byteSize());
219
5
    req_resp_stats.upstream_rq_body_size_.recordValue(stream_info_.bytesSent());
220
221
5
    if (response_headers_size_.has_value()) {
222
0
      req_resp_stats.upstream_rs_headers_size_.recordValue(response_headers_size_.value());
223
0
      req_resp_stats.upstream_rs_body_size_.recordValue(stream_info_.bytesReceived());
224
0
    }
225
5
  }
226
227
1.81k
  stream_info_.onRequestComplete();
228
1.81k
  upstreamLog(AccessLog::AccessLogType::UpstreamEnd);
229
230
1.81k
  while (downstream_data_disabled_ != 0) {
231
0
    parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark();
232
0
    parent_.cluster()->trafficStats()->upstream_flow_control_drained_total_.inc();
233
0
    --downstream_data_disabled_;
234
0
  }
235
  // The upstream HTTP filter chain callbacks own headers/trailers while they are traversing the
236
  // filter chain. Make sure to not delete them immediately when the stream ends, as the stream
237
  // often ends during filter chain processing and it causes use-after-free violations.
238
1.81k
  parent_.callbacks()->dispatcher().deferredDelete(std::move(filter_manager_callbacks_));
239
1.81k
}
240
241
1.81k
// Emits one entry of the given access log type to every configured upstream access logger,
// using the downstream request headers plus any captured upstream headers/trailers.
void UpstreamRequest::upstreamLog(AccessLog::AccessLogType access_log_type) {
  const Formatter::HttpFormatterContext formatter_context{
      parent_.downstreamHeaders(), upstream_headers_.get(), upstream_trailers_.get(), {},
      access_log_type};

  for (const auto& logger : parent_.config().upstream_logs_) {
    logger->log(formatter_context, stream_info_);
  }
}
252
253
// This is called by the FilterManager when all filters have processed 1xx headers. Forward them
254
// on to the router.
255
1
void UpstreamRequest::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) {
256
1
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
257
258
1
  ASSERT(Http::HeaderUtility::isSpecial1xx(*headers));
259
1
  addResponseHeadersSize(headers->byteSize());
260
1
  maybeHandleDeferredReadDisable();
261
1
  parent_.onUpstream1xxHeaders(std::move(headers), *this);
262
1
}
263
264
// This is called by the FilterManager when all filters have processed headers. Forward them
265
// on to the router.
266
1.63k
void UpstreamRequest::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
267
1.63k
  ASSERT(headers.get());
268
1.63k
  ENVOY_STREAM_LOG(trace, "end_stream: {}, upstream response headers:\n{}", *parent_.callbacks(),
269
1.63k
                   end_stream, *headers);
270
1.63k
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
271
272
1.63k
  resetPerTryIdleTimer();
273
274
1.63k
  addResponseHeadersSize(headers->byteSize());
275
276
  // We drop unsupported 1xx on the floor here. 101 upgrade headers need to be passed to the client
277
  // as part of the final response. Most 1xx headers are handled in onUpstream1xxHeaders.
278
  //
279
  // We could in principle handle other headers here, but this might result in the double invocation
280
  // of decodeHeaders() (once for informational, again for non-informational), which is likely an
281
  // easy to miss corner case in the filter and HCM contract.
282
  //
283
  // This filtering is done early in upstream request, unlike 100 coalescing which is performed in
284
  // the router filter, since the filtering only depends on the state of a single upstream, and we
285
  // don't want to confuse accounting such as onFirstUpstreamRxByteReceived() with informational
286
  // headers.
287
1.63k
  const uint64_t response_code = Http::Utility::getResponseStatus(*headers);
288
1.63k
  if (Http::CodeUtility::is1xx(response_code) &&
289
1.63k
      response_code != enumToInt(Http::Code::SwitchingProtocols)) {
290
0
    return;
291
0
  }
292
293
1.63k
  awaiting_headers_ = false;
294
1.63k
  if (span_ != nullptr) {
295
0
    Tracing::HttpTracerUtility::onUpstreamResponseHeaders(*span_, headers.get());
296
0
  }
297
1.63k
  if (!parent_.config().upstream_logs_.empty()) {
298
0
    upstream_headers_ = Http::createHeaderMap<Http::ResponseHeaderMapImpl>(*headers);
299
0
  }
300
1.63k
  stream_info_.setResponseCode(static_cast<uint32_t>(response_code));
301
302
1.63k
  maybeHandleDeferredReadDisable();
303
1.63k
  ASSERT(headers.get());
304
305
1.63k
  parent_.onUpstreamHeaders(response_code, std::move(headers), *this, end_stream);
306
1.63k
}
307
308
1.63k
void UpstreamRequest::maybeHandleDeferredReadDisable() {
309
1.63k
  for (; deferred_read_disabling_count_ > 0; --deferred_read_disabling_count_) {
310
    // If the deferred read disabling count hasn't been cancelled out by read
311
    // enabling count so far, stop the upstream from reading the rest response.
312
    // Because readDisable keeps track of how many time it is called with
313
    // "true" or "false", here it has to be called with "true" the same number
314
    // of times as it would be called with "false" in the future.
315
0
    parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
316
0
    upstream_->readDisable(true);
317
0
  }
318
1.63k
}
319
320
3.11k
// Handles a chunk of upstream response body: re-arms the per-try idle timer, counts the
// received bytes and passes the data on to the router.
void UpstreamRequest::decodeData(Buffer::Instance& data, bool end_stream) {
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());

  resetPerTryIdleTimer();

  const auto chunk_length = data.length();
  stream_info_.addBytesReceived(chunk_length);

  parent_.onUpstreamData(data, *this, end_stream);
}
327
328
1
// Handles upstream response trailers: reports them to the span (if any), keeps a copy when
// upstream access loggers are configured, then passes them on to the router.
void UpstreamRequest::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) {
  ENVOY_STREAM_LOG(trace, "upstream response trailers:\n{}", *parent_.callbacks(), *trailers);
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());

  if (span_ != nullptr) {
    Tracing::HttpTracerUtility::onUpstreamResponseTrailers(*span_, trailers.get());
  }

  const bool has_upstream_loggers = !parent_.config().upstream_logs_.empty();
  if (has_upstream_loggers) {
    upstream_trailers_ = Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*trailers);
  }

  parent_.onUpstreamTrailers(std::move(trailers), *this);
}
340
341
0
void UpstreamRequest::dumpState(std::ostream& os, int indent_level) const {
342
0
  const char* spaces = spacesForLevel(indent_level);
343
0
  os << spaces << "UpstreamRequest " << this << "\n";
344
0
  if (connection()) {
345
0
    const auto addressProvider = connection()->connectionInfoProviderSharedPtr();
346
0
    DUMP_DETAILS(addressProvider);
347
0
  }
348
0
  const Http::RequestHeaderMap* request_headers = parent_.downstreamHeaders();
349
0
  DUMP_DETAILS(request_headers);
350
0
  if (filter_manager_) {
351
0
    filter_manager_->dumpState(os, indent_level);
352
0
  }
353
0
}
354
355
1.77k
// Returns the route reported by the parent's callbacks; the route pointer is dereferenced
// without a null check.
const Route& UpstreamRequest::route() const {
  return *parent_.callbacks()->route();
}
356
357
1.81k
// Returns the connection reported by the parent's callbacks (may be empty).
OptRef<const Network::Connection> UpstreamRequest::connection() const {
  OptRef<const Network::Connection> parent_connection = parent_.callbacks()->connection();
  return parent_connection;
}
360
361
293
// Passes upstream metadata straight through to the router, transferring ownership of the map.
void UpstreamRequest::decodeMetadata(Http::MetadataMapPtr&& metadata_map) {
  parent_.onUpstreamMetadata(std::move(metadata_map));
}
364
365
0
void UpstreamRequest::maybeEndDecode(bool end_stream) {
366
0
  if (end_stream) {
367
0
    upstreamTiming().onLastUpstreamRxByteReceived(parent_.callbacks()->dispatcher().timeSource());
368
0
  }
369
0
}
370
371
void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
372
1.77k
                                             bool pool_success) {
373
1.77k
  StreamInfo::UpstreamInfo& upstream_info = *streamInfo().upstreamInfo();
374
1.77k
  upstream_info.setUpstreamHost(host);
375
1.77k
  upstream_host_ = host;
376
1.77k
  parent_.onUpstreamHostSelected(host, pool_success);
377
1.77k
}
378
379
1.81k
void UpstreamRequest::acceptHeadersFromRouter(bool end_stream) {
380
1.81k
  ASSERT(!router_sent_end_stream_);
381
1.81k
  router_sent_end_stream_ = end_stream;
382
383
  // Make sure that when we are forwarding CONNECT payload we do not do so until
384
  // the upstream has accepted the CONNECT request.
385
  // This must be done before conn_pool->newStream, as onPoolReady un-pauses for CONNECT
386
  // termination.
387
1.81k
  auto* headers = parent_.downstreamHeaders();
388
1.81k
  if (headers->getMethodValue() == Http::Headers::get().MethodValues.Connect) {
389
0
    paused_for_connect_ = true;
390
    // If this is a websocket upgrade request, pause the request until the upstream sends
391
    // the 101 Switching Protocols response code. Using the else logic here to obey CONNECT
392
    // method which is expecting 2xx response.
393
1.81k
  } else if ((Runtime::runtimeFeatureEnabled(
394
1.81k
                 "envoy.reloadable_features.check_switch_protocol_websocket_handshake")) &&
395
1.81k
             Http::Utility::isWebSocketUpgradeRequest(*headers)) {
396
0
    paused_for_websocket_ = true;
397
0
  }
398
399
  // Kick off creation of the upstream connection immediately upon receiving headers.
400
  // In future it may be possible for upstream HTTP filters to delay this, or influence connection
401
  // creation but for now optimize for minimal latency and fetch the connection
402
  // as soon as possible.
403
1.81k
  conn_pool_->newStream(this);
404
405
1.81k
  if (parent_.config().upstream_log_flush_interval_.has_value()) {
406
0
    upstream_log_flush_timer_ = parent_.callbacks()->dispatcher().createTimer([this]() -> void {
407
      // If the request is complete, we've already done the stream-end upstream log, and shouldn't
408
      // do the periodic log.
409
0
      if (!streamInfo().requestComplete().has_value()) {
410
0
        upstreamLog(AccessLog::AccessLogType::UpstreamPeriodic);
411
0
        resetUpstreamLogFlushTimer();
412
0
      }
413
      // Both downstream and upstream bytes meters may not be initialized when
414
      // the timer goes off, e.g. if it takes longer than the interval for a
415
      // connection to be initialized; check for nullptr.
416
0
      auto& downstream_bytes_meter = stream_info_.getDownstreamBytesMeter();
417
0
      auto& upstream_bytes_meter = stream_info_.getUpstreamBytesMeter();
418
0
      const SystemTime now = parent_.callbacks()->dispatcher().timeSource().systemTime();
419
0
      if (downstream_bytes_meter) {
420
0
        downstream_bytes_meter->takeUpstreamPeriodicLoggingSnapshot(now);
421
0
      }
422
0
      if (upstream_bytes_meter) {
423
0
        upstream_bytes_meter->takeUpstreamPeriodicLoggingSnapshot(now);
424
0
      }
425
0
    });
426
427
0
    resetUpstreamLogFlushTimer();
428
0
  }
429
430
1.81k
  filter_manager_->requestHeadersInitialized();
431
1.81k
  filter_manager_->streamInfo().setRequestHeaders(*parent_.downstreamHeaders());
432
1.81k
  filter_manager_->decodeHeaders(*parent_.downstreamHeaders(), end_stream);
433
1.81k
}
434
435
3.05k
// Entry point for request body data coming from the router; records whether the router has
// ended the stream and feeds the data into the upstream filter chain.
void UpstreamRequest::acceptDataFromRouter(Buffer::Instance& data, bool end_stream) {
  // The router must not send anything after it has signalled end of stream.
  ASSERT(!router_sent_end_stream_);
  router_sent_end_stream_ = end_stream;

  filter_manager_->decodeData(data, end_stream);
}
441
442
0
// Entry point for request trailers coming from the router. Trailers always end the stream, so
// the end-of-stream and trailers flags are set before the trailers enter the filter chain.
void UpstreamRequest::acceptTrailersFromRouter(Http::RequestTrailerMap& trailers) {
  ASSERT(!router_sent_end_stream_);

  router_sent_end_stream_ = true;
  encode_trailers_ = true;

  filter_manager_->decodeTrailers(trailers);
}
449
450
26
// Entry point for request metadata coming from the router; the pointed-to map is passed by
// reference into the upstream filter chain.
void UpstreamRequest::acceptMetadataFromRouter(Http::MetadataMapPtr&& metadata_map_ptr) {
  auto& metadata_map = *metadata_map_ptr;
  filter_manager_->decodeMetadata(metadata_map);
}
453
454
void UpstreamRequest::onResetStream(Http::StreamResetReason reason,
455
121
                                    absl::string_view transport_failure_reason) {
456
121
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
457
458
121
  if (span_ != nullptr) {
459
    // Add tags about reset.
460
0
    span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
461
0
    span_->setTag(Tracing::Tags::get().ErrorReason, Http::Utility::resetReasonToString(reason));
462
0
  }
463
121
  clearRequestEncoder();
464
121
  awaiting_headers_ = false;
465
121
  if (!calling_encode_headers_) {
466
121
    stream_info_.setResponseFlag(Filter::streamResetReasonToResponseFlag(reason));
467
121
    parent_.onUpstreamReset(reason, transport_failure_reason, *this);
468
121
  } else {
469
0
    deferred_reset_reason_ = reason;
470
0
  }
471
121
}
472
473
896
void UpstreamRequest::resetStream() {
474
896
  if (conn_pool_->cancelAnyPendingStream()) {
475
46
    ENVOY_STREAM_LOG(debug, "canceled pool request", *parent_.callbacks());
476
46
    ASSERT(!upstream_);
477
46
  }
478
479
  // Don't reset the stream if we're already done with it.
480
896
  if (upstreamTiming().last_upstream_tx_byte_sent_.has_value() &&
481
896
      upstreamTiming().last_upstream_rx_byte_received_.has_value()) {
482
0
    return;
483
0
  }
484
485
896
  if (span_ != nullptr) {
486
    // Add tags about the cancellation.
487
0
    span_->setTag(Tracing::Tags::get().Canceled, Tracing::Tags::get().True);
488
0
  }
489
490
896
  if (upstream_) {
491
850
    ENVOY_STREAM_LOG(debug, "resetting pool request", *parent_.callbacks());
492
850
    upstream_->resetStream();
493
850
    clearRequestEncoder();
494
850
  }
495
896
  reset_stream_ = true;
496
896
}
497
498
4.74k
void UpstreamRequest::resetPerTryIdleTimer() {
499
4.74k
  if (per_try_idle_timeout_ != nullptr) {
500
0
    per_try_idle_timeout_->enableTimer(parent_.timeout().per_try_idle_timeout_);
501
0
  }
502
4.74k
}
503
504
0
void UpstreamRequest::resetUpstreamLogFlushTimer() {
505
0
  if (upstream_log_flush_timer_ != nullptr) {
506
0
    upstream_log_flush_timer_->enableTimer(parent_.config().upstream_log_flush_interval_.value());
507
0
  }
508
0
}
509
510
1.60k
void UpstreamRequest::setupPerTryTimeout() {
511
1.60k
  ASSERT(!per_try_timeout_);
512
1.60k
  if (parent_.timeout().per_try_timeout_.count() > 0) {
513
0
    per_try_timeout_ =
514
0
        parent_.callbacks()->dispatcher().createTimer([this]() -> void { onPerTryTimeout(); });
515
0
    per_try_timeout_->enableTimer(parent_.timeout().per_try_timeout_);
516
0
  }
517
518
1.60k
  ASSERT(!per_try_idle_timeout_);
519
1.60k
  if (parent_.timeout().per_try_idle_timeout_.count() > 0) {
520
0
    per_try_idle_timeout_ =
521
0
        parent_.callbacks()->dispatcher().createTimer([this]() -> void { onPerTryIdleTimeout(); });
522
0
    resetPerTryIdleTimer();
523
0
  }
524
1.60k
}
525
526
0
void UpstreamRequest::onPerTryIdleTimeout() {
527
0
  ENVOY_STREAM_LOG(debug, "upstream per try idle timeout", *parent_.callbacks());
528
0
  if (per_try_timeout_) {
529
    // Disable the per try idle timer, so it does not trigger further retries
530
0
    per_try_timeout_->disableTimer();
531
0
  }
532
0
  stream_info_.setResponseFlag(StreamInfo::CoreResponseFlag::StreamIdleTimeout);
533
0
  parent_.onPerTryIdleTimeout(*this);
534
0
}
535
536
0
void UpstreamRequest::onPerTryTimeout() {
537
0
  if (per_try_idle_timeout_) {
538
    // Delete the per try idle timer, so it does not trigger further retries.
539
    // The timer has to be deleted to prevent data flow from re-arming it.
540
0
    per_try_idle_timeout_.reset();
541
0
  }
542
  // If we've sent anything downstream, ignore the per try timeout and let the response continue
543
  // up to the global timeout
544
0
  if (!parent_.downstreamResponseStarted()) {
545
0
    ENVOY_STREAM_LOG(debug, "upstream per try timeout", *parent_.callbacks());
546
547
0
    stream_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamRequestTimeout);
548
0
    parent_.onPerTryTimeout(*this);
549
0
  } else {
550
0
    ENVOY_STREAM_LOG(debug,
551
0
                     "ignored upstream per try timeout due to already started downstream response",
552
0
                     *parent_.callbacks());
553
0
  }
554
0
}
555
556
1.77k
void UpstreamRequest::recordConnectionPoolCallbackLatency() {
557
1.77k
  upstreamTiming().recordConnectionPoolCallbackLatency(
558
1.77k
      start_time_, parent_.callbacks()->dispatcher().timeSource());
559
1.77k
}
560
561
void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason,
562
                                    absl::string_view transport_failure_reason,
563
0
                                    Upstream::HostDescriptionConstSharedPtr host) {
564
0
  recordConnectionPoolCallbackLatency();
565
0
  Http::StreamResetReason reset_reason = [](ConnectionPool::PoolFailureReason reason) {
566
0
    switch (reason) {
567
0
    case ConnectionPool::PoolFailureReason::Overflow:
568
0
      return Http::StreamResetReason::Overflow;
569
0
    case ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
570
0
      return Http::StreamResetReason::RemoteConnectionFailure;
571
0
    case ConnectionPool::PoolFailureReason::LocalConnectionFailure:
572
0
      return Http::StreamResetReason::LocalConnectionFailure;
573
0
    case ConnectionPool::PoolFailureReason::Timeout:
574
0
      return Http::StreamResetReason::ConnectionTimeout;
575
0
    }
576
0
    PANIC_DUE_TO_CORRUPT_ENUM;
577
0
  }(reason);
578
579
0
  stream_info_.upstreamInfo()->setUpstreamTransportFailureReason(transport_failure_reason);
580
581
  // Mimic an upstream reset.
582
0
  onUpstreamHostSelected(host, false);
583
0
  onResetStream(reset_reason, transport_failure_reason);
584
0
}
585
586
void UpstreamRequest::onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
587
                                  Upstream::HostDescriptionConstSharedPtr host,
588
                                  const Network::ConnectionInfoProvider& address_provider,
589
                                  StreamInfo::StreamInfo& info,
590
1.77k
                                  absl::optional<Http::Protocol> protocol) {
591
  // This may be called under an existing ScopeTrackerScopeState but it will unwind correctly.
592
1.77k
  ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
593
1.77k
  ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks());
594
1.77k
  recordConnectionPoolCallbackLatency();
595
1.77k
  upstream_ = std::move(upstream);
596
1.77k
  had_upstream_ = true;
597
  // Have the upstream use the account of the downstream.
598
1.77k
  upstream_->setAccount(parent_.callbacks()->account());
599
600
1.77k
  host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess);
601
602
1.77k
  onUpstreamHostSelected(host, true);
603
604
1.77k
  if (protocol) {
605
1.77k
    stream_info_.protocol(protocol.value());
606
1.77k
  } else {
607
    // We only pause for CONNECT and WebSocket for HTTP upstreams. If this is a TCP upstream,
608
    // unpause.
609
0
    paused_for_connect_ = false;
610
0
    paused_for_websocket_ = false;
611
0
  }
612
613
1.77k
  StreamInfo::UpstreamInfo& upstream_info = *stream_info_.upstreamInfo();
614
1.77k
  if (info.upstreamInfo()) {
615
1.77k
    auto& upstream_timing = info.upstreamInfo()->upstreamTiming();
616
1.77k
    upstreamTiming().upstream_connect_start_ = upstream_timing.upstream_connect_start_;
617
1.77k
    upstreamTiming().upstream_connect_complete_ = upstream_timing.upstream_connect_complete_;
618
1.77k
    upstreamTiming().upstream_handshake_complete_ = upstream_timing.upstream_handshake_complete_;
619
1.77k
    upstream_info.setUpstreamNumStreams(info.upstreamInfo()->upstreamNumStreams());
620
1.77k
  }
621
622
  // Upstream HTTP filters might have already created/set a filter state.
623
1.77k
  const StreamInfo::FilterStateSharedPtr& filter_state = info.filterState();
624
1.77k
  if (!filter_state) {
625
0
    upstream_info.setUpstreamFilterState(
626
0
        std::make_shared<StreamInfo::FilterStateImpl>(StreamInfo::FilterState::LifeSpan::Request));
627
1.77k
  } else {
628
1.77k
    upstream_info.setUpstreamFilterState(filter_state);
629
1.77k
  }
630
1.77k
  upstream_info.setUpstreamLocalAddress(address_provider.localAddress());
631
1.77k
  upstream_info.setUpstreamRemoteAddress(address_provider.remoteAddress());
632
1.77k
  upstream_info.setUpstreamSslConnection(info.downstreamAddressProvider().sslConnection());
633
634
1.77k
  if (info.downstreamAddressProvider().connectionID().has_value()) {
635
1.76k
    upstream_info.setUpstreamConnectionId(info.downstreamAddressProvider().connectionID().value());
636
1.76k
  }
637
638
1.77k
  if (info.downstreamAddressProvider().interfaceName().has_value()) {
639
0
    upstream_info.setUpstreamInterfaceName(
640
0
        info.downstreamAddressProvider().interfaceName().value());
641
0
  }
642
643
1.77k
  stream_info_.setUpstreamBytesMeter(upstream_->bytesMeter());
644
1.77k
  StreamInfo::StreamInfo::syncUpstreamAndDownstreamBytesMeter(parent_.callbacks()->streamInfo(),
645
1.77k
                                                              stream_info_);
646
1.77k
  if (protocol) {
647
1.77k
    upstream_info.setUpstreamProtocol(protocol.value());
648
1.77k
  }
649
650
1.77k
  if (parent_.downstreamEndStream()) {
651
444
    setupPerTryTimeout();
652
1.32k
  } else {
653
1.32k
    create_per_try_timeout_on_request_complete_ = true;
654
1.32k
  }
655
656
  // Make sure the connection manager will inform the downstream watermark manager when the
657
  // downstream buffers are overrun. This may result in immediate watermark callbacks referencing
658
  // the encoder.
659
1.77k
  parent_.callbacks()->addDownstreamWatermarkCallbacks(downstream_watermark_manager_);
660
661
1.77k
  absl::optional<std::chrono::milliseconds> max_stream_duration;
662
1.77k
  if (parent_.dynamicMaxStreamDuration().has_value()) {
663
0
    max_stream_duration = parent_.dynamicMaxStreamDuration().value();
664
1.77k
  } else if (upstream_host_->cluster().commonHttpProtocolOptions().has_max_stream_duration()) {
665
0
    max_stream_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
666
0
        upstream_host_->cluster().commonHttpProtocolOptions().max_stream_duration()));
667
0
  }
668
1.77k
  if (max_stream_duration.has_value() && max_stream_duration->count()) {
669
0
    max_stream_duration_timer_ = parent_.callbacks()->dispatcher().createTimer(
670
0
        [this]() -> void { onStreamMaxDurationReached(); });
671
0
    max_stream_duration_timer_->enableTimer(*max_stream_duration);
672
0
  }
673
674
1.77k
  const auto* route_entry = route().routeEntry();
675
1.77k
  if (route_entry->autoHostRewrite() && !host->hostname().empty()) {
676
0
    Http::Utility::updateAuthority(*parent_.downstreamHeaders(), host->hostname(),
677
0
                                   route_entry->appendXfh());
678
0
  }
679
680
1.77k
  stream_info_.setRequestHeaders(*parent_.downstreamHeaders());
681
682
1.77k
  if (parent_.config().flush_upstream_log_on_upstream_stream_) {
683
0
    upstreamLog(AccessLog::AccessLogType::UpstreamPoolReady);
684
0
  }
685
686
1.77k
  if (address_provider.connectionID() && stream_info_.downstreamAddressProvider().connectionID()) {
687
0
    ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
688
0
              address_provider.connectionID().value(),
689
0
              stream_info_.downstreamAddressProvider().connectionID().value());
690
0
  }
691
692
1.77k
  for (auto* callback : upstream_callbacks_) {
693
1.77k
    callback->onUpstreamConnectionEstablished();
694
1.77k
  }
695
1.77k
}
696
697
5.39k
// Returns a reference to the object held by upstream_interface_.
UpstreamToDownstream& UpstreamRequest::upstreamToDownstream() {
  UpstreamToDownstream& upstream_to_downstream = *upstream_interface_;
  return upstream_to_downstream;
}
698
699
0
void UpstreamRequest::onStreamMaxDurationReached() {
700
0
  upstream_host_->cluster().trafficStats()->upstream_rq_max_duration_reached_.inc();
701
702
  // The upstream had closed then try to retry along with retry policy.
703
0
  parent_.onStreamMaxDurationReached(*this);
704
0
}
705
706
2.78k
void UpstreamRequest::clearRequestEncoder() {
707
  // Before clearing the encoder, unsubscribe from callbacks.
708
2.78k
  if (upstream_) {
709
1.77k
    parent_.callbacks()->removeDownstreamWatermarkCallbacks(downstream_watermark_manager_);
710
1.77k
  }
711
2.78k
  upstream_.reset();
712
2.78k
}
713
714
1.30k
void UpstreamRequest::readDisableOrDefer(bool disable) {
715
1.30k
  if (disable) {
716
    // See comments on deferred_read_disabling_count_ for when we do and don't defer.
717
655
    if (parent_.downstreamResponseStarted()) {
718
      // The downstream connection is overrun. Pause reads from upstream.
719
      // If there are multiple calls to readDisable either the codec (H2) or the
720
      // underlying Network::Connection (H1) will handle reference counting.
721
655
      parent_.cluster()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
722
655
      upstream_->readDisable(disable);
723
655
    } else {
724
0
      ++deferred_read_disabling_count_;
725
0
    }
726
655
    return;
727
655
  }
728
729
  // One source of connection blockage has buffer available.
730
651
  if (deferred_read_disabling_count_ > 0) {
731
0
    ASSERT(!parent_.downstreamResponseStarted());
732
    // Cancel out an existing deferred read disabling.
733
0
    --deferred_read_disabling_count_;
734
0
    return;
735
0
  }
736
651
  ASSERT(parent_.downstreamResponseStarted());
737
  // Pass this on to the stream, which
738
  // will resume reads if this was the last remaining high watermark.
739
651
  parent_.cluster()->trafficStats()->upstream_flow_control_resumed_reading_total_.inc();
740
651
  upstream_->readDisable(disable);
741
651
}
742
743
655
void UpstreamRequest::DownstreamWatermarkManager::onAboveWriteBufferHighWatermark() {
744
655
  ASSERT(parent_.upstream_);
745
655
  parent_.readDisableOrDefer(true);
746
655
}
747
748
651
void UpstreamRequest::DownstreamWatermarkManager::onBelowWriteBufferLowWatermark() {
749
651
  ASSERT(parent_.upstream_);
750
651
  parent_.readDisableOrDefer(false);
751
651
}
752
753
0
void UpstreamRequest::disableDataFromDownstreamForFlowControl() {
754
0
  parent_.cluster()->trafficStats()->upstream_flow_control_backed_up_total_.inc();
755
0
  parent_.callbacks()->onDecoderFilterAboveWriteBufferHighWatermark();
756
0
  ++downstream_data_disabled_;
757
0
}
758
759
0
void UpstreamRequest::enableDataFromDownstreamForFlowControl() {
760
0
  parent_.cluster()->trafficStats()->upstream_flow_control_drained_total_.inc();
761
0
  parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark();
762
0
  ASSERT(downstream_data_disabled_ != 0);
763
0
  if (downstream_data_disabled_ > 0) {
764
0
    --downstream_data_disabled_;
765
0
  }
766
0
}
767
768
4.54k
// Returns a reference wrapper around the parent's downstream request headers. The headers
// pointer is dereferenced unconditionally.
Http::RequestHeaderMapOptRef UpstreamRequestFilterManagerCallbacks::requestHeaders() {
  auto& downstream_headers = *upstream_request_.parent_.downstreamHeaders();
  return {downstream_headers};
}
771
772
8.79k
// Returns the request trailers, preferring the parent's downstream trailers and falling back to
// the locally held trailers_. Yields an empty reference when neither is present.
Http::RequestTrailerMapOptRef UpstreamRequestFilterManagerCallbacks::requestTrailers() {
  auto&& downstream_trailers = upstream_request_.parent_.downstreamTrailers();
  if (downstream_trailers) {
    return {*downstream_trailers};
  }
  if (trailers_) {
    return {*trailers_};
  }
  return {};
}
781
782
0
// Forwards to the scope exposed by the parent's callbacks.
const ScopeTrackedObject& UpstreamRequestFilterManagerCallbacks::scope() {
  auto& parent_callbacks = *upstream_request_.parent_.callbacks();
  return parent_callbacks.scope();
}
785
786
0
// Forwards to the tracing configuration exposed by the parent's callbacks.
OptRef<const Tracing::Config> UpstreamRequestFilterManagerCallbacks::tracingConfig() const {
  auto& parent_callbacks = *upstream_request_.parent_.callbacks();
  return parent_callbacks.tracingConfig();
}
789
790
0
// Forwards to the active tracing span exposed by the parent's callbacks.
Tracing::Span& UpstreamRequestFilterManagerCallbacks::activeSpan() {
  auto& parent_callbacks = *upstream_request_.parent_.callbacks();
  return parent_callbacks.activeSpan();
}
793
794
void UpstreamRequestFilterManagerCallbacks::resetStream(
795
121
    Http::StreamResetReason reset_reason, absl::string_view transport_failure_reason) {
796
  // The filter manager needs to disambiguate between a filter-driven reset,
797
  // which should force reset the stream, and a codec driven reset, which should
798
  // tell the router the stream reset, and let the router make the decision to
799
  // send a local reply, or retry the stream.
800
121
  bool is_codec_error;
801
121
  if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.report_stream_reset_error_code")) {
802
121
    is_codec_error = absl::StrContains(transport_failure_reason, "codec_error");
803
121
  } else {
804
0
    is_codec_error = transport_failure_reason == "codec_error";
805
0
  }
806
121
  if (reset_reason == Http::StreamResetReason::LocalReset && !is_codec_error) {
807
0
    upstream_request_.parent_.callbacks()->resetStream();
808
0
    return;
809
0
  }
810
121
  return upstream_request_.onResetStream(reset_reason, transport_failure_reason);
811
121
}
812
813
0
// Forwards to the cluster info exposed by the parent's callbacks.
Upstream::ClusterInfoConstSharedPtr UpstreamRequestFilterManagerCallbacks::clusterInfo() {
  auto& parent_callbacks = *upstream_request_.parent_.callbacks();
  return parent_callbacks.clusterInfo();
}
816
817
// Forwards to the HTTP/1 stream encoder options exposed by the parent's callbacks.
Http::Http1StreamEncoderOptionsOptRef
UpstreamRequestFilterManagerCallbacks::http1StreamEncoderOptions() {
  auto& parent_callbacks = *upstream_request_.parent_.callbacks();
  return parent_callbacks.http1StreamEncoderOptions();
}
821
822
} // namespace Router
823
} // namespace Envoy