Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/router/upstream_request.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7
#include <string>
8
9
#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h"
10
#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.validate.h"
11
#include "envoy/http/codec.h"
12
#include "envoy/http/codes.h"
13
#include "envoy/http/conn_pool.h"
14
#include "envoy/http/filter.h"
15
#include "envoy/stats/scope.h"
16
#include "envoy/tcp/conn_pool.h"
17
18
#include "source/common/buffer/watermark_buffer.h"
19
#include "source/common/common/cleanup.h"
20
#include "source/common/common/hash.h"
21
#include "source/common/common/hex.h"
22
#include "source/common/common/linked_object.h"
23
#include "source/common/common/logger.h"
24
#include "source/common/config/well_known_names.h"
25
#include "source/common/http/filter_manager.h"
26
#include "source/common/stream_info/stream_info_impl.h"
27
#include "source/common/tracing/null_span_impl.h"
28
#include "source/extensions/filters/http/common/factory_base.h"
29
30
namespace Envoy {
31
namespace Router {
32
33
class GenericUpstream;
34
class GenericConnectionPoolCallbacks;
35
class RouterFilterInterface;
36
class UpstreamRequest;
37
class UpstreamRequestFilterManagerCallbacks;
38
class UpstreamFilterManager;
39
class UpstreamCodecFilter;
40
41
/* The Upstream request is the base class for forwarding HTTP upstream.
42
 *
43
 * On the new request path, payload (headers/body/metadata/data) still arrives via
44
 * the accept[X]fromRouter functions. Said data is immediately passed off to the
45
 * UpstreamFilterManager, which passes each item through the filter chain until
46
 * it arrives at the last filter in the chain, the UpstreamCodecFilter. If the upstream
47
 * stream is not established, the UpstreamCodecFilter returns StopAllIteration, and the
48
 * FilterManager will buffer data, using watermarks to push back to the router
49
 * filter if buffers become overrun. When an upstream connection is established,
50
 * the UpstreamCodecFilter will send data upstream.
51
 *
52
 * On the new response path, payload arrives from upstream via the UpstreamCodecFilter's
53
 * CodecBridge. It is passed off directly to the FilterManager, traverses the
54
 * filter chain, and completion is signaled via the
55
 * UpstreamRequestFilterManagerCallbacks's encode[X] functions. These somewhat
56
 * confusingly pass through the UpstreamRequest's legacy decode[X] functions
57
 * (required due to the UpstreamToDownstream interface, but will be renamed once
58
 * the classic mode is deprecated), and are finally passed to the router via the
59
 * RouterFilterInterface onUpstream[X] functions.
60
 *
61
 * There is some required communication between the UpstreamRequest and
62
 * UpstreamCodecFilter. This is accomplished via the UpstreamStreamFilterCallbacks
63
 * interface, with the UpstreamFilterManager acting as intermediary.
64
 */
65
class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
66
                        public UpstreamToDownstream,
67
                        public LinkedObject<UpstreamRequest>,
68
                        public GenericConnectionPoolCallbacks,
69
                        public Event::DeferredDeletable {
70
public:
71
  UpstreamRequest(RouterFilterInterface& parent, std::unique_ptr<GenericConnPool>&& conn_pool,
72
                  bool can_send_early_data, bool can_use_http3);
73
  ~UpstreamRequest() override;
74
2.40k
  void deleteIsPending() override { cleanUp(); }
75
76
  // To be called from the destructor, or prior to deferred delete.
77
  void cleanUp();
78
79
  void acceptHeadersFromRouter(bool end_stream);
80
  void acceptDataFromRouter(Buffer::Instance& data, bool end_stream);
81
  void acceptTrailersFromRouter(Http::RequestTrailerMap& trailers);
82
  void acceptMetadataFromRouter(Http::MetadataMapPtr&& metadata_map_ptr);
83
84
  void resetStream();
85
  void setupPerTryTimeout();
86
  void maybeEndDecode(bool end_stream);
87
  void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host, bool pool_success);
88
89
  // Http::StreamDecoder
90
  void decodeData(Buffer::Instance& data, bool end_stream) override;
91
  void decodeMetadata(Http::MetadataMapPtr&& metadata_map) override;
92
93
  // UpstreamToDownstream (Http::ResponseDecoder)
94
  void decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) override;
95
  void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override;
96
  void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override;
97
  void dumpState(std::ostream& os, int indent_level) const override;
98
99
  // UpstreamToDownstream (Http::StreamCallbacks)
100
  void onResetStream(Http::StreamResetReason reason,
101
                     absl::string_view transport_failure_reason) override;
102
0
  void onAboveWriteBufferHighWatermark() override { disableDataFromDownstreamForFlowControl(); }
103
0
  void onBelowWriteBufferLowWatermark() override { enableDataFromDownstreamForFlowControl(); }
104
  // UpstreamToDownstream
105
  const Route& route() const override;
106
  OptRef<const Network::Connection> connection() const override;
107
2.39k
  const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override {
108
2.39k
    return stream_options_;
109
2.39k
  }
110
111
  void disableDataFromDownstreamForFlowControl();
112
  void enableDataFromDownstreamForFlowControl();
113
114
  // GenericConnPool
115
  void onPoolFailure(ConnectionPool::PoolFailureReason reason,
116
                     absl::string_view transport_failure_reason,
117
                     Upstream::HostDescriptionConstSharedPtr host) override;
118
  void onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
119
                   Upstream::HostDescriptionConstSharedPtr host,
120
                   const Network::ConnectionInfoProvider& address_provider,
121
                   StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) override;
122
  UpstreamToDownstream& upstreamToDownstream() override;
123
124
  void clearRequestEncoder();
125
  void onStreamMaxDurationReached();
126
127
  // Either disable upstream reading immediately or defer it and keep tracking
128
  // of how many read disabling has happened.
129
  void readDisableOrDefer(bool disable);
130
  // Called upon receiving the first response headers from the upstream. And
131
  // applies read disabling to it if there is any pending read disabling.
132
  void maybeHandleDeferredReadDisable();
133
134
  struct DownstreamWatermarkManager : public Http::DownstreamWatermarkCallbacks {
135
2.40k
    DownstreamWatermarkManager(UpstreamRequest& parent) : parent_(parent) {}
136
137
    // Http::DownstreamWatermarkCallbacks
138
    void onBelowWriteBufferLowWatermark() override;
139
    void onAboveWriteBufferHighWatermark() override;
140
141
    UpstreamRequest& parent_;
142
  };
143
144
  void readEnable();
145
  void encodeBodyAndTrailers();
146
147
  // Getters and setters
148
10.2k
  Upstream::HostDescriptionConstSharedPtr& upstreamHost() { return upstream_host_; }
149
0
  void outlierDetectionTimeoutRecorded(bool recorded) {
150
0
    outlier_detection_timeout_recorded_ = recorded;
151
0
  }
152
0
  bool outlierDetectionTimeoutRecorded() { return outlier_detection_timeout_recorded_; }
153
0
  void retried(bool value) { retried_ = value; }
154
0
  bool retried() { return retried_; }
155
1.08k
  bool grpcRqSuccessDeferred() { return grpc_rq_success_deferred_; }
156
1.02k
  void grpcRqSuccessDeferred(bool deferred) { grpc_rq_success_deferred_ = deferred; }
157
2.22k
  void upstreamCanary(bool value) { upstream_canary_ = value; }
158
1.08k
  bool upstreamCanary() { return upstream_canary_; }
159
0
  bool awaitingHeaders() { return awaiting_headers_; }
160
0
  void recordTimeoutBudget(bool value) { record_timeout_budget_ = value; }
161
2.21k
  bool createPerTryTimeoutOnRequestComplete() {
162
2.21k
    return create_per_try_timeout_on_request_complete_;
163
2.21k
  }
164
0
  bool encodeComplete() const { return router_sent_end_stream_; }
165
  // Exposes streamInfo for the upstream stream.
166
16.7k
  StreamInfo::StreamInfo& streamInfo() { return stream_info_; }
167
0
  bool hadUpstream() const { return had_upstream_; }
168
169
private:
170
  friend class UpstreamFilterManager;
171
  friend class UpstreamCodecFilter;
172
  friend class UpstreamRequestFilterManagerCallbacks;
173
11.6k
  StreamInfo::UpstreamTiming& upstreamTiming() {
174
11.6k
    return stream_info_.upstreamInfo()->upstreamTiming();
175
11.6k
  }
176
  // Records the latency from when the upstream request was first created to
177
  // when the pool callback fires. This latency can be useful to track excessive
178
  // queuing.
179
  void recordConnectionPoolCallbackLatency();
180
181
2.22k
  void addResponseHeadersSize(uint64_t size) {
182
2.22k
    response_headers_size_ = response_headers_size_.value_or(0) + size;
183
2.22k
  }
184
  void resetPerTryIdleTimer();
185
  void onPerTryTimeout();
186
  void onPerTryIdleTimeout();
187
  void upstreamLog(AccessLog::AccessLogType access_log_type);
188
  void resetUpstreamLogFlushTimer();
189
190
  RouterFilterInterface& parent_;
191
  std::unique_ptr<GenericConnPool> conn_pool_;
192
  Event::TimerPtr per_try_timeout_;
193
  Event::TimerPtr per_try_idle_timeout_;
194
  std::unique_ptr<GenericUpstream> upstream_;
195
  absl::optional<Http::StreamResetReason> deferred_reset_reason_;
196
  Upstream::HostDescriptionConstSharedPtr upstream_host_;
197
  DownstreamWatermarkManager downstream_watermark_manager_{*this};
198
  Tracing::SpanPtr span_;
199
  StreamInfo::StreamInfoImpl stream_info_;
200
  const MonotonicTime start_time_;
201
  // This is wrapped in an optional, since we want to avoid computing zero size headers when in
202
  // reality we just didn't get a response back.
203
  absl::optional<uint64_t> response_headers_size_{};
204
  // Copies of upstream headers/trailers. These are only set if upstream
205
  // access logging is configured.
206
  Http::ResponseHeaderMapPtr upstream_headers_;
207
  Http::ResponseTrailerMapPtr upstream_trailers_;
208
  OptRef<UpstreamToDownstream> upstream_interface_;
209
  std::list<Http::UpstreamCallbacks*> upstream_callbacks_;
210
211
  Event::TimerPtr max_stream_duration_timer_;
212
213
  // Per-stream access log flush duration. This timer is enabled once when the stream is created
214
  // and will log to all access logs once per trigger.
215
  Event::TimerPtr upstream_log_flush_timer_;
216
217
  std::unique_ptr<UpstreamRequestFilterManagerCallbacks> filter_manager_callbacks_;
218
  std::unique_ptr<Http::FilterManager> filter_manager_;
219
220
  // The number of outstanding readDisable to be called with parameter value true.
221
  // When downstream send buffers get above high watermark before response headers arrive, we
222
  // increment this counter instead of immediately calling readDisable on upstream stream. This is
223
  // to avoid the upstream request from being spuriously retried or reset because of upstream
224
  // timeouts while upstream stream is readDisabled by downstream but the response has actually
225
  // arrived from upstream. See https://github.com/envoyproxy/envoy/issues/25901. During the
226
  // deferring period, if the downstream buffer gets below low watermark, this counter gets
227
  // decremented. Once the response headers arrive, call readDisable the number of times as the
228
  // remaining value of this counter.
229
  size_t deferred_read_disabling_count_{0};
230
231
  // Keep small members (bools and enums) at the end of class, to reduce alignment overhead.
232
  // Tracks the number of times the flow of data from downstream has been disabled.
233
  uint32_t downstream_data_disabled_{};
234
  bool calling_encode_headers_ : 1;
235
  bool upstream_canary_ : 1;
236
  bool router_sent_end_stream_ : 1;
237
  bool encode_trailers_ : 1;
238
  bool retried_ : 1;
239
  bool awaiting_headers_ : 1;
240
  bool outlier_detection_timeout_recorded_ : 1;
241
  // Tracks whether we deferred a per try timeout because the downstream request
242
  // had not been completed yet.
243
  bool create_per_try_timeout_on_request_complete_ : 1;
244
  // True if the CONNECT headers have been sent but proxying payload is paused
245
  // waiting for response headers.
246
  bool paused_for_connect_ : 1;
247
  bool reset_stream_ : 1;
248
249
  // Sentinel to indicate if timeout budget tracking is configured for the cluster,
250
  // and if so, if the per-try histogram should record a value.
251
  bool record_timeout_budget_ : 1;
252
  // Track if one time clean up has been performed.
253
  bool cleaned_up_ : 1;
254
  bool had_upstream_ : 1;
255
  Http::ConnectionPool::Instance::StreamOptions stream_options_;
256
  bool grpc_rq_success_deferred_ : 1;
257
  bool upstream_wait_for_response_headers_before_disabling_read_ : 1;
258
};
259
260
class UpstreamRequestFilterManagerCallbacks : public Http::FilterManagerCallbacks,
261
                                              public Event::DeferredDeletable,
262
                                              public Http::UpstreamStreamFilterCallbacks {
263
public:
264
  UpstreamRequestFilterManagerCallbacks(UpstreamRequest& upstream_request)
265
2.40k
      : upstream_request_(upstream_request) {}
266
2.22k
  void encodeHeaders(Http::ResponseHeaderMap&, bool end_stream) override {
267
2.22k
    upstream_request_.decodeHeaders(std::move(response_headers_), end_stream);
268
2.22k
  }
269
1
  void encode1xxHeaders(Http::ResponseHeaderMap&) override {
270
1
    upstream_request_.decode1xxHeaders(std::move(informational_headers_));
271
1
  }
272
4.28k
  void encodeData(Buffer::Instance& data, bool end_stream) override {
273
4.28k
    upstream_request_.decodeData(data, end_stream);
274
4.28k
  }
275
3
  void encodeTrailers(Http::ResponseTrailerMap&) override {
276
3
    upstream_request_.decodeTrailers(std::move(response_trailers_));
277
3
  }
278
181
  void encodeMetadata(Http::MetadataMapPtr&& metadata) override {
279
181
    upstream_request_.decodeMetadata(std::move(metadata));
280
181
  }
281
0
  void setRequestTrailers(Http::RequestTrailerMapPtr&& request_trailers) override {
282
0
    trailers_ = std::move(request_trailers);
283
0
  }
284
1
  void setInformationalHeaders(Http::ResponseHeaderMapPtr&& response_headers) override {
285
1
    informational_headers_ = std::move(response_headers);
286
1
  }
287
2.22k
  void setResponseHeaders(Http::ResponseHeaderMapPtr&& response_headers) override {
288
2.22k
    response_headers_ = std::move(response_headers);
289
2.22k
  }
290
3
  void setResponseTrailers(Http::ResponseTrailerMapPtr&& response_trailers) override {
291
3
    response_trailers_ = std::move(response_trailers);
292
3
  }
293
  Http::RequestHeaderMapOptRef requestHeaders() override;
294
  Http::RequestTrailerMapOptRef requestTrailers() override;
295
1
  Http::ResponseHeaderMapOptRef informationalHeaders() override {
296
1
    if (informational_headers_) {
297
1
      return {*informational_headers_};
298
1
    }
299
0
    return {};
300
1
  }
301
2.22k
  Http::ResponseHeaderMapOptRef responseHeaders() override {
302
2.22k
    if (response_headers_) {
303
2.22k
      return {*response_headers_};
304
2.22k
    }
305
0
    return {};
306
2.22k
  }
307
4.29k
  Http::ResponseTrailerMapOptRef responseTrailers() override {
308
4.29k
    if (response_trailers_) {
309
3
      return {*response_trailers_};
310
3
    }
311
4.28k
    return {};
312
4.29k
  }
313
  // If the filter manager determines a decoder filter has available, tell
314
  // the router to resume the flow of data from downstream.
315
0
  void onDecoderFilterBelowWriteBufferLowWatermark() override {
316
0
    upstream_request_.onBelowWriteBufferLowWatermark();
317
0
  }
318
  // If the filter manager determines a decoder filter has too much data, tell
319
  // the router to stop the flow of data from downstream.
320
0
  void onDecoderFilterAboveWriteBufferHighWatermark() override {
321
0
    upstream_request_.onAboveWriteBufferHighWatermark();
322
0
  }
323
324
  // These functions are delegated to the downstream HCM/FM
325
  OptRef<const Tracing::Config> tracingConfig() const override;
326
  const ScopeTrackedObject& scope() override;
327
  Tracing::Span& activeSpan() override;
328
  void resetStream(Http::StreamResetReason reset_reason,
329
                   absl::string_view transport_failure_reason) override;
330
  Upstream::ClusterInfoConstSharedPtr clusterInfo() override;
331
  Http::Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override;
332
333
  // Intentional no-op functions.
334
0
  void onResponseDataTooLarge() override {}
335
0
  void onRequestDataTooLarge() override {}
336
1.18k
  void endStream() override {}
337
4.85k
  void disarmRequestTimeout() override {}
338
12.1k
  void resetIdleTimer() override {}
339
0
  void onLocalReply(Http::Code) override {}
340
  // Upgrade filter chains not supported.
341
0
  const Router::RouteEntry::UpgradeMap* upgradeMap() override { return nullptr; }
342
343
  // Unsupported functions.
344
0
  void recreateStream(StreamInfo::FilterStateSharedPtr) override {
345
0
    IS_ENVOY_BUG("recreateStream called from upstream HTTP filter");
346
0
  }
347
0
  void upgradeFilterChainCreated() override {
348
0
    IS_ENVOY_BUG("upgradeFilterChainCreated called from upstream HTTP filter");
349
0
  }
350
42.4k
  OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {*this}; }
351
352
  // Http::UpstreamStreamFilterCallbacks
353
12.2k
  StreamInfo::StreamInfo& upstreamStreamInfo() override { return upstream_request_.streamInfo(); }
354
10.3k
  OptRef<GenericUpstream> upstream() override {
355
10.3k
    return makeOptRefFromPtr(upstream_request_.upstream_.get());
356
10.3k
  }
357
0
  void dumpState(std::ostream& os, int indent_level = 0) const override {
358
0
    upstream_request_.dumpState(os, indent_level);
359
0
  }
360
8.91k
  bool pausedForConnect() const override { return upstream_request_.paused_for_connect_; }
361
0
  void setPausedForConnect(bool value) override { upstream_request_.paused_for_connect_ = value; }
362
2.39k
  const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override {
363
2.39k
    return upstream_request_.upstreamStreamOptions();
364
2.39k
  }
365
2.40k
  void addUpstreamCallbacks(Http::UpstreamCallbacks& callbacks) override {
366
2.40k
    upstream_request_.upstream_callbacks_.push_back(&callbacks);
367
2.40k
  }
368
2.40k
  void setUpstreamToDownstream(UpstreamToDownstream& upstream_to_downstream_interface) override {
369
2.40k
    upstream_request_.upstream_interface_ = upstream_to_downstream_interface;
370
2.40k
  }
371
372
  Http::RequestTrailerMapPtr trailers_;
373
  Http::ResponseHeaderMapPtr informational_headers_;
374
  Http::ResponseHeaderMapPtr response_headers_;
375
  Http::ResponseTrailerMapPtr response_trailers_;
376
  UpstreamRequest& upstream_request_;
377
};
378
379
} // namespace Router
380
} // namespace Envoy