Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/router/upstream_request.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7
#include <string>
8
9
#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h"
10
#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.validate.h"
11
#include "envoy/http/codec.h"
12
#include "envoy/http/codes.h"
13
#include "envoy/http/conn_pool.h"
14
#include "envoy/http/filter.h"
15
#include "envoy/stats/scope.h"
16
#include "envoy/tcp/conn_pool.h"
17
18
#include "source/common/buffer/watermark_buffer.h"
19
#include "source/common/common/cleanup.h"
20
#include "source/common/common/hash.h"
21
#include "source/common/common/hex.h"
22
#include "source/common/common/linked_object.h"
23
#include "source/common/common/logger.h"
24
#include "source/common/config/well_known_names.h"
25
#include "source/common/http/filter_manager.h"
26
#include "source/common/stream_info/stream_info_impl.h"
27
#include "source/common/tracing/null_span_impl.h"
28
#include "source/extensions/filters/http/common/factory_base.h"
29
30
namespace Envoy {
31
namespace Router {
32
33
class GenericUpstream;
34
class GenericConnectionPoolCallbacks;
35
class RouterFilterInterface;
36
class UpstreamRequest;
37
class UpstreamRequestFilterManagerCallbacks;
38
class UpstreamFilterManager;
39
class UpstreamCodecFilter;
40
41
/* The Upstream request is the base class for forwarding HTTP upstream.
42
 *
43
 * On the new request path, payload (headers/data/trailers/metadata) still arrives via
44
 * the accept[X]FromRouter functions. Said data is immediately passed off to the
45
 * UpstreamFilterManager, which passes each item through the filter chain until
46
 * it arrives at the last filter in the chain, the UpstreamCodecFilter. If the upstream
47
 * stream is not established, the UpstreamCodecFilter returns StopAllIteration, and the
48
 * FilterManager will buffer data, using watermarks to push back to the router
49
 * filter if buffers become overrun. When an upstream connection is established,
50
 * the UpstreamCodecFilter will send data upstream.
51
 *
52
 * On the new response path, payload arrives from upstream via the UpstreamCodecFilter's
53
 * CodecBridge. It is passed off directly to the FilterManager, traverses the
54
 * filter chain, and completion is signaled via the
55
 * UpstreamRequestFilterManagerCallbacks's encode[X] functions. These somewhat
56
 * confusingly pass through the UpstreamRequest's legacy decode[X] functions
57
 * (required due to the UpstreamToDownstream interface, but will be renamed once
58
 * the classic mode is deprecated), and are finally passed to the router via the
59
 * RouterFilterInterface onUpstream[X] functions.
60
 *
61
 * There is some required communication between the UpstreamRequest and
62
 * UpstreamCodecFilter. This is accomplished via the UpstreamStreamFilterCallbacks
63
 * interface, with the UpstreamFilterManager acting as intermediary.
64
 *
65
 */
66
class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
67
                        public UpstreamToDownstream,
68
                        public LinkedObject<UpstreamRequest>,
69
                        public GenericConnectionPoolCallbacks,
70
                        public Event::DeferredDeletable {
71
public:
72
  UpstreamRequest(RouterFilterInterface& parent, std::unique_ptr<GenericConnPool>&& conn_pool,
73
                  bool can_send_early_data, bool can_use_http3, bool enable_half_close);
74
  ~UpstreamRequest() override;
75
1.81k
  void deleteIsPending() override { cleanUp(); }
76
77
  // To be called from the destructor, or prior to deferred delete.
78
  void cleanUp();
79
80
  virtual void acceptHeadersFromRouter(bool end_stream);
81
  virtual void acceptDataFromRouter(Buffer::Instance& data, bool end_stream);
82
  void acceptTrailersFromRouter(Http::RequestTrailerMap& trailers);
83
  void acceptMetadataFromRouter(Http::MetadataMapPtr&& metadata_map_ptr);
84
85
  virtual void resetStream();
86
  void setupPerTryTimeout();
87
  void maybeEndDecode(bool end_stream);
88
  void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host, bool pool_success);
89
90
  // Http::StreamDecoder
91
  void decodeData(Buffer::Instance& data, bool end_stream) override;
92
  void decodeMetadata(Http::MetadataMapPtr&& metadata_map) override;
93
94
  // UpstreamToDownstream (Http::ResponseDecoder)
95
  void decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) override;
96
  void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override;
97
  void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override;
98
  void dumpState(std::ostream& os, int indent_level) const override;
99
100
  // UpstreamToDownstream (Http::StreamCallbacks)
101
  void onResetStream(Http::StreamResetReason reason,
102
                     absl::string_view transport_failure_reason) override;
103
0
  void onAboveWriteBufferHighWatermark() override { disableDataFromDownstreamForFlowControl(); }
104
0
  void onBelowWriteBufferLowWatermark() override { enableDataFromDownstreamForFlowControl(); }
105
  // UpstreamToDownstream
106
  const Route& route() const override;
107
  OptRef<const Network::Connection> connection() const override;
108
1.81k
  const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override {
109
1.81k
    return stream_options_;
110
1.81k
  }
111
112
  void disableDataFromDownstreamForFlowControl();
113
  void enableDataFromDownstreamForFlowControl();
114
115
  // GenericConnectionPoolCallbacks
116
  void onPoolFailure(ConnectionPool::PoolFailureReason reason,
117
                     absl::string_view transport_failure_reason,
118
                     Upstream::HostDescriptionConstSharedPtr host) override;
119
  void onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
120
                   Upstream::HostDescriptionConstSharedPtr host,
121
                   const Network::ConnectionInfoProvider& address_provider,
122
                   StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) override;
123
  UpstreamToDownstream& upstreamToDownstream() override;
124
125
  void clearRequestEncoder();
126
  void onStreamMaxDurationReached();
127
128
  // Either disable upstream reading immediately or defer it and keep track
129
  // of how many read disables have been deferred.
130
  void readDisableOrDefer(bool disable);
131
  // Called upon receiving the first response headers from the upstream. Applies
132
  // any pending (deferred) read disabling to the upstream.
133
  void maybeHandleDeferredReadDisable();
134
135
  // Implements Http::DownstreamWatermarkCallbacks for the owning UpstreamRequest, which is held
  // by reference in parent_. Both callbacks are defined out of line.
  struct DownstreamWatermarkManager : public Http::DownstreamWatermarkCallbacks {
136
1.81k
    // explicit: an UpstreamRequest must not convert implicitly into a manager.
    explicit DownstreamWatermarkManager(UpstreamRequest& parent) : parent_(parent) {}
137
138
    // Http::DownstreamWatermarkCallbacks
139
    void onBelowWriteBufferLowWatermark() override;
140
    void onAboveWriteBufferHighWatermark() override;
141
142
    // The request these callbacks act on. Non-owning; the manager is a member of that request.
    UpstreamRequest& parent_;
143
  };
144
145
  void readEnable();
146
  void encodeBodyAndTrailers();
147
148
  // Getters and setters
149
9.40k
  Upstream::HostDescriptionConstSharedPtr& upstreamHost() { return upstream_host_; }
150
0
  void outlierDetectionTimeoutRecorded(bool recorded) {
151
0
    outlier_detection_timeout_recorded_ = recorded;
152
0
  }
153
0
  bool outlierDetectionTimeoutRecorded() { return outlier_detection_timeout_recorded_; }
154
0
  void retried(bool value) { retried_ = value; }
155
0
  bool retried() { return retried_; }
156
799
  bool grpcRqSuccessDeferred() { return grpc_rq_success_deferred_; }
157
729
  void grpcRqSuccessDeferred(bool deferred) { grpc_rq_success_deferred_ = deferred; }
158
1.63k
  void upstreamCanary(bool value) { upstream_canary_ = value; }
159
799
  bool upstreamCanary() { return upstream_canary_; }
160
0
  bool awaitingHeaders() { return awaiting_headers_; }
161
0
  void recordTimeoutBudget(bool value) { record_timeout_budget_ = value; }
162
1.62k
  bool createPerTryTimeoutOnRequestComplete() {
163
1.62k
    return create_per_try_timeout_on_request_complete_;
164
1.62k
  }
165
0
  bool encodeComplete() const { return router_sent_end_stream_; }
166
  // Exposes streamInfo for the upstream stream.
167
12.2k
  StreamInfo::StreamInfo& streamInfo() { return stream_info_; }
168
0
  bool hadUpstream() const { return had_upstream_; }
169
170
private:
171
  friend class UpstreamFilterManager;
172
  friend class UpstreamCodecFilter;
173
  friend class UpstreamRequestFilterManagerCallbacks;
174
8.70k
  StreamInfo::UpstreamTiming& upstreamTiming() {
175
8.70k
    return stream_info_.upstreamInfo()->upstreamTiming();
176
8.70k
  }
177
  // Records the latency from when the upstream request was first created to
178
  // when the pool callback fires. This latency can be useful to track excessive
179
  // queuing.
180
  void recordConnectionPoolCallbackLatency();
181
182
1.63k
  void addResponseHeadersSize(uint64_t size) {
183
1.63k
    response_headers_size_ = response_headers_size_.value_or(0) + size;
184
1.63k
  }
185
  void resetPerTryIdleTimer();
186
  void onPerTryTimeout();
187
  void onPerTryIdleTimeout();
188
  void upstreamLog(AccessLog::AccessLogType access_log_type);
189
  void resetUpstreamLogFlushTimer();
190
191
  RouterFilterInterface& parent_;
192
  std::unique_ptr<GenericConnPool> conn_pool_;
193
  Event::TimerPtr per_try_timeout_;
194
  Event::TimerPtr per_try_idle_timeout_;
195
  std::unique_ptr<GenericUpstream> upstream_;
196
  absl::optional<Http::StreamResetReason> deferred_reset_reason_;
197
  Upstream::HostDescriptionConstSharedPtr upstream_host_;
198
  DownstreamWatermarkManager downstream_watermark_manager_{*this};
199
  Tracing::SpanPtr span_;
200
  StreamInfo::StreamInfoImpl stream_info_;
201
  const MonotonicTime start_time_;
202
  // This is wrapped in an optional, since we want to avoid computing zero size headers when in
203
  // reality we just didn't get a response back.
204
  absl::optional<uint64_t> response_headers_size_{};
205
  // Copies of upstream headers/trailers. These are only set if upstream
206
  // access logging is configured.
207
  Http::ResponseHeaderMapPtr upstream_headers_;
208
  Http::ResponseTrailerMapPtr upstream_trailers_;
209
  OptRef<UpstreamToDownstream> upstream_interface_;
210
  std::list<Http::UpstreamCallbacks*> upstream_callbacks_;
211
212
  Event::TimerPtr max_stream_duration_timer_;
213
214
  // Per-stream access log flush timer. This timer is enabled once when the stream is created
215
  // and will log to all access logs once per trigger.
216
  Event::TimerPtr upstream_log_flush_timer_;
217
218
  std::unique_ptr<UpstreamRequestFilterManagerCallbacks> filter_manager_callbacks_;
219
  std::unique_ptr<Http::FilterManager> filter_manager_;
220
221
  // The number of outstanding readDisable to be called with parameter value true.
222
  // When downstream send buffers get above high watermark before response headers arrive, we
223
  // increment this counter instead of immediately calling readDisable on upstream stream. This is
224
  // to prevent the upstream request from being spuriously retried or reset because of upstream
225
  // timeouts while upstream stream is readDisabled by downstream but the response has actually
226
  // arrived from upstream. See https://github.com/envoyproxy/envoy/issues/25901. During the
227
  // deferring period, if the downstream buffer gets below low watermark, this counter gets
228
  // decremented. Once the response headers arrive, call readDisable as many times as the
229
  // remaining value of this counter.
230
  size_t deferred_read_disabling_count_{0};
231
232
  // Keep small members (bools and enums) at the end of class, to reduce alignment overhead.
233
  // Tracks the number of times the flow of data from downstream has been disabled.
234
  uint32_t downstream_data_disabled_{};
235
  bool calling_encode_headers_ : 1;
236
  bool upstream_canary_ : 1;
237
  bool router_sent_end_stream_ : 1;
238
  bool encode_trailers_ : 1;
239
  bool retried_ : 1;
240
  bool awaiting_headers_ : 1;
241
  bool outlier_detection_timeout_recorded_ : 1;
242
  // Tracks whether we deferred a per try timeout because the downstream request
243
  // had not been completed yet.
244
  bool create_per_try_timeout_on_request_complete_ : 1;
245
  // True if the CONNECT headers have been sent but proxying payload is paused
246
  // waiting for response headers.
247
  bool paused_for_connect_ : 1;
248
  bool paused_for_websocket_ : 1;
249
  bool reset_stream_ : 1;
250
251
  // Sentinel to indicate if timeout budget tracking is configured for the cluster,
252
  // and if so, if the per-try histogram should record a value.
253
  bool record_timeout_budget_ : 1;
254
  // Track if one time clean up has been performed.
255
  bool cleaned_up_ : 1;
256
  bool had_upstream_ : 1;
257
  // Stream options returned by upstreamStreamOptions().
  // NOTE(review): this non-bit-field member sits between two runs of bit-fields, so the
  // bit-fields declared after it likely cannot share a storage unit with the ones before it.
  // Consider moving it ahead of the bit-fields; doing so changes member initialization order, so
  // the constructor's initializer list (not visible here) must be checked first - TODO confirm.
  Http::ConnectionPool::Instance::StreamOptions stream_options_;
258
  bool grpc_rq_success_deferred_ : 1;
259
  bool enable_half_close_ : 1;
260
};
261
262
class UpstreamRequestFilterManagerCallbacks : public Http::FilterManagerCallbacks,
263
                                              public Event::DeferredDeletable,
264
                                              public Http::UpstreamStreamFilterCallbacks {
265
public:
266
  UpstreamRequestFilterManagerCallbacks(UpstreamRequest& upstream_request)
267
1.81k
      : upstream_request_(upstream_request) {}
268
1.63k
  void encodeHeaders(Http::ResponseHeaderMap&, bool end_stream) override {
269
1.63k
    upstream_request_.decodeHeaders(std::move(response_headers_), end_stream);
270
1.63k
  }
271
1
  void encode1xxHeaders(Http::ResponseHeaderMap&) override {
272
1
    upstream_request_.decode1xxHeaders(std::move(informational_headers_));
273
1
  }
274
3.11k
  void encodeData(Buffer::Instance& data, bool end_stream) override {
275
3.11k
    upstream_request_.decodeData(data, end_stream);
276
3.11k
  }
277
1
  void encodeTrailers(Http::ResponseTrailerMap&) override {
278
1
    upstream_request_.decodeTrailers(std::move(response_trailers_));
279
1
  }
280
293
  void encodeMetadata(Http::MetadataMapPtr&& metadata) override {
281
293
    upstream_request_.decodeMetadata(std::move(metadata));
282
293
  }
283
0
  void setRequestTrailers(Http::RequestTrailerMapPtr&& request_trailers) override {
284
0
    trailers_ = std::move(request_trailers);
285
0
  }
286
1
  void setInformationalHeaders(Http::ResponseHeaderMapPtr&& response_headers) override {
287
1
    informational_headers_ = std::move(response_headers);
288
1
  }
289
1.63k
  void setResponseHeaders(Http::ResponseHeaderMapPtr&& response_headers) override {
290
1.63k
    response_headers_ = std::move(response_headers);
291
1.63k
  }
292
1
  void setResponseTrailers(Http::ResponseTrailerMapPtr&& response_trailers) override {
293
1
    response_trailers_ = std::move(response_trailers);
294
1
  }
295
  Http::RequestHeaderMapOptRef requestHeaders() override;
296
  Http::RequestTrailerMapOptRef requestTrailers() override;
297
1
  Http::ResponseHeaderMapOptRef informationalHeaders() override {
298
1
    if (informational_headers_) {
299
1
      return {*informational_headers_};
300
1
    }
301
0
    return {};
302
1
  }
303
1.63k
  Http::ResponseHeaderMapOptRef responseHeaders() override {
304
1.63k
    if (response_headers_) {
305
1.63k
      return {*response_headers_};
306
1.63k
    }
307
0
    return {};
308
1.63k
  }
309
3.11k
  Http::ResponseTrailerMapOptRef responseTrailers() override {
310
3.11k
    if (response_trailers_) {
311
1
      return {*response_trailers_};
312
1
    }
313
3.11k
    return {};
314
3.11k
  }
315
  // If the filter manager determines a decoder filter has buffer space available, tell
316
  // the router to resume the flow of data from downstream.
317
0
  void onDecoderFilterBelowWriteBufferLowWatermark() override {
318
0
    upstream_request_.onBelowWriteBufferLowWatermark();
319
0
  }
320
  // If the filter manager determines a decoder filter has too much data, tell
321
  // the router to stop the flow of data from downstream.
322
0
  void onDecoderFilterAboveWriteBufferHighWatermark() override {
323
0
    upstream_request_.onAboveWriteBufferHighWatermark();
324
0
  }
325
326
  // These functions are delegated to the downstream HCM/FM
327
  OptRef<const Tracing::Config> tracingConfig() const override;
328
  const ScopeTrackedObject& scope() override;
329
  Tracing::Span& activeSpan() override;
330
  void resetStream(Http::StreamResetReason reset_reason,
331
                   absl::string_view transport_failure_reason) override;
332
  Upstream::ClusterInfoConstSharedPtr clusterInfo() override;
333
  Http::Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override;
334
335
  // Intentional no-op functions.
336
0
  void onResponseDataTooLarge() override {}
337
0
  void onRequestDataTooLarge() override {}
338
902
  void endStream() override {}
339
3.52k
  void disarmRequestTimeout() override {}
340
8.73k
  void resetIdleTimer() override {}
341
0
  void onLocalReply(Http::Code) override {}
342
  // Upgrade filter chains not supported.
343
0
  const Router::RouteEntry::UpgradeMap* upgradeMap() override { return nullptr; }
344
345
  // Unsupported functions.
346
0
  void recreateStream(StreamInfo::FilterStateSharedPtr) override {
347
0
    IS_ENVOY_BUG("recreateStream called from upstream HTTP filter");
348
0
  }
349
0
  void upgradeFilterChainCreated() override {
350
0
    IS_ENVOY_BUG("upgradeFilterChainCreated called from upstream HTTP filter");
351
0
  }
352
34.1k
  OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {*this}; }
353
354
  // Http::UpstreamStreamFilterCallbacks
355
8.83k
  StreamInfo::StreamInfo& upstreamStreamInfo() override { return upstream_request_.streamInfo(); }
356
7.44k
  OptRef<GenericUpstream> upstream() override {
357
7.44k
    return makeOptRefFromPtr(upstream_request_.upstream_.get());
358
7.44k
  }
359
0
  void dumpState(std::ostream& os, int indent_level = 0) const override {
360
0
    upstream_request_.dumpState(os, indent_level);
361
0
  }
362
6.35k
  bool pausedForConnect() const override { return upstream_request_.paused_for_connect_; }
363
0
  void setPausedForConnect(bool value) override { upstream_request_.paused_for_connect_ = value; }
364
365
3.40k
  bool pausedForWebsocketUpgrade() const override {
366
3.40k
    return upstream_request_.paused_for_websocket_;
367
3.40k
  }
368
0
  void setPausedForWebsocketUpgrade(bool value) override {
369
0
    upstream_request_.paused_for_websocket_ = value;
370
0
  }
371
372
1.81k
  const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override {
373
1.81k
    return upstream_request_.upstreamStreamOptions();
374
1.81k
  }
375
1.81k
  void addUpstreamCallbacks(Http::UpstreamCallbacks& callbacks) override {
376
1.81k
    upstream_request_.upstream_callbacks_.push_back(&callbacks);
377
1.81k
  }
378
1.81k
  void setUpstreamToDownstream(UpstreamToDownstream& upstream_to_downstream_interface) override {
379
1.81k
    upstream_request_.upstream_interface_ = upstream_to_downstream_interface;
380
1.81k
  }
381
3.42k
  bool isHalfCloseEnabled() override { return upstream_request_.enable_half_close_; }
382
  Http::RequestTrailerMapPtr trailers_;
383
  Http::ResponseHeaderMapPtr informational_headers_;
384
  Http::ResponseHeaderMapPtr response_headers_;
385
  Http::ResponseTrailerMapPtr response_trailers_;
386
  UpstreamRequest& upstream_request_;
387
};
388
389
} // namespace Router
390
} // namespace Envoy