Line data Source code
1 : #include "source/common/http/async_client_impl.h"
2 :
3 : #include <chrono>
4 : #include <map>
5 : #include <memory>
6 : #include <string>
7 : #include <vector>
8 :
9 : #include "envoy/config/core/v3/base.pb.h"
10 :
11 : #include "source/common/grpc/common.h"
12 : #include "source/common/http/utility.h"
13 : #include "source/common/tracing/http_tracer_impl.h"
14 :
15 : namespace Envoy {
16 : namespace Http {
17 : AsyncClientImpl::AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster,
18 : Stats::Store& stats_store, Event::Dispatcher& dispatcher,
19 : const LocalInfo::LocalInfo& local_info,
20 : Upstream::ClusterManager& cm, Runtime::Loader& runtime,
21 : Random::RandomGenerator& random,
22 : Router::ShadowWriterPtr&& shadow_writer,
23 : Http::Context& http_context, Router::Context& router_context)
24 : : singleton_manager_(cm.clusterManagerFactory().singletonManager()), cluster_(cluster),
25 : config_(http_context.asyncClientStatPrefix(), local_info, *stats_store.rootScope(), cm,
26 : runtime, random, std::move(shadow_writer), true, false, false, false, false, false,
27 : {}, dispatcher.timeSource(), http_context, router_context),
28 33 : dispatcher_(dispatcher) {}
29 :
30 33 : AsyncClientImpl::~AsyncClientImpl() {
31 33 : while (!active_streams_.empty()) {
32 0 : active_streams_.front()->reset();
33 0 : }
34 33 : }
35 :
36 0 : template <typename T> T* AsyncClientImpl::internalStartRequest(T* async_request) {
37 0 : async_request->initialize();
38 0 : std::unique_ptr<AsyncStreamImpl> new_request{async_request};
39 :
40 : // The request may get immediately failed. If so, we will return nullptr.
41 0 : if (!new_request->remote_closed_) {
42 0 : LinkedList::moveIntoList(std::move(new_request), active_streams_);
43 0 : return async_request;
44 0 : } else {
45 0 : new_request->cleanup();
46 0 : return nullptr;
47 0 : }
48 0 : }
49 :
50 : template AsyncRequestImpl*
51 : AsyncClientImpl::internalStartRequest<AsyncRequestImpl>(AsyncRequestImpl*);
52 : template AsyncOngoingRequestImpl*
53 : AsyncClientImpl::internalStartRequest<AsyncOngoingRequestImpl>(AsyncOngoingRequestImpl*);
54 :
55 : AsyncClient::Request* AsyncClientImpl::send(RequestMessagePtr&& request,
56 : AsyncClient::Callbacks& callbacks,
57 0 : const AsyncClient::RequestOptions& options) {
58 0 : AsyncRequestImpl* async_request =
59 0 : new AsyncRequestImpl(std::move(request), *this, callbacks, options);
60 0 : return internalStartRequest(async_request);
61 0 : }
62 :
63 : AsyncClient::OngoingRequest*
64 : AsyncClientImpl::startRequest(RequestHeaderMapPtr&& request_headers, Callbacks& callbacks,
65 0 : const AsyncClient::RequestOptions& options) {
66 0 : AsyncOngoingRequestImpl* async_request =
67 0 : new AsyncOngoingRequestImpl(std::move(request_headers), *this, callbacks, options);
68 0 : return internalStartRequest(async_request);
69 0 : }
70 :
71 : AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks,
72 68 : const AsyncClient::StreamOptions& options) {
73 68 : std::unique_ptr<AsyncStreamImpl> new_stream{new AsyncStreamImpl(*this, callbacks, options)};
74 68 : LinkedList::moveIntoList(std::move(new_stream), active_streams_);
75 68 : return active_streams_.front().get();
76 68 : }
77 :
78 : AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
79 : const AsyncClient::StreamOptions& options)
80 : : parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()),
81 : router_(options.filter_config_ ? *options.filter_config_ : parent.config_,
82 : parent.config_.async_stats_),
83 : stream_info_(Protocol::Http11, parent.dispatcher().timeSource(), nullptr),
84 : tracing_config_(Tracing::EgressConfig::get()),
85 : route_(std::make_shared<NullRouteImpl>(parent_.cluster_->name(), parent_.singleton_manager_,
86 : options.timeout, options.hash_policy,
87 : options.retry_policy)),
88 : account_(options.account_), buffer_limit_(options.buffer_limit_),
89 68 : send_xff_(options.send_xff) {
90 68 : stream_info_.dynamicMetadata().MergeFrom(options.metadata);
91 68 : stream_info_.setIsShadow(options.is_shadow);
92 68 : stream_info_.setUpstreamClusterInfo(parent_.cluster_);
93 68 : stream_info_.route_ = route_;
94 :
95 68 : if (options.buffer_body_for_retry) {
96 0 : buffered_body_ = std::make_unique<Buffer::OwnedImpl>(account_);
97 0 : }
98 :
99 68 : router_.setDecoderFilterCallbacks(*this);
100 : // TODO(mattklein123): Correctly set protocol in stream info when we support access logging.
101 68 : }
102 :
103 : void AsyncStreamImpl::encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream,
104 68 : absl::string_view) {
105 68 : ENVOY_LOG(debug, "async http request response headers (end_stream={}):\n{}", end_stream,
106 68 : *headers);
107 68 : ASSERT(!remote_closed_);
108 68 : encoded_response_headers_ = true;
109 68 : stream_callbacks_.onHeaders(std::move(headers), end_stream);
110 68 : closeRemote(end_stream);
111 : // At present, the router cleans up stream state as soon as the remote is closed, making a
112 : // half-open local stream unsupported and dangerous. Ensure we close locally to trigger completion
113 : // and keep things consistent. Another option would be to issue a stream reset here if local isn't
114 : // yet closed, triggering cleanup along a more standardized path. However, this would require
115 : // additional logic to handle the response completion and subsequent reset, and run the risk of
116 : // being interpreted as a failure, when in fact no error has necessarily occurred. Gracefully
117 : // closing seems most in-line with behavior elsewhere in Envoy for now.
118 68 : closeLocal(end_stream);
119 68 : }
120 :
121 253 : void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) {
122 253 : ENVOY_LOG(trace, "async http request response data (length={} end_stream={})", data.length(),
123 253 : end_stream);
124 253 : ASSERT(!remote_closed_);
125 253 : stream_callbacks_.onData(data, end_stream);
126 253 : closeRemote(end_stream);
127 : // Ensure we close locally on receiving a complete response; see comment in encodeHeaders for
128 : // rationale.
129 253 : closeLocal(end_stream);
130 253 : }
131 :
132 6 : void AsyncStreamImpl::encodeTrailers(ResponseTrailerMapPtr&& trailers) {
133 6 : ENVOY_LOG(debug, "async http request response trailers:\n{}", *trailers);
134 6 : ASSERT(!remote_closed_);
135 6 : stream_callbacks_.onTrailers(std::move(trailers));
136 6 : closeRemote(true);
137 : // Ensure we close locally on receiving a complete response; see comment in encodeHeaders for
138 : // rationale.
139 6 : closeLocal(true);
140 6 : }
141 :
142 68 : void AsyncStreamImpl::sendHeaders(RequestHeaderMap& headers, bool end_stream) {
143 68 : request_headers_ = &headers;
144 :
145 68 : if (Http::Headers::get().MethodValues.Head == headers.getMethodValue()) {
146 0 : is_head_request_ = true;
147 0 : }
148 :
149 68 : is_grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);
150 68 : headers.setReferenceEnvoyInternalRequest(Headers::get().EnvoyInternalRequestValues.True);
151 68 : if (send_xff_) {
152 68 : Utility::appendXff(headers, *parent_.config_.local_info_.address());
153 68 : }
154 :
155 68 : router_.decodeHeaders(headers, end_stream);
156 68 : closeLocal(end_stream);
157 68 : }
158 :
159 431 : void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
160 431 : ASSERT(dispatcher().isThreadSafe());
161 : // Map send calls after local closure to no-ops. The send call could have been queued prior to
162 : // remote reset or closure, and/or closure could have occurred synchronously in response to a
163 : // previous send. In these cases the router will have already cleaned up stream state. This
164 : // parallels handling in the main Http::ConnectionManagerImpl as well.
165 431 : if (local_closed_) {
166 0 : return;
167 0 : }
168 :
169 431 : if (buffered_body_ != nullptr) {
170 : // TODO(shikugawa): Currently, data is dropped when the retry buffer overflows and there is no
171 : // ability implement any error handling. We need to implement buffer overflow handling in the
172 : // future. Options include configuring the max buffer size, or for use cases like gRPC
173 : // streaming, deleting old data in the retry buffer.
174 0 : if (buffered_body_->length() + data.length() > kBufferLimitForRetry) {
175 0 : ENVOY_LOG_EVERY_POW_2(
176 0 : warn, "the buffer size limit (64KB) for async client retries has been exceeded.");
177 0 : } else {
178 0 : buffered_body_->add(data);
179 0 : }
180 0 : }
181 :
182 431 : router_.decodeData(data, end_stream);
183 431 : closeLocal(end_stream);
184 431 : }
185 :
186 0 : void AsyncStreamImpl::sendTrailers(RequestTrailerMap& trailers) {
187 0 : request_trailers_ = &trailers;
188 :
189 0 : ASSERT(dispatcher().isThreadSafe());
190 : // See explanation in sendData.
191 0 : if (local_closed_) {
192 0 : return;
193 0 : }
194 :
195 0 : router_.decodeTrailers(trailers);
196 0 : closeLocal(true);
197 0 : }
198 :
199 826 : void AsyncStreamImpl::closeLocal(bool end_stream) {
200 : // This guard ensures that we don't attempt to clean up a stream or fire a completion callback
201 : // for a stream that has already been closed. Both send* calls and resets can result in stream
202 : // closure, and this state may be updated synchronously during stream interaction and callbacks.
203 : // Additionally AsyncRequestImpl maintains behavior wherein its onComplete callback will fire
204 : // immediately upon receiving a complete response, regardless of whether it has finished sending
205 : // a request.
206 : // Previous logic treated post-closure entry here as more-or-less benign (providing later-stage
207 : // guards against redundant cleanup), but to surface consistent stream state via callbacks,
208 : // it's necessary to be more rigorous.
209 : // TODO(goaway): Consider deeper cleanup of assumptions here.
210 826 : if (local_closed_) {
211 40 : return;
212 40 : }
213 :
214 786 : local_closed_ = end_stream;
215 786 : if (complete()) {
216 0 : stream_callbacks_.onComplete();
217 0 : cleanup();
218 0 : }
219 786 : }
220 :
221 327 : void AsyncStreamImpl::closeRemote(bool end_stream) {
222 : // This guard ensures that we don't attempt to clean up a stream or fire a completion callback for
223 : // a stream that has already been closed. This function is called synchronously after callbacks
224 : // have executed, and it's possible for callbacks to, for instance, directly reset a stream or
225 : // close the remote manually. The test case ResetInOnHeaders covers this case specifically.
226 : // Previous logic treated post-closure entry here as more-or-less benign (providing later-stage
227 : // guards against redundant cleanup), but to surface consistent stream state via callbacks, it's
228 : // necessary to be more rigorous.
229 : // TODO(goaway): Consider deeper cleanup of assumptions here.
230 327 : if (remote_closed_) {
231 40 : return;
232 40 : }
233 :
234 287 : remote_closed_ = end_stream;
235 287 : if (complete()) {
236 0 : stream_callbacks_.onComplete();
237 0 : cleanup();
238 0 : }
239 287 : }
240 :
241 40 : void AsyncStreamImpl::reset() {
242 40 : router_.onDestroy();
243 40 : resetStream();
244 40 : }
245 :
246 68 : void AsyncStreamImpl::cleanup() {
247 68 : ASSERT(dispatcher().isThreadSafe());
248 68 : local_closed_ = remote_closed_ = true;
249 : // This will destroy us, but only do so if we are actually in a list. This does not happen in
250 : // the immediate failure case.
251 68 : if (inserted()) {
252 68 : dispatcher().deferredDelete(removeFromList(parent_.active_streams_));
253 68 : }
254 68 : }
255 :
256 68 : void AsyncStreamImpl::resetStream(Http::StreamResetReason, absl::string_view) {
257 68 : stream_callbacks_.onReset();
258 68 : cleanup();
259 68 : }
260 :
261 : AsyncRequestSharedImpl::AsyncRequestSharedImpl(AsyncClientImpl& parent,
262 : AsyncClient::Callbacks& callbacks,
263 : const AsyncClient::RequestOptions& options)
264 0 : : AsyncStreamImpl(parent, *this, options), callbacks_(callbacks) {
265 0 : if (nullptr != options.parent_span_) {
266 0 : const std::string child_span_name =
267 0 : options.child_span_name_.empty()
268 0 : ? absl::StrCat("async ", parent.cluster_->name(), " egress")
269 0 : : options.child_span_name_;
270 0 : child_span_ = options.parent_span_->spawnChild(Tracing::EgressConfig::get(), child_span_name,
271 0 : parent.dispatcher().timeSource().systemTime());
272 0 : } else {
273 0 : child_span_ = std::make_unique<Tracing::NullSpan>();
274 0 : }
275 : // Span gets sampled by default, as sampled_ defaults to true.
276 : // If caller overrides sampled_ with empty value, sampling status of the parent is kept.
277 0 : if (options.sampled_.has_value()) {
278 0 : child_span_->setSampled(options.sampled_.value());
279 0 : }
280 0 : }
281 :
282 0 : void AsyncRequestImpl::initialize() {
283 0 : Tracing::HttpTraceContext trace_context(request_->headers());
284 0 : child_span_->injectContext(trace_context, nullptr);
285 0 : sendHeaders(request_->headers(), request_->body().length() == 0);
286 0 : if (request_->body().length() != 0) {
287 : // It's possible this will be a no-op due to a local response synchronously generated in
288 : // sendHeaders; guards handle this within AsyncStreamImpl.
289 0 : sendData(request_->body(), true);
290 0 : }
291 : // TODO(mattklein123): Support request trailers.
292 0 : }
293 :
294 0 : void AsyncOngoingRequestImpl::initialize() {
295 0 : Tracing::HttpTraceContext trace_context(*request_headers_);
296 0 : child_span_->injectContext(trace_context, nullptr);
297 0 : sendHeaders(*request_headers_, false);
298 0 : }
299 :
300 0 : void AsyncRequestSharedImpl::onComplete() {
301 0 : callbacks_.onBeforeFinalizeUpstreamSpan(*child_span_, &response_->headers());
302 :
303 0 : Tracing::HttpTracerUtility::finalizeUpstreamSpan(*child_span_, streamInfo(),
304 0 : Tracing::EgressConfig::get());
305 :
306 0 : callbacks_.onSuccess(*this, std::move(response_));
307 0 : }
308 :
309 0 : void AsyncRequestSharedImpl::onHeaders(ResponseHeaderMapPtr&& headers, bool) {
310 0 : const uint64_t response_code = Http::Utility::getResponseStatus(*headers);
311 0 : streamInfo().setResponseCode(response_code);
312 0 : response_ = std::make_unique<ResponseMessageImpl>(std::move(headers));
313 0 : }
314 :
315 0 : void AsyncRequestSharedImpl::onData(Buffer::Instance& data, bool) { response_->body().move(data); }
316 :
317 0 : void AsyncRequestSharedImpl::onTrailers(ResponseTrailerMapPtr&& trailers) {
318 0 : response_->trailers(std::move(trailers));
319 0 : }
320 :
321 0 : void AsyncRequestSharedImpl::onReset() {
322 0 : if (!cancelled_) {
323 : // Set "error reason" tag related to reset. The tagging for "error true" is done inside the
324 : // Tracing::HttpTracerUtility::finalizeUpstreamSpan.
325 0 : child_span_->setTag(Tracing::Tags::get().ErrorReason, "Reset");
326 0 : }
327 :
328 0 : callbacks_.onBeforeFinalizeUpstreamSpan(*child_span_,
329 0 : remoteClosed() ? &response_->headers() : nullptr);
330 :
331 : // Finalize the span based on whether we received a response or not.
332 0 : Tracing::HttpTracerUtility::finalizeUpstreamSpan(*child_span_, streamInfo(),
333 0 : Tracing::EgressConfig::get());
334 :
335 0 : if (!cancelled_) {
336 : // In this case we don't have a valid response so we do need to raise a failure.
337 0 : callbacks_.onFailure(*this, AsyncClient::FailureReason::Reset);
338 0 : }
339 0 : }
340 :
341 0 : void AsyncRequestSharedImpl::cancel() {
342 0 : cancelled_ = true;
343 :
344 : // Add tags about the cancellation.
345 0 : child_span_->setTag(Tracing::Tags::get().Canceled, Tracing::Tags::get().True);
346 :
347 0 : reset();
348 0 : }
349 :
350 : } // namespace Http
351 : } // namespace Envoy
|