1
#pragma once
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7
#include <string>
8

            
9
#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h"
10
#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.validate.h"
11
#include "envoy/http/codec.h"
12
#include "envoy/http/codes.h"
13
#include "envoy/http/conn_pool.h"
14
#include "envoy/http/filter.h"
15
#include "envoy/stats/scope.h"
16
#include "envoy/tcp/conn_pool.h"
17

            
18
#include "source/common/buffer/watermark_buffer.h"
19
#include "source/common/common/cleanup.h"
20
#include "source/common/common/hash.h"
21
#include "source/common/common/hex.h"
22
#include "source/common/common/linked_object.h"
23
#include "source/common/common/logger.h"
24
#include "source/common/config/well_known_names.h"
25
#include "source/common/http/filter_manager.h"
26
#include "source/common/router/upstream_to_downstream_impl_base.h"
27
#include "source/common/stream_info/stream_info_impl.h"
28
#include "source/common/tracing/null_span_impl.h"
29
#include "source/extensions/filters/http/common/factory_base.h"
30

            
31
namespace Envoy {
32
namespace Router {
33

            
34
// Forward declarations of the router types this header refers to by name
// (as smart-pointer targets, reference parameters, friends, or base classes).
class GenericConnectionPoolCallbacks;
class GenericUpstream;
class RouterFilterInterface;
class UpstreamCodecFilter;
class UpstreamFilterManager;
class UpstreamRequest;
class UpstreamRequestFilterManagerCallbacks;
41

            
42
/* The Upstream request is the base class for forwarding HTTP upstream.
 *
 * On the new request path, payload (headers/data/trailers/metadata) still arrives via
 * the accept[X]FromRouter functions. Said data is immediately passed off to the
 * UpstreamFilterManager, which passes each item through the filter chain until
 * it arrives at the last filter in the chain, the UpstreamCodecFilter. If the upstream
 * stream is not established, the UpstreamCodecFilter returns StopAllIteration, and the
 * FilterManager will buffer data, using watermarks to push back to the router
 * filter if buffers become overrun. When an upstream connection is established,
 * the UpstreamCodecFilter will send data upstream.
 *
 * On the new response path, payload arrives from upstream via the UpstreamCodecFilter's
 * CodecBridge. It is passed off directly to the FilterManager, traverses the
 * filter chain, and completion is signaled via the
 * UpstreamRequestFilterManagerCallbacks's encode[X] functions. These somewhat
 * confusingly pass through the UpstreamRequest's legacy decode[X] functions
 * (required due to the UpstreamToDownstream interface, but will be renamed once
 * the classic mode is deprecated), and are finally passed to the router via the
 * RouterFilterInterface onUpstream[X] functions.
 *
 * There is some required communication between the UpstreamRequest and
 * UpstreamCodecFilter. This is accomplished via the UpstreamStreamFilterCallbacks
 * interface, with the UpstreamFilterManager acting as intermediary.
 *
 */
67
class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
                        public UpstreamToDownstreamImplBase,
                        public LinkedObject<UpstreamRequest>,
                        public GenericConnectionPoolCallbacks,
                        public Event::DeferredDeletable {
public:
  UpstreamRequest(RouterFilterInterface& parent, std::unique_ptr<GenericConnPool>&& conn_pool,
                  bool can_send_early_data, bool can_use_http3, bool enable_half_close);
  ~UpstreamRequest() override;

  // Forwards to cleanUp() so the one-time clean up runs prior to deferred delete.
  void deleteIsPending() override { cleanUp(); }

  // To be called from the destructor, or prior to deferred delete.
  void cleanUp();

  // Entry points through which the router hands request payload to this upstream request.
  virtual void acceptHeadersFromRouter(bool end_stream);
  virtual void acceptDataFromRouter(Buffer::Instance& data, bool end_stream);
  void acceptTrailersFromRouter(Http::RequestTrailerMap& trailers);
  void acceptMetadataFromRouter(Http::MetadataMapPtr&& metadata_map_ptr);

  virtual void resetStream();
  void setupPerTryTimeout();
  void maybeEndDecode(bool end_stream);
  void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host, bool pool_success);

  // Http::StreamDecoder
  void decodeData(Buffer::Instance& data, bool end_stream) override;
  void decodeMetadata(Http::MetadataMapPtr&& metadata_map) override;

  // UpstreamToDownstream (Http::ResponseDecoder)
  void decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) override;
  void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override;
  void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override;
  void dumpState(std::ostream& os, int indent_level) const override;

  // UpstreamToDownstream (Http::StreamCallbacks)
  void onResetStream(Http::StreamResetReason reason,
                     absl::string_view transport_failure_reason) override;
  // Watermark events are translated into pausing / resuming the flow of data from downstream.
  void onAboveWriteBufferHighWatermark() override { disableDataFromDownstreamForFlowControl(); }
  void onBelowWriteBufferLowWatermark() override { enableDataFromDownstreamForFlowControl(); }

  // UpstreamToDownstream
  const Route& route() const override;
  OptRef<const Network::Connection> connection() const override;
  const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override {
    return stream_options_;
  }

  void disableDataFromDownstreamForFlowControl();
  void enableDataFromDownstreamForFlowControl();

  // GenericConnectionPoolCallbacks
  void onPoolFailure(ConnectionPool::PoolFailureReason reason,
                     absl::string_view transport_failure_reason,
                     Upstream::HostDescriptionConstSharedPtr host) override;
  void onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
                   Upstream::HostDescriptionConstSharedPtr host,
                   const Network::ConnectionInfoProvider& address_provider,
                   StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) override;
  UpstreamToDownstream& upstreamToDownstream() override;

  void clearRequestEncoder();
  void onStreamMaxDurationReached();

  // Either disable upstream reading immediately, or defer it and keep track of
  // how many read-disable calls are outstanding.
  void readDisableOrDefer(bool disable);
  // Called upon receiving the first response headers from the upstream. Applies
  // any pending (deferred) read disabling to it.
  void maybeHandleDeferredReadDisable();

  // Receives downstream watermark events on behalf of the owning UpstreamRequest.
  struct DownstreamWatermarkManager : public Http::DownstreamWatermarkCallbacks {
    explicit DownstreamWatermarkManager(UpstreamRequest& parent) : parent_(parent) {}

    // Http::DownstreamWatermarkCallbacks
    void onBelowWriteBufferLowWatermark() override;
    void onAboveWriteBufferHighWatermark() override;

    UpstreamRequest& parent_;
  };

  void readEnable();

  // Getters and setters
  // Returns the selected upstream host; empty when upstream_host_ is null.
  Upstream::HostDescriptionOptConstRef upstreamHost() const {
    return makeOptRefFromPtr(upstream_host_.get());
  }
  void outlierDetectionTimeoutRecorded(bool recorded) {
    outlier_detection_timeout_recorded_ = recorded;
  }
  bool outlierDetectionTimeoutRecorded() const { return outlier_detection_timeout_recorded_; }
  void retried(bool value) { retried_ = value; }
  bool retried() const { return retried_; }
  bool grpcRqSuccessDeferred() const { return grpc_rq_success_deferred_; }
  void grpcRqSuccessDeferred(bool deferred) { grpc_rq_success_deferred_ = deferred; }
  void upstreamCanary(bool value) { upstream_canary_ = value; }
  bool upstreamCanary() const { return upstream_canary_; }
  bool awaitingHeaders() const { return awaiting_headers_; }
  void recordTimeoutBudget(bool value) { record_timeout_budget_ = value; }
  bool createPerTryTimeoutOnRequestComplete() const {
    return create_per_try_timeout_on_request_complete_;
  }
  // Reports whether the router has sent end_stream (reads router_sent_end_stream_).
  bool encodeComplete() const { return router_sent_end_stream_; }
  // Exposes streamInfo for the upstream stream.
  StreamInfo::StreamInfo& streamInfo() { return stream_info_; }
  bool hadUpstream() const { return had_upstream_; }
  // Disable per-try timeouts for websocket upgrades after successful handshake
  void disablePerTryTimeoutForWebsocketUpgrade();

private:
  friend class UpstreamFilterManager;
  friend class UpstreamCodecFilter;
  friend class UpstreamRequestFilterManagerCallbacks;

  // Shorthand for the upstream timing record held by stream_info_'s upstream info.
  StreamInfo::UpstreamTiming& upstreamTiming() {
    return stream_info_.upstreamInfo()->upstreamTiming();
  }
  // Records the latency from when the upstream request was first created to
  // when the pool callback fires. This latency can be useful to track excessive
  // queuing.
  void recordConnectionPoolCallbackLatency();

  // Accumulates response header byte size and entry count. The optionals are
  // treated as zero the first time a value is added.
  void addResponseHeadersStat(uint64_t size, size_t count) {
    response_headers_size_ = response_headers_size_.value_or(0) + size;
    response_headers_count_ = response_headers_count_.value_or(0) + count;
  }
  void resetPerTryIdleTimer();
  void onPerTryTimeout();
  void onPerTryIdleTimeout();
  void upstreamLog(AccessLog::AccessLogType access_log_type);
  void resetUpstreamLogFlushTimer();

  RouterFilterInterface& parent_;
  std::unique_ptr<GenericConnPool> conn_pool_;
  Event::TimerPtr per_try_timeout_;
  Event::TimerPtr per_try_idle_timeout_;
  std::unique_ptr<GenericUpstream> upstream_;
  absl::optional<Http::StreamResetReason> deferred_reset_reason_;
  Upstream::HostDescriptionConstSharedPtr upstream_host_;
  DownstreamWatermarkManager downstream_watermark_manager_{*this};
  Tracing::SpanPtr span_;
  StreamInfo::StreamInfoImpl stream_info_;
  const MonotonicTime start_time_;
  // This is wrapped in an optional, since we want to avoid computing zero size headers when in
  // reality we just didn't get a response back.
  absl::optional<uint64_t> response_headers_size_;
  absl::optional<size_t> response_headers_count_;
  // Copies of upstream headers/trailers. These are only set if upstream
  // access logging is configured.
  Http::ResponseHeaderMapPtr upstream_headers_;
  Http::ResponseTrailerMapPtr upstream_trailers_;
  OptRef<UpstreamToDownstream> upstream_interface_;
  std::list<Http::UpstreamCallbacks*> upstream_callbacks_;

  Event::TimerPtr max_stream_duration_timer_;

  // Per-stream access log flush duration. This timer is enabled once when the stream is created
  // and will log to all access logs once per trigger.
  Event::TimerPtr upstream_log_flush_timer_;

  std::unique_ptr<UpstreamRequestFilterManagerCallbacks> filter_manager_callbacks_;
  std::unique_ptr<Http::FilterManager> filter_manager_;

  // The number of outstanding readDisable to be called with parameter value true.
  // When downstream send buffers get above high watermark before response headers arrive, we
  // increment this counter instead of immediately calling readDisable on upstream stream. This is
  // to avoid the upstream request from being spuriously retried or reset because of upstream
  // timeouts while upstream stream is readDisabled by downstream but the response has actually
  // arrived from upstream. See https://github.com/envoyproxy/envoy/issues/25901. During the
  // deferring period, if the downstream buffer gets below low watermark, this counter gets
  // decremented. Once the response headers arrive, call readDisable the number of times as the
  // remaining value of this counter.
  size_t deferred_read_disabling_count_{0};

  // Keep small members (bools and enums) at the end of class, to reduce alignment overhead.
  // NOTE(review): declaration order below is intentionally left untouched; the constructor's
  // member-initializer list is defined outside this header and may depend on it.
  // Tracks the number of times the flow of data from downstream has been disabled.
  uint32_t downstream_data_disabled_{};
  bool upstream_canary_ : 1;
  bool router_sent_end_stream_ : 1;
  bool encode_trailers_ : 1;
  bool retried_ : 1;
  bool awaiting_headers_ : 1;
  bool outlier_detection_timeout_recorded_ : 1;
  // Tracks whether we deferred a per try timeout because the downstream request
  // had not been completed yet.
  bool create_per_try_timeout_on_request_complete_ : 1;
  // True if the CONNECT headers have been sent but proxying payload is paused
  // waiting for response headers.
  bool paused_for_connect_ : 1;
  bool paused_for_websocket_ : 1;
  bool reset_stream_ : 1;

  // Sentinel to indicate if timeout budget tracking is configured for the cluster,
  // and if so, if the per-try histogram should record a value.
  bool record_timeout_budget_ : 1;
  // Track if one time clean up has been performed.
  bool cleaned_up_ : 1;
  bool had_upstream_ : 1;
  Http::ConnectionPool::Instance::StreamOptions stream_options_;
  bool grpc_rq_success_deferred_ : 1;
  bool enable_half_close_ : 1;
};
266

            
267
class UpstreamRequestFilterManagerCallbacks : public Http::FilterManagerCallbacks,
268
                                              public Event::DeferredDeletable,
269
                                              public Http::UpstreamStreamFilterCallbacks {
270
public:
271
  UpstreamRequestFilterManagerCallbacks(UpstreamRequest& upstream_request)
272
47497
      : upstream_request_(upstream_request) {}
273
40146
  void encodeHeaders(Http::ResponseHeaderMap&, bool end_stream) override {
274
40146
    upstream_request_.decodeHeaders(std::move(response_headers_), end_stream);
275
40146
  }
276
150
  void encode1xxHeaders(Http::ResponseHeaderMap&) override {
277
150
    upstream_request_.decode1xxHeaders(std::move(informational_headers_));
278
150
  }
279
384026
  void encodeData(Buffer::Instance& data, bool end_stream) override {
280
384026
    upstream_request_.decodeData(data, end_stream);
281
384026
  }
282
780
  void encodeTrailers(Http::ResponseTrailerMap&) override {
283
780
    upstream_request_.decodeTrailers(std::move(response_trailers_));
284
780
  }
285
1111
  void encodeMetadata(Http::MetadataMapPtr&& metadata) override {
286
1111
    upstream_request_.decodeMetadata(std::move(metadata));
287
1111
  }
288
6
  void setRequestTrailers(Http::RequestTrailerMapPtr&& request_trailers) override {
289
6
    trailers_ = std::move(request_trailers);
290
6
  }
291
150
  void setInformationalHeaders(Http::ResponseHeaderMapPtr&& response_headers) override {
292
150
    informational_headers_ = std::move(response_headers);
293
150
  }
294
40175
  void setResponseHeaders(Http::ResponseHeaderMapPtr&& response_headers) override {
295
40175
    response_headers_ = std::move(response_headers);
296
40175
  }
297
780
  void setResponseTrailers(Http::ResponseTrailerMapPtr&& response_trailers) override {
298
780
    response_trailers_ = std::move(response_trailers);
299
780
  }
300
  Http::RequestHeaderMapOptRef requestHeaders() override;
301
  Http::RequestTrailerMapOptRef requestTrailers() override;
302
151
  Http::ResponseHeaderMapOptRef informationalHeaders() override {
303
151
    if (informational_headers_) {
304
150
      return {*informational_headers_};
305
150
    }
306
1
    return {};
307
151
  }
308
41101
  Http::ResponseHeaderMapOptRef responseHeaders() override {
309
41101
    if (response_headers_) {
310
40992
      return {*response_headers_};
311
40992
    }
312
109
    return {};
313
41101
  }
314
386990
  Http::ResponseTrailerMapOptRef responseTrailers() override {
315
386990
    if (response_trailers_) {
316
858
      return {*response_trailers_};
317
858
    }
318
386132
    return {};
319
386990
  }
320
  // If the filter manager determines a decoder filter has available, tell
321
  // the router to resume the flow of data from downstream.
322
229222
  void onDecoderFilterBelowWriteBufferLowWatermark() override {
323
229222
    upstream_request_.onBelowWriteBufferLowWatermark();
324
229222
  }
325
  // If the filter manager determines a decoder filter has too much data, tell
326
  // the router to stop the flow of data from downstream.
327
229251
  void onDecoderFilterAboveWriteBufferHighWatermark() override {
328
229251
    upstream_request_.onAboveWriteBufferHighWatermark();
329
229251
  }
330

            
331
  // These functions are delegated to the downstream HCM/FM
332
  OptRef<const Tracing::Config> tracingConfig() const override;
333
  const ScopeTrackedObject& scope() override;
334
  Tracing::Span& activeSpan() override;
335
  void resetStream(Http::StreamResetReason reset_reason,
336
                   absl::string_view transport_failure_reason) override;
337
  Upstream::ClusterInfoConstSharedPtr clusterInfo() override;
338
  Http::Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override;
339

            
340
  // Intentional no-op functions.
341
16
  void onResponseDataTooLarge() override {}
342
27
  void onRequestDataTooLarge() override {}
343
37569
  void endStream() override {}
344
100047
  void disarmRequestTimeout() override {}
345
628687
  void resetIdleTimer() override {}
346
  void onLocalReply(Http::Code) override {}
347
  void sendGoAwayAndClose(bool graceful [[maybe_unused]] = false) override {}
348
  // Upgrade filter chains not supported.
349
  const Router::RouteEntry::UpgradeMap* upgradeMap() override { return nullptr; }
350

            
351
  // Unsupported functions.
352
  void recreateStream(StreamInfo::FilterStateSharedPtr) override {
353
    IS_ENVOY_BUG("recreateStream called from upstream HTTP filter");
354
  }
355
994790
  OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {*this}; }
356

            
357
  // Http::UpstreamStreamFilterCallbacks
358
367837
  StreamInfo::StreamInfo& upstreamStreamInfo() override { return upstream_request_.streamInfo(); }
359
312427
  OptRef<GenericUpstream> upstream() override {
360
312427
    return makeOptRefFromPtr(upstream_request_.upstream_.get());
361
312427
  }
362
1
  void dumpState(std::ostream& os, int indent_level = 0) const override {
363
1
    upstream_request_.dumpState(os, indent_level);
364
1
  }
365
86207
  bool pausedForConnect() const override { return upstream_request_.paused_for_connect_; }
366
348
  void setPausedForConnect(bool value) override { upstream_request_.paused_for_connect_ = value; }
367

            
368
85745
  bool pausedForWebsocketUpgrade() const override {
369
85745
    return upstream_request_.paused_for_websocket_;
370
85745
  }
371
56
  void setPausedForWebsocketUpgrade(bool value) override {
372
56
    upstream_request_.paused_for_websocket_ = value;
373
56
  }
374

            
375
  void disableRouteTimeoutForWebsocketUpgrade() override;
376
  void disablePerTryTimeoutForWebsocketUpgrade() override;
377

            
378
47061
  const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override {
379
47061
    return upstream_request_.upstreamStreamOptions();
380
47061
  }
381
47499
  void addUpstreamCallbacks(Http::UpstreamCallbacks& callbacks) override {
382
47499
    upstream_request_.upstream_callbacks_.push_back(&callbacks);
383
47499
  }
384
47497
  void setUpstreamToDownstream(UpstreamToDownstream& upstream_to_downstream_interface) override {
385
47497
    upstream_request_.upstream_interface_ = upstream_to_downstream_interface;
386
47497
  }
387
114959
  bool isHalfCloseEnabled() override { return upstream_request_.enable_half_close_; }
388
  Http::RequestTrailerMapPtr trailers_;
389
  Http::ResponseHeaderMapPtr informational_headers_;
390
  Http::ResponseHeaderMapPtr response_headers_;
391
  Http::ResponseTrailerMapPtr response_trailers_;
392
  UpstreamRequest& upstream_request_;
393
};
394

            
395
} // namespace Router
396
} // namespace Envoy