1
#pragma once
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <list>
7
#include <map>
8
#include <memory>
9
#include <string>
10
#include <vector>
11

            
12
#include "envoy/buffer/buffer.h"
13
#include "envoy/common/random_generator.h"
14
#include "envoy/common/scope_tracker.h"
15
#include "envoy/config/core/v3/base.pb.h"
16
#include "envoy/config/route/v3/route_components.pb.h"
17
#include "envoy/config/typed_metadata.h"
18
#include "envoy/event/dispatcher.h"
19
#include "envoy/http/async_client.h"
20
#include "envoy/http/codec.h"
21
#include "envoy/http/context.h"
22
#include "envoy/http/filter.h"
23
#include "envoy/http/header_map.h"
24
#include "envoy/http/message.h"
25
#include "envoy/router/context.h"
26
#include "envoy/router/router.h"
27
#include "envoy/router/router_ratelimit.h"
28
#include "envoy/router/shadow_writer.h"
29
#include "envoy/server/filter_config.h"
30
#include "envoy/ssl/connection.h"
31
#include "envoy/tracing/tracer.h"
32
#include "envoy/type/v3/percent.pb.h"
33
#include "envoy/upstream/load_balancer.h"
34
#include "envoy/upstream/upstream.h"
35

            
36
#include "source/common/common/assert.h"
37
#include "source/common/common/empty_string.h"
38
#include "source/common/common/linked_object.h"
39
#include "source/common/http/message_impl.h"
40
#include "source/common/http/null_route_impl.h"
41
#include "source/common/local_reply/local_reply.h"
42
#include "source/common/router/config_impl.h"
43
#include "source/common/router/router.h"
44
#include "source/common/stream_info/stream_info_impl.h"
45
#include "source/common/tracing/http_tracer_impl.h"
46
#include "source/common/upstream/retry_factory.h"
47
#include "source/extensions/early_data/default_early_data_policy.h"
48

            
49
namespace Envoy {
50
namespace Http {
51
namespace {
// Default limit, in bytes, on the size of the buffer holding request data for retries.
// This is currently fixed to 64KB (1 << 16 == 65536).
constexpr uint64_t kDefaultDecoderBufferLimit = 1 << 16;
// Response buffer limit, in bytes: 32MB.
constexpr uint64_t kBufferLimitForResponse = 32 * 1024 * 1024;
} // namespace
58

            
59
// Forward declarations: AsyncClientImpl names both types (as a list element type and as friends)
// before their full definitions appear further down in this header.
class AsyncStreamImpl;
class AsyncRequestSharedImpl;
61

            
62
class AsyncClientImpl final : public AsyncClient {
63
public:
64
  AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster, Stats::Store& stats_store,
65
                  Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm,
66
                  Server::Configuration::CommonFactoryContext& factory_context,
67
                  Router::ShadowWriterPtr&& shadow_writer, Http::Context& http_context,
68
                  Router::Context& router_context);
69
  ~AsyncClientImpl() override;
70

            
71
  // Http::AsyncClient
72
  Request* send(RequestMessagePtr&& request, Callbacks& callbacks,
73
                const AsyncClient::RequestOptions& options) override;
74
  Stream* start(StreamCallbacks& callbacks, const AsyncClient::StreamOptions& options) override;
75
  OngoingRequest* startRequest(RequestHeaderMapPtr&& request_headers, Callbacks& callbacks,
76
                               const AsyncClient::RequestOptions& options) override;
77
  Server::Configuration::CommonFactoryContext& factory_context_;
78
  Upstream::ClusterInfoConstSharedPtr cluster_;
79
5701
  Event::Dispatcher& dispatcher() override { return dispatcher_; }
80
  static const absl::string_view ResponseBufferLimit;
81

            
82
private:
83
  template <typename T> T* internalStartRequest(T* async_request);
84
  const Router::FilterConfigSharedPtr config_;
85
  Event::Dispatcher& dispatcher_;
86
  std::list<std::unique_ptr<AsyncStreamImpl>> active_streams_;
87
  const LocalReply::LocalReplyPtr local_reply_;
88

            
89
  friend class AsyncStreamImpl;
90
  friend class AsyncRequestSharedImpl;
91
};
92

            
93
/**
94
 * Implementation of AsyncRequest. This implementation is capable of sending HTTP requests to a
95
 * ConnectionPool asynchronously.
96
 */
97
class AsyncStreamImpl : public virtual AsyncClient::Stream,
98
                        public StreamDecoderFilterCallbacks,
99
                        public Event::DeferredDeletable,
100
                        public Logger::Loggable<Logger::Id::http>,
101
                        public LinkedObject<AsyncStreamImpl>,
102
                        public ScopeTrackedObject {
103
public:
104
  static absl::StatusOr<std::unique_ptr<AsyncStreamImpl>>
105
  create(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
106
2691
         const AsyncClient::StreamOptions& options) {
107
2691
    absl::Status creation_status = absl::OkStatus();
108
2691
    std::unique_ptr<AsyncStreamImpl> stream = std::unique_ptr<AsyncStreamImpl>(
109
2691
        new AsyncStreamImpl(parent, callbacks, options, creation_status));
110
2691
    RETURN_IF_NOT_OK(creation_status);
111
2690
    return stream;
112
2691
  }
113

            
114
3096
  ~AsyncStreamImpl() override {
115
3096
    routerDestroy();
116
    // UpstreamRequest::cleanUp() is guaranteed to reset the high watermark calls.
117
3096
    ENVOY_BUG(high_watermark_calls_ == 0, "Excess high watermark calls after async stream ended.");
118
3096
    if (destructor_callback_.has_value()) {
119
3
      (*destructor_callback_)();
120
3
    }
121
3096
  }
122

            
123
35
  void setDestructorCallback(AsyncClient::StreamDestructorCallbacks callback) override {
124
35
    ASSERT(!destructor_callback_);
125
35
    destructor_callback_.emplace(callback);
126
35
  }
127

            
128
32
  void removeDestructorCallback() override {
129
32
    ASSERT(destructor_callback_);
130
32
    destructor_callback_.reset();
131
32
  }
132

            
133
698
  void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) override {
134
698
    ENVOY_BUG(!watermark_callbacks_, "Watermark callbacks should not already be registered!");
135
698
    watermark_callbacks_.emplace(callbacks);
136
701
    for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
137
3
      watermark_callbacks_->get().onSidestreamAboveHighWatermark();
138
3
    }
139
698
  }
140

            
141
692
  void removeWatermarkCallbacks() override {
142
692
    ENVOY_BUG(watermark_callbacks_, "Watermark callbacks should already be registered!");
143
695
    for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
144
3
      watermark_callbacks_->get().onSidestreamBelowLowWatermark();
145
3
    }
146
692
    watermark_callbacks_.reset();
147
692
  }
148

            
149
  // Http::AsyncClient::Stream
150
  void sendHeaders(RequestHeaderMap& headers, bool end_stream) override;
151
  void sendData(Buffer::Instance& data, bool end_stream) override;
152
  void sendTrailers(RequestTrailerMap& trailers) override;
153
  void reset() override;
154
21
  bool isAboveWriteBufferHighWatermark() const override { return high_watermark_calls_ > 0; }
155
3760
  const StreamInfo::StreamInfo& streamInfo() const override { return stream_info_; }
156
84825
  StreamInfo::StreamInfoImpl& streamInfo() override { return stream_info_; }
157

            
158
protected:
159
  AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
160
                  const AsyncClient::StreamOptions& options, absl::Status& creation_status);
161

            
162
38
  bool remoteClosed() { return remote_closed_; }
163
  void closeLocal(bool end_stream);
164

            
165
  AsyncClientImpl& parent_;
166
  // Callback to listen for stream destruction.
167
  absl::optional<AsyncClient::StreamDestructorCallbacks> destructor_callback_;
168
  // Callback to listen for low/high/overflow watermark events.
169
  absl::optional<std::reference_wrapper<SidestreamWatermarkCallbacks>> watermark_callbacks_;
170
  bool complete_{};
171
  const bool discard_response_body_;
172
  const bool new_async_client_retry_logic_{};
173
  absl::optional<uint64_t> buffer_limit_{absl::nullopt};
174

            
175
private:
176
  void cleanup();
177
  void closeRemote(bool end_stream);
178
284201
  bool complete() { return local_closed_ && remote_closed_; }
179
  void routerDestroy();
180

            
181
  // Http::StreamDecoderFilterCallbacks
182
12381
  OptRef<const Network::Connection> connection() override { return {}; }
183
145909
  Event::Dispatcher& dispatcher() override { return parent_.dispatcher_; }
184
  void resetStream(Http::StreamResetReason reset_reason = Http::StreamResetReason::LocalReset,
185
                   absl::string_view transport_failure_reason = "") override;
186
9120
  Router::RouteConstSharedPtr route() override { return route_; }
187
1
  Upstream::ClusterInfoConstSharedPtr clusterInfo() override { return parent_.cluster_; }
188
3157
  uint64_t streamId() const override { return stream_id_; }
189
  // TODO(kbaichoo): Plumb account from owning request filter.
190
6036
  Buffer::BufferMemoryAccountSharedPtr account() const override { return account_; }
191
8859
  Tracing::Span& activeSpan() override { return active_span_; }
192
3122
  OptRef<const Tracing::Config> tracingConfig() const override {
193
3122
    return makeOptRef<const Tracing::Config>(tracing_config_);
194
3122
  }
195
301
  void continueDecoding() override {}
196
  RequestTrailerMap& addDecodedTrailers() override { PANIC("not implemented"); }
197
12
  void addDecodedData(Buffer::Instance& data, bool) override {
198
12
    if (!new_async_client_retry_logic_) {
199
      // This should only be called if the user has set up buffering. The request is already fully
200
      // buffered. Note that this is only called via the async client's internal use of the router
201
      // filter which uses this function for buffering.
202
2
      ASSERT(buffered_body_ != nullptr);
203
2
      return;
204
2
    }
205

            
206
    // This will only be used by internal router filter for buffering for retries.
207

            
208
    // If the buffer limit is reached, the router filter will ignore the retry and the following
209
    // data will not be buffered. So, we don't need to check the buffer limit here because the
210
    // router filter already did that.
211
10
    if (buffered_body_ == nullptr) {
212
2
      buffered_body_ = std::make_unique<Buffer::OwnedImpl>();
213
2
    }
214
10
    buffered_body_->move(data);
215
10
  }
216
  MetadataMapVector& addDecodedMetadata() override { PANIC("not implemented"); }
217
1
  void injectDecodedDataToFilterChain(Buffer::Instance&, bool) override {}
218
70393
  const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); }
219
1
  void modifyDecodingBuffer(std::function<void(Buffer::Instance&)>) override {}
220
  void sendLocalReply(Code code, absl::string_view body,
221
                      std::function<void(ResponseHeaderMap& headers)> modify_headers,
222
                      const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
223
                      absl::string_view details) override;
224
  // The async client won't pause if sending 1xx headers so simply swallow any.
225
2
  void encode1xxHeaders(ResponseHeaderMapPtr&&) override {}
226
  void encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream,
227
                     absl::string_view details) override;
228
  void encodeData(Buffer::Instance& data, bool end_stream) override;
229
  void encodeTrailers(ResponseTrailerMapPtr&& trailers) override;
230
1
  void encodeMetadata(MetadataMapPtr&&) override {}
231
62048
  void onDecoderFilterAboveWriteBufferHighWatermark() override {
232
62048
    ++high_watermark_calls_;
233
62048
    if (watermark_callbacks_.has_value()) {
234
62043
      watermark_callbacks_->get().onSidestreamAboveHighWatermark();
235
62043
    }
236
62048
  }
237
62048
  void onDecoderFilterBelowWriteBufferLowWatermark() override {
238
62048
    ASSERT(high_watermark_calls_ != 0);
239
62048
    --high_watermark_calls_;
240
62048
    if (watermark_callbacks_.has_value()) {
241
62043
      watermark_callbacks_->get().onSidestreamBelowLowWatermark();
242
62043
    }
243
62048
  }
244
2914
  void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
245
2914
  void removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
246
  void sendGoAwayAndClose(bool graceful [[maybe_unused]] = false) override {}
247

            
248
  void setBufferLimit(uint64_t) override {
249
    IS_ENVOY_BUG("decoder buffer limits should not be overridden on async streams.");
250
  }
251
78645
  uint64_t bufferLimit() override {
252
78645
    if (new_async_client_retry_logic_) {
253
78637
      return buffer_limit_.value_or(kDefaultDecoderBufferLimit);
254
78637
    } else {
255
8
      return buffer_limit_.value_or(0);
256
8
    }
257
78645
  }
258
1
  bool recreateStream(const ResponseHeaderMap*) override { return false; }
259
111432
  const ScopeTrackedObject& scope() override { return *this; }
260
  void restoreContextOnContinue(ScopeTrackedObjectStack& tracked_object_stack) override {
261
    tracked_object_stack.add(*this);
262
  }
263
1
  void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr&) override {}
264
3012
  Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override { return {}; }
265
1
  const Router::RouteSpecificFilterConfig* mostSpecificPerFilterConfig() const override {
266
1
    return nullptr;
267
1
  }
268
1
  Router::RouteSpecificFilterConfigs perFilterConfigs() const override { return {}; }
269
1
  Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override { return {}; }
270
1
  OptRef<DownstreamStreamFilterCallbacks> downstreamCallbacks() override { return {}; }
271
1
  OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {}; }
272
1
  void resetIdleTimer() override {}
273
1
  void setUpstreamOverrideHost(Upstream::LoadBalancerContext::OverrideHost) override {}
274
  absl::optional<Upstream::LoadBalancerContext::OverrideHost>
275
2983
  upstreamOverrideHost() const override {
276
2983
    return upstream_override_host_;
277
2983
  }
278
3080
  bool shouldLoadShed() const override { return false; }
279
1
  absl::string_view filterConfigName() const override { return ""; }
280
  RequestHeaderMapOptRef requestHeaders() override { return makeOptRefFromPtr(request_headers_); }
281
  RequestTrailerMapOptRef requestTrailers() override {
282
    return makeOptRefFromPtr(request_trailers_);
283
  }
284
1
  ResponseHeaderMapOptRef informationalHeaders() override { return {}; }
285
1
  ResponseHeaderMapOptRef responseHeaders() override { return {}; }
286
1
  ResponseTrailerMapOptRef responseTrailers() override { return {}; }
287

            
288
  // ScopeTrackedObject
289
1
  void dumpState(std::ostream& os, int indent_level) const override {
290
1
    const char* spaces = spacesForLevel(indent_level);
291
1
    os << spaces << "AsyncClient " << this << DUMP_MEMBER(stream_id_) << "\n";
292
1
    DUMP_DETAILS(&stream_info_);
293
1
  }
294

            
295
  AsyncClient::StreamCallbacks& stream_callbacks_;
296
  const uint64_t stream_id_;
297
  Router::ProdFilter router_;
298
  StreamInfo::StreamInfoImpl stream_info_;
299
  Tracing::NullSpan active_span_;
300
  const Tracing::Config& tracing_config_;
301
  const LocalReply::LocalReply& local_reply_;
302
  Router::RouteConstSharedPtr parent_route_;
303
  std::shared_ptr<NullRouteImpl> route_;
304
  uint32_t high_watermark_calls_{};
305
  bool local_closed_{};
306
  bool remote_closed_{};
307
  Buffer::InstancePtr buffered_body_;
308
  Buffer::BufferMemoryAccountSharedPtr account_{nullptr};
309
  RequestHeaderMap* request_headers_{};
310
  RequestTrailerMap* request_trailers_{};
311
  bool encoded_response_headers_{};
312
  bool is_grpc_request_{};
313
  bool is_head_request_{false};
314
  bool send_xff_{true};
315
  bool send_internal_{true};
316
  bool router_destroyed_{false};
317

            
318
  // Upstream override host for bypassing load balancer selection
319
  absl::optional<Upstream::LoadBalancerContext::OverrideHost> upstream_override_host_;
320

            
321
  friend class AsyncClientImpl;
322
  friend class AsyncClientImplUnitTest;
323
};
324

            
325
class AsyncRequestSharedImpl : public virtual AsyncClient::Request,
326
                               protected AsyncStreamImpl,
327
                               protected AsyncClient::StreamCallbacks {
328
public:
329
  void cancel() final;
330

            
331
protected:
332
  AsyncRequestSharedImpl(AsyncClientImpl& parent, AsyncClient::Callbacks& callbacks,
333
                         const AsyncClient::RequestOptions& options, absl::Status& creation_status);
334
  void onHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) final;
335
  void onData(Buffer::Instance& data, bool end_stream) final;
336
  void onTrailers(ResponseTrailerMapPtr&& trailers) final;
337
  void onComplete() final;
338
  void onReset() final;
339

            
340
  AsyncClient::Callbacks& callbacks_;
341
  Tracing::SpanPtr child_span_;
342
  std::unique_ptr<ResponseMessageImpl> response_;
343
  bool cancelled_{};
344
  bool response_buffer_overlimit_{};
345
  const uint64_t response_buffer_limit_;
346
};
347

            
348
class AsyncOngoingRequestImpl final : public AsyncClient::OngoingRequest,
349
                                      public AsyncRequestSharedImpl {
350
public:
351
  static AsyncOngoingRequestImpl* create(RequestHeaderMapPtr&& request_headers,
352
                                         AsyncClientImpl& parent, AsyncClient::Callbacks& callbacks,
353
83
                                         const AsyncClient::RequestOptions& options) {
354
83
    absl::Status creation_status = absl::OkStatus();
355
83
    auto* ret = new AsyncOngoingRequestImpl(std::move(request_headers), parent, callbacks, options,
356
83
                                            creation_status);
357
83
    if (!creation_status.ok()) {
358
      delete ret;
359
      return nullptr;
360
    }
361
83
    return ret;
362
83
  }
363
1
  void captureAndSendTrailers(RequestTrailerMapPtr&& trailers) override {
364
1
    request_trailers_ = std::move(trailers);
365
1
    sendTrailers(*request_trailers_);
366
1
  }
367

            
368
private:
369
  AsyncOngoingRequestImpl(RequestHeaderMapPtr&& request_headers, AsyncClientImpl& parent,
370
                          AsyncClient::Callbacks& callbacks,
371
                          const AsyncClient::RequestOptions& options, absl::Status& creation_status)
372
83
      : AsyncRequestSharedImpl(parent, callbacks, options, creation_status),
373
83
        request_headers_(std::move(request_headers)) {
374
83
    ASSERT(request_headers_);
375
83
  }
376
  void initialize();
377

            
378
  RequestHeaderMapPtr request_headers_;
379
  RequestTrailerMapPtr request_trailers_;
380

            
381
  friend class AsyncClientImpl;
382
};
383

            
384
class AsyncRequestImpl final : public AsyncRequestSharedImpl {
385
public:
386
  static AsyncRequestImpl* create(RequestMessagePtr&& request, AsyncClientImpl& parent,
387
                                  AsyncClient::Callbacks& callbacks,
388
322
                                  const AsyncClient::RequestOptions& options) {
389
322
    absl::Status creation_status = absl::OkStatus();
390
322
    auto* ret =
391
322
        new AsyncRequestImpl(std::move(request), parent, callbacks, options, creation_status);
392
322
    if (!creation_status.ok()) {
393
      delete ret;
394
      return nullptr;
395
    }
396
322
    return ret;
397
322
  }
398

            
399
private:
400
  AsyncRequestImpl(RequestMessagePtr&& request, AsyncClientImpl& parent,
401
                   AsyncClient::Callbacks& callbacks, const AsyncClient::RequestOptions& options,
402
                   absl::Status& creation_status)
403
322
      : AsyncRequestSharedImpl(parent, callbacks, options, creation_status),
404
322
        request_(std::move(request)) {}
405

            
406
  void initialize();
407

            
408
  // Http::StreamDecoderFilterCallbacks
409
19
  void addDecodedData(Buffer::Instance&, bool) override {
410
    // This will only be used by internal router filter for buffering for retries.
411
    // But for AsyncRequest that all data is already buffered in request message
412
    // and do not need to buffer again.
413
19
  }
414
214
  const Buffer::Instance* decodingBuffer() override { return &request_->body(); }
415
800
  uint64_t bufferLimit() override {
416
800
    if (new_async_client_retry_logic_) {
417
      // 0 means no limit because the whole body is already buffered in request message.
418
800
      return 0;
419
800
    } else {
420
      return buffer_limit_.value_or(0);
421
    }
422
800
  }
423

            
424
  RequestMessagePtr request_;
425

            
426
  friend class AsyncClientImpl;
427
};
428

            
429
} // namespace Http
430
} // namespace Envoy