1
#include "source/common/http/async_client_impl.h"
2

            
3
#include <memory>
4
#include <string>
5

            
6
#include "envoy/config/core/v3/base.pb.h"
7
#include "envoy/router/router.h"
8

            
9
#include "source/common/grpc/common.h"
10
#include "source/common/http/null_route_impl.h"
11
#include "source/common/http/utility.h"
12
#include "source/common/local_reply/local_reply.h"
13
#include "source/common/protobuf/message_validator_impl.h"
14
#include "source/common/stream_info/filter_state_impl.h"
15
#include "source/common/tracing/http_tracer_impl.h"
16
#include "source/common/upstream/retry_factory.h"
17

            
18
namespace Envoy {
19
namespace Http {
20

            
21
// Runtime key used to look up the response body buffer limit applied to async requests.
const absl::string_view AsyncClientImpl::ResponseBufferLimit = "http.async_response_buffer_limit";
22

            
23
// Builds an async client bound to a single upstream cluster. The shared
// Router::FilterConfig is what lets each AsyncStreamImpl drive a router filter
// without a full HTTP connection manager in front of it.
AsyncClientImpl::AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster,
                                 Stats::Store& stats_store, Event::Dispatcher& dispatcher,
                                 Upstream::ClusterManager& cm,
                                 Server::Configuration::CommonFactoryContext& factory_context,
                                 Router::ShadowWriterPtr&& shadow_writer,
                                 Http::Context& http_context, Router::Context& router_context)
    : factory_context_(factory_context), cluster_(cluster),
      config_(std::make_shared<Router::FilterConfig>(
          factory_context, http_context.asyncClientStatPrefix(), *stats_store.rootScope(), cm,
          factory_context.runtime(), factory_context.api().randomGenerator(),
          // NOTE(review): the run of boolean literals maps to FilterConfig's flag
          // parameters (e.g. emit-dynamic-stats first) — confirm against the
          // Router::FilterConfig constructor before reordering.
          std::move(shadow_writer), true, false, false, false, false, false, false,
          Protobuf::RepeatedPtrField<std::string>{}, dispatcher.timeSource(), http_context,
          router_context)),
      dispatcher_(dispatcher), local_reply_(LocalReply::Factory::createDefault()) {}
37

            
38
2128
AsyncClientImpl::~AsyncClientImpl() {
  // Resetting a stream removes it from active_streams_ (via cleanup()), so this
  // loop drains the list rather than iterating it.
  while (!active_streams_.empty()) {
    active_streams_.front()->reset();
  }
}
43

            
44
405
template <typename T> T* AsyncClientImpl::internalStartRequest(T* async_request) {
45
405
  if (!async_request) {
46
    return nullptr;
47
  }
48
405
  async_request->initialize();
49
405
  std::unique_ptr<AsyncStreamImpl> new_request{async_request};
50

            
51
  // The request may get immediately failed. If so, we will return nullptr.
52
405
  if (!new_request->remote_closed_) {
53
401
    LinkedList::moveIntoList(std::move(new_request), active_streams_);
54
401
    return async_request;
55
401
  } else {
56
4
    new_request->cleanup();
57
4
    return nullptr;
58
4
  }
59
405
}
60

            
61
template AsyncRequestImpl*
62
AsyncClientImpl::internalStartRequest<AsyncRequestImpl>(AsyncRequestImpl*);
63
template AsyncOngoingRequestImpl*
64
AsyncClientImpl::internalStartRequest<AsyncOngoingRequestImpl>(AsyncOngoingRequestImpl*);
65

            
66
// Sends a fully-buffered request message. Ownership of the message transfers to
// the request object; returns nullptr if the request fails immediately.
AsyncClient::Request* AsyncClientImpl::send(RequestMessagePtr&& request,
                                            AsyncClient::Callbacks& callbacks,
                                            const AsyncClient::RequestOptions& options) {
  return internalStartRequest(
      AsyncRequestImpl::create(std::move(request), *this, callbacks, options));
}
73

            
74
// Starts a request whose body/trailers will be supplied later by the caller.
// Returns nullptr if the request fails immediately during initialization.
AsyncClient::OngoingRequest*
AsyncClientImpl::startRequest(RequestHeaderMapPtr&& request_headers, Callbacks& callbacks,
                              const AsyncClient::RequestOptions& options) {
  return internalStartRequest(
      AsyncOngoingRequestImpl::create(std::move(request_headers), *this, callbacks, options));
}
81

            
82
// Starts a raw bidirectional stream. On creation failure the callbacks see an
// immediate onReset() and nullptr is returned; on success the stream is linked
// into the active list, which retains ownership.
AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks,
                                            const AsyncClient::StreamOptions& options) {
  auto maybe_stream = AsyncStreamImpl::create(*this, callbacks, options);
  if (maybe_stream.ok()) {
    LinkedList::moveIntoList(std::move(maybe_stream.value()), active_streams_);
    return active_streams_.front().get();
  }
  callbacks.onReset();
  return nullptr;
}
92

            
93
// Resolves the retry policy for a new async stream. A proto retry policy in the
// options takes precedence, then an already-parsed policy, then the shared
// default. A proto parse failure is reported through `creation_status` and the
// default policy is returned in its place.
Router::RetryPolicyConstSharedPtr
createRetryPolicy(const AsyncClient::StreamOptions& options,
                  Server::Configuration::CommonFactoryContext& context,
                  absl::Status& creation_status) {
  if (!options.retry_policy.has_value()) {
    if (options.parsed_retry_policy == nullptr) {
      return Router::RetryPolicyImpl::DefaultRetryPolicy;
    }
    return options.parsed_retry_policy;
  }

  auto policy_or_error = Router::RetryPolicyImpl::create(
      options.retry_policy.value(), ProtobufMessage::getNullValidationVisitor(), context);
  creation_status = policy_or_error.status();
  if (!creation_status.ok()) {
    return Router::RetryPolicyImpl::DefaultRetryPolicy;
  }
  return std::move(policy_or_error.value());
}
107

            
108
// Constructs the stream-side state that drives a router filter directly.
// On any failure `creation_status` is set and construction bails out early;
// callers must check the status before using the stream.
AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
                                 const AsyncClient::StreamOptions& options,
                                 absl::Status& creation_status)
    : parent_(parent),

      discard_response_body_(options.discard_response_body),
      new_async_client_retry_logic_(Runtime::runtimeFeatureEnabled(
          "envoy.reloadable_features.http_async_client_retry_respect_buffer_limits")),
      buffer_limit_(options.buffer_limit_), stream_callbacks_(callbacks),
      stream_id_(parent.config_->random_.random()),
      // Per-stream filter config (if supplied) overrides the client-wide one.
      router_(options.filter_config_ ? options.filter_config_ : parent.config_,
              parent.config_->async_stats_),
      stream_info_(Protocol::Http11, parent.dispatcher().timeSource(), nullptr,
                   options.filter_state != nullptr
                       ? options.filter_state
                       : std::make_shared<StreamInfo::FilterStateImpl>(
                             StreamInfo::FilterState::LifeSpan::FilterChain)),
      tracing_config_(Tracing::EgressConfig::get()), local_reply_(*parent.local_reply_),
      account_(options.account_), send_xff_(options.send_xff),
      send_internal_(options.send_internal),
      upstream_override_host_(options.upstream_override_host_) {
  auto retry_policy = createRetryPolicy(options, parent.factory_context_, creation_status);

  // A field initialization may set the creation-status as unsuccessful.
  // In that case return immediately.
  if (!creation_status.ok()) {
    return;
  }

  const Router::MetadataMatchCriteria* metadata_matching_criteria = nullptr;
  if (options.parent_context.stream_info != nullptr) {
    stream_info_.setParentStreamInfo(*options.parent_context.stream_info);
    // Keep the parent root to ensure the metadata_matching_criteria will not become
    // dangling pointer once the parent downstream request is gone.
    parent_route_ = options.parent_context.stream_info->route();
    if (parent_route_ != nullptr) {
      const auto* route_entry = parent_route_->routeEntry();
      if (route_entry != nullptr) {
        metadata_matching_criteria = route_entry->metadataMatchCriteria();
      }
    }
  }

  // The null route stands in for the route a connection manager would normally
  // select; it carries the cluster name, timeout, retry and hash policies.
  auto route_or_error = NullRouteImpl::create(
      parent_.cluster_->name(), std::move(retry_policy), parent_.factory_context_.regexEngine(),
      options.timeout, options.hash_policy, metadata_matching_criteria);
  SET_AND_RETURN_IF_NOT_OK(route_or_error.status(), creation_status);
  route_ = std::move(*route_or_error);
  stream_info_.dynamicMetadata().MergeFrom(options.metadata);
  stream_info_.setIsShadow(options.is_shadow);
  stream_info_.setUpstreamClusterInfo(parent_.cluster_);
  stream_info_.route_ = route_;

  // Retry support requires keeping a copy of the request body around.
  if (options.buffer_body_for_retry) {
    buffered_body_ = std::make_unique<Buffer::OwnedImpl>(account_);
  }

  router_.setDecoderFilterCallbacks(*this);
  // TODO(mattklein123): Correctly set protocol in stream info when we support access logging.
}
168

            
169
// Generates a local (Envoy-originated) response on this stream, e.g. for router
// failures. If response headers were already delivered to the caller, a clean
// local reply is no longer possible and the stream is reset instead.
void AsyncStreamImpl::sendLocalReply(Code code, absl::string_view body,
                                     std::function<void(ResponseHeaderMap& headers)> modify_headers,
                                     const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
                                     absl::string_view details) {
  stream_info_.setResponseCodeDetails(details);
  if (encoded_response_headers_) {
    // Too late to synthesize a response; fall back to a reset.
    resetStream();
    return;
  }
  // The encode functions below route the synthesized response back through this
  // stream's own encodeHeaders/encodeData, so the caller observes it exactly
  // like an upstream response.
  Utility::sendLocalReply(
      remote_closed_,
      Utility::EncodeFunctions{
          [modify_headers](ResponseHeaderMap& headers) -> void {
            if (modify_headers != nullptr) {
              modify_headers(headers);
            }
          },
          [this](ResponseHeaderMap& response_headers, Code& code, std::string& body,
                 absl::string_view& content_type) -> void {
            local_reply_.rewrite(request_headers_, response_headers, stream_info_, code, body,
                                 content_type);
          },
          // `details` is captured by reference; safe because sendLocalReply runs
          // these callbacks synchronously.
          [this, &details](ResponseHeaderMapPtr&& headers, bool end_stream) -> void {
            encodeHeaders(std::move(headers), end_stream, details);
          },
          [this](Buffer::Instance& data, bool end_stream) -> void {
            encodeData(data, end_stream);
          }},
      Utility::LocalReplyData{is_grpc_request_, code, body, grpc_status, is_head_request_});
}
199
// Router -> stream: delivers response headers to the caller's callbacks and
// closes both directions if this is the end of the response.
void AsyncStreamImpl::encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream,
                                    absl::string_view) {
  ENVOY_LOG(debug, "async http request response headers (end_stream={}):\n{}", end_stream,
            *headers);
  ASSERT(!remote_closed_);
  encoded_response_headers_ = true;
  stream_callbacks_.onHeaders(std::move(headers), end_stream);
  closeRemote(end_stream);
  // At present, the AsyncStream is always fully closed when the server half closes the stream.
  //
  // Always ensure we close locally to trigger completion. Another option would be to issue a stream
  // reset here if local isn't yet closed, triggering cleanup along a more standardized path.
  // However, this would require additional logic to handle the response completion and subsequent
  // reset, and run the risk of being interpreted as a failure, when in fact no error has
  // necessarily occurred. Gracefully closing seems most in-line with behavior elsewhere in Envoy
  // for now.
  closeLocal(end_stream);
}
217

            
218
104056
// Router -> stream: delivers a chunk of response body to the caller's callbacks
// and closes both directions when the response completes.
void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) {
  ENVOY_LOG(trace, "async http request response data (length={} end_stream={})", data.length(),
            end_stream);
  ASSERT(!remote_closed_);
  stream_callbacks_.onData(data, end_stream);
  closeRemote(end_stream);
  // Ensure we close locally on receiving a complete response; see comment in encodeHeaders for
  // rationale.
  closeLocal(end_stream);
}
228

            
229
390
// Router -> stream: delivers response trailers; trailers always terminate the
// response, so both directions are closed unconditionally.
void AsyncStreamImpl::encodeTrailers(ResponseTrailerMapPtr&& trailers) {
  ENVOY_LOG(debug, "async http request response trailers:\n{}", *trailers);
  ASSERT(!remote_closed_);
  stream_callbacks_.onTrailers(std::move(trailers));
  closeRemote(true);
  // Ensure we close locally on receiving a complete response; see comment in encodeHeaders for
  // rationale.
  closeLocal(true);
}
238

            
239
3083
// Caller -> stream: starts the request by handing headers to the router filter.
// Tags the stream as HEAD/gRPC where applicable and, depending on the options
// this stream was created with, stamps internal-request and XFF headers first.
void AsyncStreamImpl::sendHeaders(RequestHeaderMap& headers, bool end_stream) {
  request_headers_ = &headers;

  if (headers.getMethodValue() == Http::Headers::get().MethodValues.Head) {
    is_head_request_ = true;
  }
  is_grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);

  if (send_internal_) {
    headers.setReferenceEnvoyInternalRequest(Headers::get().EnvoyInternalRequestValues.True);
  }
  if (send_xff_) {
    Utility::appendXff(headers, *parent_.config_->factory_context_.localInfo().address());
  }

  router_.decodeHeaders(headers, end_stream);
  closeLocal(end_stream);
}
258

            
259
70490
// Caller -> stream: forwards a chunk of request body to the router, optionally
// copying it into the retry buffer first (legacy path only).
void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
  ASSERT(dispatcher().isThreadSafe());
  // Map send calls after local closure to no-ops. The send call could have been queued prior to
  // remote reset or closure, and/or closure could have occurred synchronously in response to a
  // previous send. In these cases the router will have already cleaned up stream state. This
  // parallels handling in the main Http::ConnectionManagerImpl as well.
  if (local_closed_) {
    return;
  }

  // Legacy retry buffering: only active when the runtime feature that makes
  // retries respect configured buffer limits is disabled.
  if (!new_async_client_retry_logic_) {
    if (buffered_body_ != nullptr) {
      // TODO(shikugawa): Currently, data is dropped when the retry buffer overflows and there is no
      // ability implement any error handling. We need to implement buffer overflow handling in the
      // future. Options include configuring the max buffer size, or for use cases like gRPC
      // streaming, deleting old data in the retry buffer.
      if (buffered_body_->length() + data.length() > kDefaultDecoderBufferLimit) {
        ENVOY_LOG_EVERY_POW_2(
            warn, "the buffer size limit (64KB) for async client retries has been exceeded.");
      } else {
        buffered_body_->add(data);
      }
    }
  }

  // If the router is still waiting on async host selection, this data cannot be
  // accepted; reset the stream rather than buffer without bound.
  if (router_.awaitingHost()) {
    ENVOY_LOG_EVERY_POW_2(warn, "the buffer limit for the async client has been exceeded "
                                "due to async host selection");
    reset();
    return;
  }

  router_.decodeData(data, end_stream);
  closeLocal(end_stream);
}
294

            
295
7
// Caller -> stream: forwards request trailers to the router. Trailers always
// end the request, so the local side is closed afterwards.
void AsyncStreamImpl::sendTrailers(RequestTrailerMap& trailers) {
  // The trailer pointer is recorded even if the stream is already locally
  // closed below (matching the original ordering).
  request_trailers_ = &trailers;

  ASSERT(dispatcher().isThreadSafe());
  // See explanation in sendData.
  if (local_closed_) {
    return;
  }

  router_.decodeTrailers(trailers);
  closeLocal(true);
}
307

            
308
181054
// Marks the local (request) side closed when end_stream is true, firing the
// completion callback and cleaning up once both sides are closed. A false
// end_stream leaves the state unchanged (the assignment below is a no-op).
void AsyncStreamImpl::closeLocal(bool end_stream) {
  // This guard ensures that we don't attempt to clean up a stream or fire a completion callback
  // for a stream that has already been closed. Both send* calls and resets can result in stream
  // closure, and this state may be updated synchronously during stream interaction and callbacks.
  // Additionally AsyncRequestImpl maintains behavior wherein its onComplete callback will fire
  // immediately upon receiving a complete response, regardless of whether it has finished sending
  // a request.
  // Previous logic treated post-closure entry here as more-or-less benign (providing later-stage
  // guards against redundant cleanup), but to surface consistent stream state via callbacks,
  // it's necessary to be more rigorous.
  // TODO(goaway): Consider deeper cleanup of assumptions here.
  if (local_closed_) {
    return;
  }

  local_closed_ = end_stream;
  if (complete()) {
    stream_callbacks_.onComplete();
    cleanup();
  }
}
329

            
330
107484
// Marks the remote (response) side closed when end_stream is true, firing the
// completion callback and cleaning up once both sides are closed.
void AsyncStreamImpl::closeRemote(bool end_stream) {
  // This guard ensures that we don't attempt to clean up a stream or fire a completion callback for
  // a stream that has already been closed. This function is called synchronously after callbacks
  // have executed, and it's possible for callbacks to, for instance, directly reset a stream or
  // close the remote manually. The test case ResetInOnHeaders covers this case specifically.
  // Previous logic treated post-closure entry here as more-or-less benign (providing later-stage
  // guards against redundant cleanup), but to surface consistent stream state via callbacks, it's
  // necessary to be more rigorous.
  // TODO(goaway): Consider deeper cleanup of assumptions here.
  if (remote_closed_) {
    return;
  }

  remote_closed_ = end_stream;
  if (complete()) {
    stream_callbacks_.onComplete();
    cleanup();
  }
}
349

            
350
1477
// Hard-resets the stream: tears down the router filter first (so it produces no
// further callbacks), then notifies the caller via resetStream()/onReset().
void AsyncStreamImpl::reset() {
  routerDestroy();
  resetStream();
}
354

            
355
7657
void AsyncStreamImpl::routerDestroy() {
356
7657
  if (!router_destroyed_) {
357
3096
    router_destroyed_ = true;
358
3096
    router_.onDestroy();
359
3096
  }
360
7657
}
361

            
362
3113
// Final teardown: forces both sides closed and, if the stream is linked into the
// client's active list, destroys the router and schedules deferred deletion.
void AsyncStreamImpl::cleanup() {
  ASSERT(dispatcher().isThreadSafe());
  local_closed_ = remote_closed_ = true;
  // This will destroy us, but only do so if we are actually in a list. This does not happen in
  // the immediate failure case.
  if (inserted()) {
    routerDestroy();
    dispatcher().deferredDelete(removeFromList(parent_.active_streams_));
  }
}
372

            
373
2567
// Stream reset entry point (reason/details are unused here): notifies the
// caller's onReset() and then performs final cleanup.
void AsyncStreamImpl::resetStream(Http::StreamResetReason, absl::string_view) {
  stream_callbacks_.onReset();
  cleanup();
}
377

            
378
// Shared base for one-shot and ongoing requests: wires the request object in as
// its own stream callbacks, reads the response buffer limit from runtime, and
// sets up the child tracing span (a NullSpan when there is no parent span).
AsyncRequestSharedImpl::AsyncRequestSharedImpl(AsyncClientImpl& parent,
                                               AsyncClient::Callbacks& callbacks,
                                               const AsyncClient::RequestOptions& options,
                                               absl::Status& creation_status)
    : AsyncStreamImpl(parent, *this, options, creation_status), callbacks_(callbacks),
      response_buffer_limit_(parent.config_->runtime_.snapshot().getInteger(
          AsyncClientImpl::ResponseBufferLimit, kBufferLimitForResponse)) {
  // Base-class construction may have failed; bail before touching spans.
  if (!creation_status.ok()) {
    return;
  }
  if (options.parent_span_ != nullptr) {
    const std::string child_span_name =
        options.child_span_name_.empty()
            ? absl::StrCat("async ", parent.cluster_->name(), " egress")
            : options.child_span_name_;
    child_span_ = options.parent_span_->spawnChild(Tracing::EgressConfig::get(), child_span_name,
                                                   parent.dispatcher().timeSource().systemTime());
  } else {
    child_span_ = std::make_unique<Tracing::NullSpan>();
  }
  // Span gets sampled by default, as sampled_ defaults to true.
  // If caller overrides sampled_ with empty value, sampling status of the parent is kept.
  if (options.sampled_.has_value()) {
    child_span_->setSampled(options.sampled_.value());
  }
}
404

            
405
322
void AsyncRequestImpl::initialize() {
406
322
  Tracing::HttpTraceContext trace_context(request_->headers());
407
322
  Tracing::UpstreamContext upstream_context(nullptr,                    // host_
408
322
                                            parent_.cluster_.get(),     // cluster_
409
322
                                            Tracing::ServiceType::Http, // service_type_
410
322
                                            true                        // async_client_span_
411
322
  );
412
322
  child_span_->injectContext(trace_context, upstream_context);
413
322
  sendHeaders(request_->headers(), request_->body().length() == 0);
414
322
  if (request_->body().length() != 0) {
415
    // It's possible this will be a no-op due to a local response synchronously generated in
416
    // sendHeaders; guards handle this within AsyncStreamImpl.
417
128
    sendData(request_->body(), true);
418
128
  }
419
  // TODO(mattklein123): Support request trailers.
420
322
}
421

            
422
83
void AsyncOngoingRequestImpl::initialize() {
423
83
  Tracing::HttpTraceContext trace_context(*request_headers_);
424
83
  Tracing::UpstreamContext upstream_context(nullptr,                    // host_
425
83
                                            parent_.cluster_.get(),     // cluster_
426
83
                                            Tracing::ServiceType::Http, // service_type_
427
83
                                            true                        // async_client_span_
428
83
  );
429
83
  child_span_->injectContext(trace_context, upstream_context);
430
83
  sendHeaders(*request_headers_, false);
431
83
}
432

            
433
367
// Stream completion: finalizes the child tracing span and hands the assembled
// response to the caller via onSuccess(). Marks complete_ first so a subsequent
// reset is ignored (see onReset()).
void AsyncRequestSharedImpl::onComplete() {
  complete_ = true;
  callbacks_.onBeforeFinalizeUpstreamSpan(*child_span_, &response_->headers());

  Tracing::HttpTracerUtility::finalizeUpstreamSpan(*child_span_, streamInfo(),
                                                   Tracing::EgressConfig::get());

  callbacks_.onSuccess(*this, std::move(response_));
}
442

            
443
374
// Records the HTTP status on the stream info and starts assembling the response
// message from the received headers.
void AsyncRequestSharedImpl::onHeaders(ResponseHeaderMapPtr&& headers, bool) {
  streamInfo().setResponseCode(Http::Utility::getResponseStatus(*headers));
  response_ = std::make_unique<ResponseMessageImpl>(std::move(headers));
}
448

            
449
352
// Accumulates response body into the response message. Data is drained without
// buffering when the caller asked to discard the body; exceeding the configured
// buffer limit drains the data, flags the overflow, and resets the request.
void AsyncRequestSharedImpl::onData(Buffer::Instance& data, bool) {
  if (discard_response_body_) {
    data.drain(data.length());
    return;
  }

  const bool over_limit = response_->body().length() + data.length() > response_buffer_limit_;
  if (!over_limit) {
    response_->body().move(data);
    return;
  }

  ENVOY_LOG_EVERY_POW_2(warn, "the buffer size limit for async client response body "
                              "has been exceeded, draining data");
  data.drain(data.length());
  response_buffer_overlimit_ = true;
  reset();
}
465

            
466
1
// Attaches received response trailers to the response message.
void AsyncRequestSharedImpl::onTrailers(ResponseTrailerMapPtr&& trailers) {
  response_->trailers(std::move(trailers));
}
469

            
470
59
// Stream reset: finalizes the tracing span and reports failure to the caller,
// unless the request already completed (reset ignored) or was cancelled by the
// caller (span finalized, but no failure callback).
void AsyncRequestSharedImpl::onReset() {
  if (complete_) {
    // This request has already been completed; a reset should be ignored.
    return;
  }

  if (!cancelled_) {
    // Set "error reason" tag related to reset. The tagging for "error true" is done inside the
    // Tracing::HttpTracerUtility::finalizeUpstreamSpan.
    child_span_->setTag(Tracing::Tags::get().ErrorReason, "Reset");
  }

  // Headers are only available if the remote side got far enough to close.
  callbacks_.onBeforeFinalizeUpstreamSpan(*child_span_,
                                          remoteClosed() ? &response_->headers() : nullptr);

  // Finalize the span based on whether we received a response or not.
  Tracing::HttpTracerUtility::finalizeUpstreamSpan(*child_span_, streamInfo(),
                                                   Tracing::EgressConfig::get());

  if (!cancelled_) {
    if (response_buffer_overlimit_) {
      callbacks_.onFailure(*this, AsyncClient::FailureReason::ExceedResponseBufferLimit);
    } else {
      // In this case we don't have a valid response so we do need to raise a failure.
      callbacks_.onFailure(*this, AsyncClient::FailureReason::Reset);
    }
  }
}
498

            
499
35
// Caller-initiated cancellation: marks the request cancelled (so onReset() skips
// the failure callback), tags the span, and resets the underlying stream.
void AsyncRequestSharedImpl::cancel() {
  cancelled_ = true;

  // Add tags about the cancellation.
  child_span_->setTag(Tracing::Tags::get().Canceled, Tracing::Tags::get().True);

  reset();
}
507

            
508
} // namespace Http
509
} // namespace Envoy