Line data Source code
1 : #pragma once
2 :
3 : #include <chrono>
4 : #include <cstdint>
5 : #include <functional>
6 : #include <list>
7 : #include <map>
8 : #include <memory>
9 : #include <string>
10 : #include <vector>
11 :
12 : #include "envoy/buffer/buffer.h"
13 : #include "envoy/common/random_generator.h"
14 : #include "envoy/common/scope_tracker.h"
15 : #include "envoy/config/core/v3/base.pb.h"
16 : #include "envoy/config/route/v3/route_components.pb.h"
17 : #include "envoy/config/typed_metadata.h"
18 : #include "envoy/event/dispatcher.h"
19 : #include "envoy/http/async_client.h"
20 : #include "envoy/http/codec.h"
21 : #include "envoy/http/context.h"
22 : #include "envoy/http/filter.h"
23 : #include "envoy/http/header_map.h"
24 : #include "envoy/http/message.h"
25 : #include "envoy/router/context.h"
26 : #include "envoy/router/router.h"
27 : #include "envoy/router/router_ratelimit.h"
28 : #include "envoy/router/shadow_writer.h"
29 : #include "envoy/server/filter_config.h"
30 : #include "envoy/ssl/connection.h"
31 : #include "envoy/tracing/tracer.h"
32 : #include "envoy/type/v3/percent.pb.h"
33 : #include "envoy/upstream/load_balancer.h"
34 : #include "envoy/upstream/upstream.h"
35 :
36 : #include "source/common/common/assert.h"
37 : #include "source/common/common/empty_string.h"
38 : #include "source/common/common/linked_object.h"
39 : #include "source/common/http/message_impl.h"
40 : #include "source/common/http/null_route_impl.h"
41 : #include "source/common/router/config_impl.h"
42 : #include "source/common/router/router.h"
43 : #include "source/common/stream_info/stream_info_impl.h"
44 : #include "source/common/tracing/http_tracer_impl.h"
45 : #include "source/common/upstream/retry_factory.h"
46 : #include "source/extensions/early_data/default_early_data_policy.h"
47 :
48 : namespace Envoy {
49 : namespace Http {
// Limit the size of buffer for data used for retries.
// This is currently fixed to 64KB.
//
// Declared directly at namespace scope instead of inside an unnamed namespace: this is a header,
// and a namespace-scope constexpr object already has internal linkage, so the unnamed namespace
// added nothing. The shift is performed on a uint64_t operand so the expression has the
// declared type from the start.
constexpr uint64_t kBufferLimitForRetry = uint64_t{1} << 16;
55 :
56 : class AsyncStreamImpl;
57 : class AsyncRequestSharedImpl;
58 :
59 : class AsyncClientImpl final : public AsyncClient {
60 : public:
61 : AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster, Stats::Store& stats_store,
62 : Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info,
63 : Upstream::ClusterManager& cm, Runtime::Loader& runtime,
64 : Random::RandomGenerator& random, Router::ShadowWriterPtr&& shadow_writer,
65 : Http::Context& http_context, Router::Context& router_context);
66 : ~AsyncClientImpl() override;
67 :
68 : // Http::AsyncClient
69 : Request* send(RequestMessagePtr&& request, Callbacks& callbacks,
70 : const AsyncClient::RequestOptions& options) override;
71 : Stream* start(StreamCallbacks& callbacks, const AsyncClient::StreamOptions& options) override;
72 : OngoingRequest* startRequest(RequestHeaderMapPtr&& request_headers, Callbacks& callbacks,
73 : const AsyncClient::RequestOptions& options) override;
74 : Singleton::Manager& singleton_manager_;
75 : Upstream::ClusterInfoConstSharedPtr cluster_;
76 136 : Event::Dispatcher& dispatcher() override { return dispatcher_; }
77 :
78 : private:
79 : template <typename T> T* internalStartRequest(T* async_request);
80 : Router::FilterConfig config_;
81 : Event::Dispatcher& dispatcher_;
82 : std::list<std::unique_ptr<AsyncStreamImpl>> active_streams_;
83 :
84 : friend class AsyncStreamImpl;
85 : friend class AsyncRequestSharedImpl;
86 : };
87 :
88 : /**
89 : * Implementation of AsyncRequest. This implementation is capable of sending HTTP requests to a
90 : * ConnectionPool asynchronously.
91 : */
92 : class AsyncStreamImpl : public virtual AsyncClient::Stream,
93 : public StreamDecoderFilterCallbacks,
94 : public Event::DeferredDeletable,
95 : Logger::Loggable<Logger::Id::http>,
96 : public LinkedObject<AsyncStreamImpl>,
97 : public ScopeTrackedObject {
98 : public:
99 : AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
100 : const AsyncClient::StreamOptions& options);
101 68 : ~AsyncStreamImpl() override {
102 68 : router_.onDestroy();
103 : // UpstreamRequest::cleanUp() is guaranteed to reset the high watermark calls.
104 68 : ENVOY_BUG(high_watermark_calls_ == 0, "Excess high watermark calls after async stream ended.");
105 68 : if (destructor_callback_.has_value()) {
106 0 : (*destructor_callback_)();
107 0 : }
108 68 : }
109 :
110 0 : void setDestructorCallback(AsyncClient::StreamDestructorCallbacks callback) override {
111 0 : ASSERT(!destructor_callback_);
112 0 : destructor_callback_.emplace(callback);
113 0 : }
114 :
115 0 : void removeDestructorCallback() override {
116 0 : ASSERT(destructor_callback_);
117 0 : destructor_callback_.reset();
118 0 : }
119 :
120 0 : void setWatermarkCallbacks(DecoderFilterWatermarkCallbacks& callbacks) override {
121 0 : ASSERT(!watermark_callbacks_);
122 0 : watermark_callbacks_.emplace(callbacks);
123 0 : for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
124 0 : watermark_callbacks_->get().onDecoderFilterAboveWriteBufferHighWatermark();
125 0 : }
126 0 : }
127 :
128 0 : void removeWatermarkCallbacks() override {
129 0 : ASSERT(watermark_callbacks_);
130 0 : for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
131 0 : watermark_callbacks_->get().onDecoderFilterBelowWriteBufferLowWatermark();
132 0 : }
133 0 : watermark_callbacks_.reset();
134 0 : }
135 :
136 : // Http::AsyncClient::Stream
137 : void sendHeaders(RequestHeaderMap& headers, bool end_stream) override;
138 : void sendData(Buffer::Instance& data, bool end_stream) override;
139 : void sendTrailers(RequestTrailerMap& trailers) override;
140 : void reset() override;
141 0 : bool isAboveWriteBufferHighWatermark() const override { return high_watermark_calls_ > 0; }
142 122 : const StreamInfo::StreamInfo& streamInfo() const override { return stream_info_; }
143 :
144 : protected:
145 0 : bool remoteClosed() { return remote_closed_; }
146 : void closeLocal(bool end_stream);
147 1277 : StreamInfo::StreamInfoImpl& streamInfo() override { return stream_info_; }
148 :
149 : AsyncClientImpl& parent_;
150 : // Callback to listen for stream destruction.
151 : absl::optional<AsyncClient::StreamDestructorCallbacks> destructor_callback_;
152 : // Callback to listen for low/high/overflow watermark events.
153 : absl::optional<std::reference_wrapper<DecoderFilterWatermarkCallbacks>> watermark_callbacks_;
154 :
155 : private:
156 : void cleanup();
157 : void closeRemote(bool end_stream);
158 1073 : bool complete() { return local_closed_ && remote_closed_; }
159 :
160 : // Http::StreamDecoderFilterCallbacks
161 204 : OptRef<const Network::Connection> connection() override { return {}; }
162 1007 : Event::Dispatcher& dispatcher() override { return parent_.dispatcher_; }
163 : void resetStream(Http::StreamResetReason reset_reason = Http::StreamResetReason::LocalReset,
164 : absl::string_view transport_failure_reason = "") override;
165 204 : Router::RouteConstSharedPtr route() override { return route_; }
166 0 : Upstream::ClusterInfoConstSharedPtr clusterInfo() override { return parent_.cluster_; }
167 68 : uint64_t streamId() const override { return stream_id_; }
168 : // TODO(kbaichoo): Plumb account from owning request filter.
169 136 : Buffer::BufferMemoryAccountSharedPtr account() const override { return account_; }
170 68 : Tracing::Span& activeSpan() override { return active_span_; }
171 68 : OptRef<const Tracing::Config> tracingConfig() const override {
172 68 : return makeOptRef<const Tracing::Config>(tracing_config_);
173 68 : }
174 0 : void continueDecoding() override {}
175 0 : RequestTrailerMap& addDecodedTrailers() override { PANIC("not implemented"); }
176 0 : void addDecodedData(Buffer::Instance&, bool) override {
177 : // This should only be called if the user has set up buffering. The request is already fully
178 : // buffered. Note that this is only called via the async client's internal use of the router
179 : // filter which uses this function for buffering.
180 0 : ASSERT(buffered_body_ != nullptr);
181 0 : }
182 0 : MetadataMapVector& addDecodedMetadata() override { PANIC("not implemented"); }
183 0 : void injectDecodedDataToFilterChain(Buffer::Instance&, bool) override {}
184 0 : const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); }
185 0 : void modifyDecodingBuffer(std::function<void(Buffer::Instance&)>) override {}
186 : void sendLocalReply(Code code, absl::string_view body,
187 : std::function<void(ResponseHeaderMap& headers)> modify_headers,
188 : const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
189 28 : absl::string_view details) override {
190 28 : if (encoded_response_headers_) {
191 28 : resetStream();
192 28 : return;
193 28 : }
194 0 : Utility::sendLocalReply(
195 0 : remote_closed_,
196 0 : Utility::EncodeFunctions{nullptr, nullptr,
197 0 : [this, modify_headers, &details](ResponseHeaderMapPtr&& headers,
198 0 : bool end_stream) -> void {
199 0 : if (modify_headers != nullptr) {
200 0 : modify_headers(*headers);
201 0 : }
202 0 : encodeHeaders(std::move(headers), end_stream, details);
203 0 : },
204 0 : [this](Buffer::Instance& data, bool end_stream) -> void {
205 0 : encodeData(data, end_stream);
206 0 : }},
207 0 : Utility::LocalReplyData{is_grpc_request_, code, body, grpc_status, is_head_request_});
208 0 : }
209 : // The async client won't pause if sending 1xx headers so simply swallow any.
210 0 : void encode1xxHeaders(ResponseHeaderMapPtr&&) override {}
211 : void encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream,
212 : absl::string_view details) override;
213 : void encodeData(Buffer::Instance& data, bool end_stream) override;
214 : void encodeTrailers(ResponseTrailerMapPtr&& trailers) override;
215 0 : void encodeMetadata(MetadataMapPtr&&) override {}
216 0 : void onDecoderFilterAboveWriteBufferHighWatermark() override {
217 0 : ++high_watermark_calls_;
218 0 : if (watermark_callbacks_.has_value()) {
219 0 : watermark_callbacks_->get().onDecoderFilterAboveWriteBufferHighWatermark();
220 0 : }
221 0 : }
222 0 : void onDecoderFilterBelowWriteBufferLowWatermark() override {
223 0 : ASSERT(high_watermark_calls_ != 0);
224 0 : --high_watermark_calls_;
225 0 : if (watermark_callbacks_.has_value()) {
226 0 : watermark_callbacks_->get().onDecoderFilterBelowWriteBufferLowWatermark();
227 0 : }
228 0 : }
229 68 : void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
230 68 : void removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
231 0 : void setDecoderBufferLimit(uint32_t) override {
232 0 : IS_ENVOY_BUG("decoder buffer limits should not be overridden on async streams.");
233 0 : }
234 136 : uint32_t decoderBufferLimit() override { return buffer_limit_.value_or(0); }
235 0 : bool recreateStream(const ResponseHeaderMap*) override { return false; }
236 423 : const ScopeTrackedObject& scope() override { return *this; }
237 0 : void restoreContextOnContinue(ScopeTrackedObjectStack& tracked_object_stack) override {
238 0 : tracked_object_stack.add(*this);
239 0 : }
240 0 : void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr&) override {}
241 68 : Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override { return {}; }
242 0 : const Router::RouteSpecificFilterConfig* mostSpecificPerFilterConfig() const override {
243 0 : return nullptr;
244 0 : }
245 : void traversePerFilterConfig(
246 0 : std::function<void(const Router::RouteSpecificFilterConfig&)>) const override {}
247 0 : Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override { return {}; }
248 0 : OptRef<DownstreamStreamFilterCallbacks> downstreamCallbacks() override { return {}; }
249 0 : OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {}; }
250 0 : void resetIdleTimer() override {}
251 0 : void setUpstreamOverrideHost(Upstream::LoadBalancerContext::OverrideHost) override {}
252 : absl::optional<Upstream::LoadBalancerContext::OverrideHost>
253 136 : upstreamOverrideHost() const override {
254 136 : return absl::nullopt;
255 136 : }
256 0 : absl::string_view filterConfigName() const override { return ""; }
257 0 : RequestHeaderMapOptRef requestHeaders() override { return makeOptRefFromPtr(request_headers_); }
258 0 : RequestTrailerMapOptRef requestTrailers() override {
259 0 : return makeOptRefFromPtr(request_trailers_);
260 0 : }
261 0 : ResponseHeaderMapOptRef informationalHeaders() override { return {}; }
262 0 : ResponseHeaderMapOptRef responseHeaders() override { return {}; }
263 0 : ResponseTrailerMapOptRef responseTrailers() override { return {}; }
264 :
265 : // ScopeTrackedObject
266 0 : void dumpState(std::ostream& os, int indent_level) const override {
267 0 : const char* spaces = spacesForLevel(indent_level);
268 0 : os << spaces << "AsyncClient " << this << DUMP_MEMBER(stream_id_) << "\n";
269 0 : DUMP_DETAILS(&stream_info_);
270 0 : }
271 :
272 : AsyncClient::StreamCallbacks& stream_callbacks_;
273 : const uint64_t stream_id_;
274 : Router::ProdFilter router_;
275 : StreamInfo::StreamInfoImpl stream_info_;
276 : Tracing::NullSpan active_span_;
277 : const Tracing::Config& tracing_config_;
278 : std::shared_ptr<NullRouteImpl> route_;
279 : uint32_t high_watermark_calls_{};
280 : bool local_closed_{};
281 : bool remote_closed_{};
282 : Buffer::InstancePtr buffered_body_;
283 : Buffer::BufferMemoryAccountSharedPtr account_{nullptr};
284 : absl::optional<uint32_t> buffer_limit_{absl::nullopt};
285 : RequestHeaderMap* request_headers_{};
286 : RequestTrailerMap* request_trailers_{};
287 : bool encoded_response_headers_{};
288 : bool is_grpc_request_{};
289 : bool is_head_request_{false};
290 : bool send_xff_{true};
291 :
292 : friend class AsyncClientImpl;
293 : friend class AsyncClientImplUnitTest;
294 : };
295 :
296 : class AsyncRequestSharedImpl : public virtual AsyncClient::Request,
297 : protected AsyncStreamImpl,
298 : protected AsyncClient::StreamCallbacks {
299 : public:
300 : void cancel() final;
301 :
302 : protected:
303 : AsyncRequestSharedImpl(AsyncClientImpl& parent, AsyncClient::Callbacks& callbacks,
304 : const AsyncClient::RequestOptions& options);
305 : void onHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) final;
306 : void onData(Buffer::Instance& data, bool end_stream) final;
307 : void onTrailers(ResponseTrailerMapPtr&& trailers) final;
308 : void onComplete() final;
309 : void onReset() final;
310 :
311 : AsyncClient::Callbacks& callbacks_;
312 : Tracing::SpanPtr child_span_;
313 : std::unique_ptr<ResponseMessageImpl> response_;
314 : bool cancelled_{};
315 : };
316 :
317 : class AsyncOngoingRequestImpl final : public AsyncClient::OngoingRequest,
318 : public AsyncRequestSharedImpl {
319 : public:
320 : AsyncOngoingRequestImpl(RequestHeaderMapPtr&& request_headers, AsyncClientImpl& parent,
321 : AsyncClient::Callbacks& callbacks,
322 : const AsyncClient::RequestOptions& options)
323 : : AsyncRequestSharedImpl(parent, callbacks, options),
324 0 : request_headers_(std::move(request_headers)) {
325 0 : ASSERT(request_headers_);
326 0 : }
327 0 : void captureAndSendTrailers(RequestTrailerMapPtr&& trailers) override {
328 0 : request_trailers_ = std::move(trailers);
329 0 : sendTrailers(*request_trailers_);
330 0 : }
331 :
332 : private:
333 : void initialize();
334 :
335 : RequestHeaderMapPtr request_headers_;
336 : RequestTrailerMapPtr request_trailers_;
337 :
338 : friend class AsyncClientImpl;
339 : };
340 :
341 : class AsyncRequestImpl final : public AsyncRequestSharedImpl {
342 : public:
343 : AsyncRequestImpl(RequestMessagePtr&& request, AsyncClientImpl& parent,
344 : AsyncClient::Callbacks& callbacks, const AsyncClient::RequestOptions& options)
345 0 : : AsyncRequestSharedImpl(parent, callbacks, options), request_(std::move(request)) {}
346 :
347 : private:
348 : void initialize();
349 :
350 : // Http::StreamDecoderFilterCallbacks
351 0 : void addDecodedData(Buffer::Instance&, bool) override {
352 0 : // The request is already fully buffered. Note that this is only called via the async client's
353 0 : // internal use of the router filter which uses this function for buffering.
354 0 : }
355 0 : const Buffer::Instance* decodingBuffer() override { return &request_->body(); }
356 0 : void modifyDecodingBuffer(std::function<void(Buffer::Instance&)>) override {}
357 :
358 : RequestMessagePtr request_;
359 :
360 : friend class AsyncClientImpl;
361 : };
362 :
363 : } // namespace Http
364 : } // namespace Envoy
|