1
#include "source/common/http/async_client_impl.h"
2

            
3
#include <memory>
4
#include <string>
5

            
6
#include "envoy/config/core/v3/base.pb.h"
7
#include "envoy/router/router.h"
8

            
9
#include "source/common/grpc/common.h"
10
#include "source/common/http/null_route_impl.h"
11
#include "source/common/http/utility.h"
12
#include "source/common/local_reply/local_reply.h"
13
#include "source/common/protobuf/message_validator_impl.h"
14
#include "source/common/stream_info/filter_state_impl.h"
15
#include "source/common/tracing/http_tracer_impl.h"
16
#include "source/common/upstream/retry_factory.h"
17

            
18
namespace Envoy {
19
namespace Http {
20

            
21
// Runtime key used to override the maximum buffered response-body size for
// async requests (read in AsyncRequestSharedImpl's constructor).
const absl::string_view AsyncClientImpl::ResponseBufferLimit = "http.async_response_buffer_limit";
22

            
23
// Builds the async client around a shared Router::FilterConfig so every stream
// started by this client routes through the same router configuration.
// NOTE(review): the run of bool literals below maps positionally to
// Router::FilterConfig's flag parameters — confirm against that constructor's
// signature before reordering or editing them.
AsyncClientImpl::AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster,
                                 Stats::Store& stats_store, Event::Dispatcher& dispatcher,
                                 Upstream::ClusterManager& cm,
                                 Server::Configuration::CommonFactoryContext& factory_context,
                                 Router::ShadowWriterPtr&& shadow_writer,
                                 Http::Context& http_context, Router::Context& router_context)
    : factory_context_(factory_context), cluster_(cluster),
      config_(std::make_shared<Router::FilterConfig>(
          factory_context, http_context.asyncClientStatPrefix(), *stats_store.rootScope(), cm,
          factory_context.runtime(), factory_context.api().randomGenerator(),
          std::move(shadow_writer), true, false, false, false, false, false, false,
          Protobuf::RepeatedPtrField<std::string>{}, dispatcher.timeSource(), http_context,
          router_context)),
      dispatcher_(dispatcher), local_reply_(LocalReply::Factory::createDefault()) {}
37

            
38
2147
// Drain all still-active streams before destruction. Each reset() removes the
// stream from active_streams_, so resetting the front entry until the list is
// empty tears everything down.
AsyncClientImpl::~AsyncClientImpl() {
  for (;;) {
    if (active_streams_.empty()) {
      break;
    }
    active_streams_.front()->reset();
  }
}
43

            
44
408
template <typename T> T* AsyncClientImpl::internalStartRequest(T* async_request) {
45
408
  if (!async_request) {
46
    return nullptr;
47
  }
48
408
  async_request->initialize();
49
408
  std::unique_ptr<AsyncStreamImpl> new_request{async_request};
50

            
51
  // The request may get immediately failed. If so, we will return nullptr.
52
408
  if (!new_request->remote_closed_) {
53
404
    LinkedList::moveIntoList(std::move(new_request), active_streams_);
54
404
    return async_request;
55
404
  } else {
56
4
    new_request->cleanup();
57
4
    return nullptr;
58
4
  }
59
408
}
60

            
61
template AsyncRequestImpl*
62
AsyncClientImpl::internalStartRequest<AsyncRequestImpl>(AsyncRequestImpl*);
63
template AsyncOngoingRequestImpl*
64
AsyncClientImpl::internalStartRequest<AsyncOngoingRequestImpl>(AsyncOngoingRequestImpl*);
65

            
66
// Starts a one-shot request with a fully buffered message body. Returns
// nullptr if the request failed immediately (see internalStartRequest).
AsyncClient::Request* AsyncClientImpl::send(RequestMessagePtr&& request,
                                            AsyncClient::Callbacks& callbacks,
                                            const AsyncClient::RequestOptions& options) {
  return internalStartRequest(
      AsyncRequestImpl::create(std::move(request), *this, callbacks, options));
}
73

            
74
// Starts an ongoing request where the body/trailers will be streamed later.
// Returns nullptr if the request failed immediately (see internalStartRequest).
AsyncClient::OngoingRequest*
AsyncClientImpl::startRequest(RequestHeaderMapPtr&& request_headers, Callbacks& callbacks,
                              const AsyncClient::RequestOptions& options) {
  return internalStartRequest(
      AsyncOngoingRequestImpl::create(std::move(request_headers), *this, callbacks, options));
}
81

            
82
// Starts a raw bidirectional stream. On creation failure the callbacks are
// notified via onReset() and nullptr is returned.
AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks,
                                            const AsyncClient::StreamOptions& options) {
  auto stream_or_error = AsyncStreamImpl::create(*this, callbacks, options);
  if (stream_or_error.ok()) {
    // Newly created streams are tracked at the front of active_streams_.
    LinkedList::moveIntoList(std::move(stream_or_error.value()), active_streams_);
    return active_streams_.front().get();
  }
  callbacks.onReset();
  return nullptr;
}
92

            
93
// Resolves the retry policy for a stream, in precedence order: an explicit
// proto retry_policy in the options (validated here; creation_status records
// any validation failure), then a pre-parsed policy, then the default.
Router::RetryPolicyConstSharedPtr
createRetryPolicy(const AsyncClient::StreamOptions& options,
                  Server::Configuration::CommonFactoryContext& context,
                  absl::Status& creation_status) {
  if (!options.retry_policy.has_value()) {
    if (options.parsed_retry_policy != nullptr) {
      return options.parsed_retry_policy;
    }
    return Router::RetryPolicyImpl::DefaultRetryPolicy;
  }

  auto policy_or_error = Router::RetryPolicyImpl::create(
      options.retry_policy.value(), ProtobufMessage::getNullValidationVisitor(), context);
  creation_status = policy_or_error.status();
  if (!creation_status.ok()) {
    // Fall back to the default policy; the caller inspects creation_status.
    return Router::RetryPolicyImpl::DefaultRetryPolicy;
  }
  return std::move(policy_or_error.value());
}
107

            
108
// Constructs an async stream. Any failure during construction is reported
// through creation_status rather than exceptions; callers must check it.
AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
                                 const AsyncClient::StreamOptions& options,
                                 absl::Status& creation_status)
    : parent_(parent),

      discard_response_body_(options.discard_response_body),
      new_async_client_retry_logic_(Runtime::runtimeFeatureEnabled(
          "envoy.reloadable_features.http_async_client_retry_respect_buffer_limits")),
      buffer_limit_(options.buffer_limit_), stream_callbacks_(callbacks),
      stream_id_(parent.config_->random_.random()),
      // Per-stream filter config override falls back to the client-wide config.
      router_(options.filter_config_ ? options.filter_config_ : parent.config_,
              parent.config_->async_stats_),
      stream_info_(Protocol::Http11, parent.dispatcher().timeSource(), nullptr,
                   // Reuse caller-provided filter state if given, otherwise
                   // create fresh state scoped to this filter chain.
                   options.filter_state != nullptr
                       ? options.filter_state
                       : std::make_shared<StreamInfo::FilterStateImpl>(
                             StreamInfo::FilterState::LifeSpan::FilterChain)),
      tracing_config_(Tracing::EgressConfig::get()), local_reply_(*parent.local_reply_),
      account_(options.account_), send_xff_(options.send_xff),
      send_internal_(options.send_internal),
      upstream_override_host_(options.upstream_override_host_) {
  auto retry_policy = createRetryPolicy(options, parent.factory_context_, creation_status);

  // A field initialization may set the creation-status as unsuccessful.
  // In that case return immediately.
  if (!creation_status.ok()) {
    return;
  }

  const Router::MetadataMatchCriteria* metadata_matching_criteria = nullptr;
  if (options.parent_context.stream_info != nullptr) {
    stream_info_.setParentStreamInfo(*options.parent_context.stream_info);
    // Keep the parent root to ensure the metadata_matching_criteria will not become
    // dangling pointer once the parent downstream request is gone.
    parent_route_ = options.parent_context.stream_info->route();
    if (parent_route_ != nullptr) {
      const auto* route_entry = parent_route_->routeEntry();
      if (route_entry != nullptr) {
        metadata_matching_criteria = route_entry->metadataMatchCriteria();
      }
    }
  }

  // Synthesize a route pointing at the client's cluster; async streams never go
  // through real route matching.
  auto route_or_error = NullRouteImpl::create(
      parent_.cluster_->name(), std::move(retry_policy), parent_.factory_context_.regexEngine(),
      options.timeout, options.hash_policy, metadata_matching_criteria);
  SET_AND_RETURN_IF_NOT_OK(route_or_error.status(), creation_status);
  route_ = std::move(*route_or_error);
  stream_info_.dynamicMetadata().MergeFrom(options.metadata);
  stream_info_.setIsShadow(options.is_shadow);
  stream_info_.setUpstreamClusterInfo(parent_.cluster_);
  stream_info_.route_ = route_;

  // Only allocate a retry buffer when the caller opted in to buffering.
  if (options.buffer_body_for_retry) {
    buffered_body_ = std::make_unique<Buffer::OwnedImpl>(account_);
  }

  router_.setDecoderFilterCallbacks(*this);
  // TODO(mattklein123): Correctly set protocol in stream info when we support access logging.
}
168

            
169
// Generates a locally-originated response (e.g. router-produced error). If
// response headers were already delivered to the caller we cannot send a new
// response, so the stream is reset instead.
void AsyncStreamImpl::sendLocalReply(Code code, absl::string_view body,
                                     std::function<void(ResponseHeaderMap& headers)> modify_headers,
                                     const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
                                     absl::string_view details) {
  stream_info_.setResponseCodeDetails(details);
  if (encoded_response_headers_) {
    resetStream();
    return;
  }
  // Delegate formatting to the shared local-reply utility; the lambdas below
  // feed the generated headers/body back through this stream's encode path so
  // the normal completion logic runs.
  Utility::sendLocalReply(
      remote_closed_,
      Utility::EncodeFunctions{
          [modify_headers](ResponseHeaderMap& headers) -> void {
            if (modify_headers != nullptr) {
              modify_headers(headers);
            }
          },
          // Allow the configured local-reply policy to rewrite code/body.
          [this](ResponseHeaderMap& response_headers, Code& code, std::string& body,
                 absl::string_view& content_type) -> void {
            local_reply_.rewrite(request_headers_, response_headers, stream_info_, code, body,
                                 content_type);
          },
          [this, &details](ResponseHeaderMapPtr&& headers, bool end_stream) -> void {
            encodeHeaders(std::move(headers), end_stream, details);
          },
          [this](Buffer::Instance& data, bool end_stream) -> void {
            encodeData(data, end_stream);
          }},
      Utility::LocalReplyData{is_grpc_request_, code, body, grpc_status, is_head_request_});
}
199
// Delivers response headers to the stream callbacks and advances the
// close-state machine. Must not be called after the remote half has closed.
void AsyncStreamImpl::encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream,
                                    absl::string_view) {
  ENVOY_LOG(debug, "async http request response headers (end_stream={}):\n{}", end_stream,
            *headers);
  ASSERT(!remote_closed_);
  // Record delivery so a later sendLocalReply knows it must reset instead.
  encoded_response_headers_ = true;
  stream_callbacks_.onHeaders(std::move(headers), end_stream);
  closeRemote(end_stream);
  // At present, the AsyncStream is always fully closed when the server half closes the stream.
  //
  // Always ensure we close locally to trigger completion. Another option would be to issue a stream
  // reset here if local isn't yet closed, triggering cleanup along a more standardized path.
  // However, this would require additional logic to handle the response completion and subsequent
  // reset, and run the risk of being interpreted as a failure, when in fact no error has
  // necessarily occurred. Gracefully closing seems most in-line with behavior elsewhere in Envoy
  // for now.
  closeLocal(end_stream);
}
217

            
218
104077
// Delivers a chunk of response body to the stream callbacks, then runs the
// same close sequence as encodeHeaders. Order matters: callbacks fire before
// close-state is advanced so they observe a still-open stream.
void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) {
  ENVOY_LOG(trace, "async http request response data (length={} end_stream={})", data.length(),
            end_stream);
  ASSERT(!remote_closed_);
  stream_callbacks_.onData(data, end_stream);
  closeRemote(end_stream);
  // Ensure we close locally on receiving a complete response; see comment in encodeHeaders for
  // rationale.
  closeLocal(end_stream);
}
228

            
229
392
// Delivers response trailers; trailers always terminate the response, so both
// halves of the stream are closed unconditionally afterwards.
void AsyncStreamImpl::encodeTrailers(ResponseTrailerMapPtr&& trailers) {
  ENVOY_LOG(debug, "async http request response trailers:\n{}", *trailers);
  ASSERT(!remote_closed_);
  stream_callbacks_.onTrailers(std::move(trailers));
  closeRemote(true);
  // Ensure we close locally on receiving a complete response; see comment in encodeHeaders for
  // rationale.
  closeLocal(true);
}
238

            
239
3141
// Begins the request by decorating the headers (internal-request marker, XFF)
// and handing them to the router. May synchronously produce a local reply,
// which is why closeLocal runs after decodeHeaders.
void AsyncStreamImpl::sendHeaders(RequestHeaderMap& headers, bool end_stream) {
  request_headers_ = &headers;

  // HEAD responses need special body handling in local replies.
  if (Http::Headers::get().MethodValues.Head == headers.getMethodValue()) {
    is_head_request_ = true;
  }

  is_grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);
  if (send_internal_) {
    headers.setReferenceEnvoyInternalRequest(Headers::get().EnvoyInternalRequestValues.True);
  }

  if (send_xff_) {
    Utility::appendXff(headers, *parent_.config_->factory_context_.localInfo().address());
  }

  router_.decodeHeaders(headers, end_stream);
  closeLocal(end_stream);
}
258

            
259
70633
// Streams a chunk of request body to the router. Must run on the stream's
// dispatcher thread.
void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
  ASSERT(dispatcher().isThreadSafe());
  // Map send calls after local closure to no-ops. The send call could have been queued prior to
  // remote reset or closure, and/or closure could have occurred synchronously in response to a
  // previous send. In these cases the router will have already cleaned up stream state. This
  // parallels handling in the main Http::ConnectionManagerImpl as well.
  if (local_closed_) {
    return;
  }

  // Legacy retry-buffering path, active only while the
  // http_async_client_retry_respect_buffer_limits runtime feature is disabled.
  if (!new_async_client_retry_logic_) {
    if (buffered_body_ != nullptr) {
      // TODO(shikugawa): Currently, data is dropped when the retry buffer overflows and there is no
      // ability implement any error handling. We need to implement buffer overflow handling in the
      // future. Options include configuring the max buffer size, or for use cases like gRPC
      // streaming, deleting old data in the retry buffer.
      if (buffered_body_->length() + data.length() > kDefaultDecoderBufferLimit) {
        ENVOY_LOG_EVERY_POW_2(
            warn, "the buffer size limit (64KB) for async client retries has been exceeded.");
      } else {
        buffered_body_->add(data);
      }
    }
  }

  // If the router is still waiting for asynchronous host selection we cannot
  // buffer unbounded data; reset the stream instead of queueing more.
  if (router_.awaitingHost()) {
    ENVOY_LOG_EVERY_POW_2(warn, "the buffer limit for the async client has been exceeded "
                                "due to async host selection");
    reset();
    return;
  }

  router_.decodeData(data, end_stream);
  closeLocal(end_stream);
}
294

            
295
7
// Sends request trailers, which always end the request's local half.
void AsyncStreamImpl::sendTrailers(RequestTrailerMap& trailers) {
  request_trailers_ = &trailers;

  ASSERT(dispatcher().isThreadSafe());
  // See explanation in sendData.
  if (local_closed_) {
    return;
  }

  router_.decodeTrailers(trailers);
  closeLocal(true);
}
307

            
308
181336
// Records local (request-side) closure and fires onComplete/cleanup once both
// halves are closed.
void AsyncStreamImpl::closeLocal(bool end_stream) {
  // This guard ensures that we don't attempt to clean up a stream or fire a completion callback
  // for a stream that has already been closed. Both send* calls and resets can result in stream
  // closure, and this state may be updated synchronously during stream interaction and callbacks.
  // Additionally AsyncRequestImpl maintains behavior wherein its onComplete callback will fire
  // immediately upon receiving a complete response, regardless of whether it has finished sending
  // a request.
  // Previous logic treated post-closure entry here as more-or-less benign (providing later-stage
  // guards against redundant cleanup), but to surface consistent stream state via callbacks,
  // it's necessary to be more rigorous.
  // TODO(goaway): Consider deeper cleanup of assumptions here.
  if (local_closed_) {
    return;
  }

  local_closed_ = end_stream;
  if (complete()) {
    stream_callbacks_.onComplete();
    cleanup();
  }
}
329

            
330
107565
// Records remote (response-side) closure; mirror of closeLocal.
void AsyncStreamImpl::closeRemote(bool end_stream) {
  // This guard ensures that we don't attempt to clean up a stream or fire a completion callback for
  // a stream that has already been closed. This function is called synchronously after callbacks
  // have executed, and it's possible for callbacks to, for instance, directly reset a stream or
  // close the remote manually. The test case ResetInOnHeaders covers this case specifically.
  // Previous logic treated post-closure entry here as more-or-less benign (providing later-stage
  // guards against redundant cleanup), but to surface consistent stream state via callbacks, it's
  // necessary to be more rigorous.
  // TODO(goaway): Consider deeper cleanup of assumptions here.
  if (remote_closed_) {
    return;
  }

  remote_closed_ = end_stream;
  if (complete()) {
    stream_callbacks_.onComplete();
    cleanup();
  }
}
349

            
350
1492
// Hard-resets the stream: tear down the router first so it stops producing
// callbacks, then reset the stream state and notify callbacks.
void AsyncStreamImpl::reset() {
  routerDestroy();
  resetStream();
}
354

            
355
7788
void AsyncStreamImpl::routerDestroy() {
356
7788
  if (!router_destroyed_) {
357
3154
    router_destroyed_ = true;
358
3154
    router_.onDestroy();
359
3154
  }
360
7788
}
361

            
362
3171
// Final teardown: marks both halves closed and schedules deferred deletion.
// Note this destroys `this` (via deferredDelete) when the stream is tracked
// in the client's active list.
void AsyncStreamImpl::cleanup() {
  ASSERT(dispatcher().isThreadSafe());
  local_closed_ = remote_closed_ = true;
  // This will destroy us, but only do so if we are actually in a list. This does not happen in
  // the immediate failure case.
  if (inserted()) {
    routerDestroy();
    dispatcher().deferredDelete(removeFromList(parent_.active_streams_));
  }
}
372

            
373
2632
// Notifies callbacks of the reset, then runs cleanup (which may delete this).
// Reset reason/details are ignored for async streams.
void AsyncStreamImpl::resetStream(Http::StreamResetReason, absl::string_view) {
  stream_callbacks_.onReset();
  cleanup();
}
377

            
378
// Common base for one-shot and ongoing requests: wires the request up as its
// own stream callbacks and sets up the tracing child span. creation_status
// propagates any failure from the AsyncStreamImpl base constructor.
AsyncRequestSharedImpl::AsyncRequestSharedImpl(AsyncClientImpl& parent,
                                               AsyncClient::Callbacks& callbacks,
                                               const AsyncClient::RequestOptions& options,
                                               absl::Status& creation_status)
    : AsyncStreamImpl(parent, *this, options, creation_status), callbacks_(callbacks),
      // Response buffer cap is runtime-overridable via ResponseBufferLimit.
      response_buffer_limit_(parent.config_->runtime_.snapshot().getInteger(
          AsyncClientImpl::ResponseBufferLimit, kBufferLimitForResponse)) {
  if (!creation_status.ok()) {
    return;
  }
  if (options.parent_span_ != nullptr) {
    // Spawn a child span under the caller's span, defaulting the name to
    // "async <cluster> egress" when none was provided.
    const std::string child_span_name =
        options.child_span_name_.empty()
            ? absl::StrCat("async ", parent.cluster_->name(), " egress")
            : options.child_span_name_;
    child_span_ = options.parent_span_->spawnChild(Tracing::EgressConfig::get(), child_span_name,
                                                   parent.dispatcher().timeSource().systemTime());
  } else {
    child_span_ = std::make_unique<Tracing::NullSpan>();
  }
  // Span gets sampled by default, as sampled_ defaults to true.
  // If caller overrides sampled_ with empty value, sampling status of the parent is kept.
  if (options.sampled_.has_value()) {
    child_span_->setSampled(options.sampled_.value());
  }
}
404

            
405
325
// Kicks off a buffered one-shot request: inject trace context, send headers
// (end_stream when there is no body), then send the body if present.
void AsyncRequestImpl::initialize() {
  Tracing::HttpTraceContext trace_context(request_->headers());
  Tracing::UpstreamContext upstream_context(nullptr,                    // host_
                                            parent_.cluster_.get(),     // cluster_
                                            Tracing::ServiceType::Http, // service_type_
                                            true                        // async_client_span_
  );
  child_span_->injectContext(trace_context, upstream_context);
  sendHeaders(request_->headers(), request_->body().length() == 0);
  if (request_->body().length() != 0) {
    // It's possible this will be a no-op due to a local response synchronously generated in
    // sendHeaders; guards handle this within AsyncStreamImpl.
    sendData(request_->body(), true);
  }
  // TODO(mattklein123): Support request trailers.
}
421

            
422
83
// Kicks off an ongoing (streamed) request: inject trace context and send
// headers only; the caller streams body/trailers afterwards (end_stream=false).
void AsyncOngoingRequestImpl::initialize() {
  Tracing::HttpTraceContext trace_context(*request_headers_);
  Tracing::UpstreamContext upstream_context(nullptr,                    // host_
                                            parent_.cluster_.get(),     // cluster_
                                            Tracing::ServiceType::Http, // service_type_
                                            true                        // async_client_span_
  );
  child_span_->injectContext(trace_context, upstream_context);
  sendHeaders(*request_headers_, false);
}
432

            
433
368
// Called when both stream halves are closed successfully: finalize the child
// span and hand the accumulated response to the caller.
void AsyncRequestSharedImpl::onComplete() {
  // Mark complete first so a subsequent onReset (e.g. during teardown) is a no-op.
  complete_ = true;
  callbacks_.onBeforeFinalizeUpstreamSpan(*child_span_, &response_->headers());

  Tracing::HttpTracerUtility::finalizeUpstreamSpan(*child_span_, streamInfo(),
                                                   Tracing::EgressConfig::get());

  callbacks_.onSuccess(*this, std::move(response_));
}
442

            
443
375
// Records the response status on the stream info and begins accumulating the
// response message from the received headers.
void AsyncRequestSharedImpl::onHeaders(ResponseHeaderMapPtr&& headers, bool) {
  streamInfo().setResponseCode(Http::Utility::getResponseStatus(*headers));
  response_ = std::make_unique<ResponseMessageImpl>(std::move(headers));
}
448

            
449
352
// Accumulates response body into the buffered response message, honoring the
// discard flag and the runtime-configurable buffer limit.
void AsyncRequestSharedImpl::onData(Buffer::Instance& data, bool) {
  if (discard_response_body_) {
    // Caller asked us not to keep the body; drain and move on.
    data.drain(data.length());
    return;
  }

  if (response_->body().length() + data.length() > response_buffer_limit_) {
    // Over the limit: drop the data, remember why, and reset the stream so
    // onReset reports ExceedResponseBufferLimit instead of a generic Reset.
    ENVOY_LOG_EVERY_POW_2(warn, "the buffer size limit for async client response body "
                                "has been exceeded, draining data");
    data.drain(data.length());
    response_buffer_overlimit_ = true;
    reset();
  } else {
    response_->body().move(data);
  }
}
465

            
466
1
// Attaches response trailers to the buffered response message.
void AsyncRequestSharedImpl::onTrailers(ResponseTrailerMapPtr&& trailers) {
  response_->trailers(std::move(trailers));
}
469

            
470
61
// Handles stream reset: finalizes tracing and, unless the reset was a
// user-initiated cancel(), reports failure to the caller.
void AsyncRequestSharedImpl::onReset() {
  if (complete_) {
    // This request has already been completed; a reset should be ignored.
    return;
  }

  if (!cancelled_) {
    // Set "error reason" tag related to reset. The tagging for "error true" is done inside the
    // Tracing::HttpTracerUtility::finalizeUpstreamSpan.
    child_span_->setTag(Tracing::Tags::get().ErrorReason, "Reset");
  }

  // Response headers may or may not have arrived before the reset.
  callbacks_.onBeforeFinalizeUpstreamSpan(*child_span_,
                                          remoteClosed() ? &response_->headers() : nullptr);

  // Finalize the span based on whether we received a response or not.
  Tracing::HttpTracerUtility::finalizeUpstreamSpan(*child_span_, streamInfo(),
                                                   Tracing::EgressConfig::get());

  if (!cancelled_) {
    if (response_buffer_overlimit_) {
      // Reset was triggered by onData exceeding the response buffer limit.
      callbacks_.onFailure(*this, AsyncClient::FailureReason::ExceedResponseBufferLimit);
    } else {
      // In this case we don't have a valid response so we do need to raise a failure.
      callbacks_.onFailure(*this, AsyncClient::FailureReason::Reset);
    }
  }
}
498

            
499
35
// User-initiated cancellation: mark cancelled first so the ensuing onReset
// skips the failure callbacks, tag the span, then reset the stream.
void AsyncRequestSharedImpl::cancel() {
  cancelled_ = true;

  // Add tags about the cancellation.
  child_span_->setTag(Tracing::Tags::get().Canceled, Tracing::Tags::get().True);

  reset();
}
507

            
508
} // namespace Http
509
} // namespace Envoy