1
#include "source/common/grpc/async_client_impl.h"
2

            
3
#include "envoy/config/core/v3/grpc_service.pb.h"
4

            
5
#include "source/common/buffer/zero_copy_input_stream_impl.h"
6
#include "source/common/common/enum_to_int.h"
7
#include "source/common/common/utility.h"
8
#include "source/common/grpc/common.h"
9
#include "source/common/http/header_map_impl.h"
10
#include "source/common/http/utility.h"
11

            
12
#include "absl/strings/str_cat.h"
13

            
14
namespace Envoy {
15
namespace Grpc {
16
namespace {
17
std::string enhancedGrpcMessage(const std::string& original_message,
18
1966
                                const Http::AsyncClient::Stream* stream) {
19
1966
  const auto& http_response_code_details = (stream && stream->streamInfo().responseCodeDetails())
20
1966
                                               ? *stream->streamInfo().responseCodeDetails()
21
1966
                                               : EMPTY_STRING;
22
1966
  return original_message.empty() ? http_response_code_details
23
1966
         : http_response_code_details.empty()
24
396
             ? original_message
25
396
             : absl::StrCat(original_message, "{", http_response_code_details, "}");
26
1966
}
27

            
28
2359
// Replaces the value of every request header whose name ends in "-bin" with its base64
// encoding.
//
// The work is done in two passes: the encoded values are collected while iterating, and the
// header map is only modified afterwards. The collection is keyed by header name, and emplace()
// keeps a single encoded value per name.
void base64EscapeBinHeaders(Http::RequestHeaderMap& headers) {
  absl::flat_hash_map<absl::string_view, std::string> encoded_by_name;

  const auto collect_bin_entry = [&encoded_by_name](const Http::HeaderEntry& entry) {
    const absl::string_view name = entry.key().getStringView();
    if (absl::EndsWith(name, "-bin")) {
      encoded_by_name.emplace(name, absl::Base64Escape(entry.value().getStringView()));
    }
    return Http::HeaderMap::Iterate::Continue;
  };
  headers.iterate(collect_bin_entry);

  for (const auto& encoded : encoded_by_name) {
    // The map key is only a view (presumably into the header map's own storage), so the name is
    // copied into a LowerCaseString before the header is removed.
    Http::LowerCaseString name(encoded.first);
    headers.remove(name);
    headers.addCopy(name, encoded.second);
  }
}
43
} // namespace
44

            
45
// Factory for AsyncClientImpl. The constructor reports configuration problems through an
// absl::Status out-parameter; this wrapper turns that into a StatusOr so callers either get a
// fully constructed client or the error status.
absl::StatusOr<std::unique_ptr<AsyncClientImpl>>
AsyncClientImpl::create(const envoy::config::core::v3::GrpcService& config,
                        Server::Configuration::CommonFactoryContext& context) {
  absl::Status status = absl::OkStatus();
  std::unique_ptr<AsyncClientImpl> client{new AsyncClientImpl(config, context, status)};
  // On failure the partially configured client is discarded when `client` goes out of scope.
  RETURN_IF_NOT_OK(status);
  return client;
}
54

            
55
// Constructs the client from the `envoy_grpc` section of `config`.
//
// @param config          gRPC service configuration (cluster name, authority, initial metadata,
//                        optional retry policy, ...).
// @param context         provides the cluster manager and the time source.
// @param creation_status out-parameter; set to the failing status and the constructor returns
//                        early if the initial metadata or the retry policy cannot be built.
AsyncClientImpl::AsyncClientImpl(const envoy::config::core::v3::GrpcService& config,
                                 Server::Configuration::CommonFactoryContext& context,
                                 absl::Status& creation_status)
    : max_recv_message_length_(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.envoy_grpc(), max_receive_message_length, 0)),
      skip_envoy_headers_(config.envoy_grpc().skip_envoy_headers()),
      cm_(context.clusterManager()),
      remote_cluster_name_(config.envoy_grpc().cluster_name()),
      host_name_(config.envoy_grpc().authority()),
      time_source_(context.timeSource()) {
  // Parser for the service-wide initial metadata that is applied to every stream's headers.
  auto metadata_parser_or = Router::HeaderParser::configure(
      config.initial_metadata(),
      envoy::config::core::v3::HeaderValueOption::OVERWRITE_IF_EXISTS_OR_ADD);
  SET_AND_RETURN_IF_NOT_OK(metadata_parser_or.status(), creation_status);

  // Optional client-wide retry policy, converted from the core proto to the route form.
  if (config.has_retry_policy()) {
    auto route_retry_policy =
        Http::Utility::convertCoreToRouteRetryPolicy(config.retry_policy(), "");
    auto retry_policy_or = Router::RetryPolicyImpl::create(
        route_retry_policy, ProtobufMessage::getNullValidationVisitor(), context);
    SET_AND_RETURN_IF_NOT_OK(retry_policy_or.status(), creation_status);
    retry_policy_ = std::move(*retry_policy_or);
  }

  metadata_parser_ = std::move(*metadata_parser_or);
}
78

            
79
1739
// Resets every stream that is still active when the client is destroyed.
AsyncClientImpl::~AsyncClientImpl() {
  ASSERT(isThreadSafe());
  // resetStream() ends in cleanup(), which removes the stream from active_streams_, so the list
  // shrinks on every iteration.
  while (!active_streams_.empty()) {
    auto& oldest_front = active_streams_.front();
    oldest_front->resetStream();
  }
}
85

            
86
// Starts a unary gRPC request.
//
// @return a handle to the in-flight request, or nullptr if the stream was already reset by the
//         time initialize() returned (in that case the request object is destroyed before
//         returning).
AsyncRequest* AsyncClientImpl::sendRaw(absl::string_view service_full_name,
                                       absl::string_view method_name, Buffer::InstancePtr&& request,
                                       RawAsyncRequestCallbacks& callbacks,
                                       Tracing::Span& parent_span,
                                       const Http::AsyncClient::RequestOptions& options) {
  ASSERT(isThreadSafe());

  // The typed raw pointer is kept only so it can be returned; ownership lives in `owned_stream`.
  AsyncRequestImpl* const raw_request = new AsyncRequestImpl(
      *this, service_full_name, method_name, std::move(request), callbacks, parent_span, options);
  AsyncStreamImplPtr owned_stream(raw_request);

  // `true` is passed as buffer_body_for_retry.
  owned_stream->initialize(true);
  if (!owned_stream->hasResetStream()) {
    LinkedList::moveIntoList(std::move(owned_stream), active_streams_);
    return raw_request;
  }

  return nullptr;
}
104

            
105
// Starts a gRPC stream.
//
// @return the new stream, now owned by active_streams_, or nullptr if the stream was already
//         reset by the time initialize() returned.
RawAsyncStream* AsyncClientImpl::startRaw(absl::string_view service_full_name,
                                          absl::string_view method_name,
                                          RawAsyncStreamCallbacks& callbacks,
                                          const Http::AsyncClient::StreamOptions& options) {
  ASSERT(isThreadSafe());

  auto new_stream =
      std::make_unique<AsyncStreamImpl>(*this, service_full_name, method_name, callbacks, options);
  new_stream->initialize(options.buffer_body_for_retry);

  if (!new_stream->hasResetStream()) {
    LinkedList::moveIntoList(std::move(new_stream), active_streams_);
    return active_streams_.front().get();
  }

  return nullptr;
}
121

            
122
// Creates a stream bound to `parent`. Besides storing its arguments, the constructor:
//  - inherits the parent's retry policy when the stream options carry none,
//  - applies the parent's `skip_envoy_headers_` setting to the send_internal / send_xff options,
//  - configures the decoder's maximum frame length,
//  - creates the tracing span (a child of options.parent_span_, or a NullSpan when there is no
//    parent span).
AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
                                 absl::string_view method_name, RawAsyncStreamCallbacks& callbacks,
                                 const Http::AsyncClient::StreamOptions& options)
    : parent_(parent), service_full_name_(service_full_name), method_name_(method_name),
      callbacks_(callbacks), options_(options) {
  // Apply parent retry policy if no per-stream override.
  const bool has_stream_retry_override =
      options.retry_policy.has_value() || options.parsed_retry_policy != nullptr;
  if (!has_stream_retry_override && parent_.retryPolicy() != nullptr) {
    options_.setRetryPolicy(parent_.retryPolicy());
  }

  // Apply parent `skip_envoy_headers_` setting from configuration, if no per-stream
  // override. (i.e., no override of default stream option from true to false)
  const bool send_envoy_headers = !parent_.skip_envoy_headers_;
  if (options.send_internal) {
    options_.setSendInternal(send_envoy_headers);
  }
  if (options.send_xff) {
    options_.setSendXff(send_envoy_headers);
  }

  // Configure the maximum frame length
  decoder_.setMaxFrameLength(parent_.max_recv_message_length_);

  if (options.parent_span_ == nullptr) {
    current_span_ = std::make_unique<Tracing::NullSpan>();
  } else {
    // Use the caller-provided span name if there is one, otherwise derive it from the method.
    std::string span_name = options.child_span_name_;
    if (span_name.empty()) {
      span_name = absl::StrCat("async ", service_full_name, ".", method_name, " egress");
    }

    const auto& tags = Tracing::Tags::get();
    const auto& upstream_address =
        parent.host_name_.empty() ? parent.remote_cluster_name_ : parent.host_name_;

    current_span_ = options.parent_span_->spawnChild(Tracing::EgressConfig::get(), span_name,
                                                     parent.time_source_.systemTime());
    current_span_->setTag(tags.UpstreamCluster, parent.remote_cluster_name_);
    current_span_->setTag(tags.UpstreamAddress, upstream_address);
    current_span_->setTag(tags.Component, tags.Proxy);
  }

  if (options.sampled_.has_value()) {
    current_span_->setSampled(options.sampled_.value());
  }
}
166

            
167
2432
// Invokes the test-only deletion hook from the stream options, if one was installed.
AsyncStreamImpl::~AsyncStreamImpl() {
  const auto& on_delete_hook = options_.on_delete_callback_for_test_only;
  if (on_delete_hook) {
    on_delete_hook();
  }
}
172

            
173
2432
void AsyncStreamImpl::initialize(bool buffer_body_for_retry) {
174
2432
  const auto thread_local_cluster = parent_.cm_.getThreadLocalCluster(parent_.remote_cluster_name_);
175
2432
  if (thread_local_cluster == nullptr) {
176
70
    notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, "Cluster not available");
177
70
    http_reset_ = true;
178
70
    return;
179
70
  }
180

            
181
2362
  cluster_info_ = thread_local_cluster->info();
182
2362
  auto& http_async_client = thread_local_cluster->httpAsyncClient();
183
2362
  dispatcher_ = &http_async_client.dispatcher();
184
2362
  stream_ = http_async_client.start(*this, options_.setBufferBodyForRetry(buffer_body_for_retry));
185
2362
  if (stream_ == nullptr) {
186
3
    notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, EMPTY_STRING);
187
3
    http_reset_ = true;
188
3
    return;
189
3
  }
190

            
191
2359
  if (options_.sidestream_watermark_callbacks != nullptr) {
192
660
    stream_->setWatermarkCallbacks(*options_.sidestream_watermark_callbacks);
193
660
  }
194

            
195
2359
  headers_message_ = Common::prepareHeaders(
196
2359
      parent_.host_name_.empty() ? parent_.remote_cluster_name_ : parent_.host_name_,
197
2359
      service_full_name_, method_name_, options_.timeout);
198
  // Fill service-wide initial metadata.
199
  // TODO(cpakulski): Find a better way to access requestHeaders
200
  // request headers should not be stored in stream_info.
201
  // Maybe put it to parent_context?
202
  // Since request headers may be empty, consider using Envoy::OptRef.
203
2359
  parent_.metadata_parser_->evaluateHeaders(headers_message_->headers(),
204
2359
                                            options_.parent_context.stream_info);
205

            
206
2359
  Tracing::HttpTraceContext trace_context(headers_message_->headers());
207
2359
  Tracing::UpstreamContext upstream_context(nullptr,                         // host_
208
2359
                                            cluster_info_.get(),             // cluster_
209
2359
                                            Tracing::ServiceType::EnvoyGrpc, // service_type_
210
2359
                                            true                             // async_client_span_
211
2359
  );
212
2359
  current_span_->injectContext(trace_context, upstream_context);
213
2359
  callbacks_.onCreateInitialMetadata(headers_message_->headers());
214
  // base64 encode on "-bin" metadata.
215
2359
  base64EscapeBinHeaders(headers_message_->headers());
216
2359
  stream_->sendHeaders(headers_message_->headers(), false);
217
2359
}
218

            
219
// TODO(htuch): match Google gRPC base64 encoding behavior for *-bin headers, see
// https://github.com/envoyproxy/envoy/pull/2444#discussion_r163914459.
// Until https://github.com/envoyproxy/envoy/issues/39054 is resolved, "-bin" values are not
// decoded here.
223
2336
// Handles the HTTP response headers of the underlying stream.
//
// Three outcomes are possible:
//  - Trailers-only response (end_stream set, and either HTTP 200 or a grpc-status header is
//    present): empty initial metadata is delivered and the headers are re-processed as trailers.
//  - Non-200 HTTP response that is not a trailers-only response: empty initial metadata is
//    delivered and the stream fails with the gRPC status derived from the HTTP status.
//  - Otherwise: the headers are delivered as the server's initial metadata.
//
// The initial-metadata callback is skipped on every path while
// waiting_to_delete_on_remote_close_ is set, matching how the other callbacks_ invocations in
// this class are guarded.
//
// @param headers    response headers; ownership is passed to the callbacks on the normal path.
// @param end_stream true if no body or trailers will follow.
void AsyncStreamImpl::onHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
  const auto http_response_status = Http::Utility::getResponseStatus(*headers);
  const auto grpc_status = Common::getGrpcStatus(*headers);

  // On trailers-only and error responses the received headers are not handed out as initial
  // metadata; an empty header map is delivered instead.
  const auto deliver_empty_initial_metadata = [this]() {
    if (!waiting_to_delete_on_remote_close_) {
      callbacks_.onReceiveInitialMetadata(Http::ResponseHeaderMapImpl::create());
    }
  };

  if (http_response_status != enumToInt(Http::Code::OK)) {
    // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md requires that
    // grpc-status be used if available.
    if (end_stream && grpc_status) {
      // Trailers-only response.
      deliver_empty_initial_metadata();
      // Due to headers/trailers type differences we need to copy here. This is an uncommon case but
      // we can potentially optimize in the future.
      onTrailers(Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*headers));
      return;
    }
    deliver_empty_initial_metadata();
    // Status is translated via Utility::httpToGrpcStatus per
    // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
    streamError(Utility::httpToGrpcStatus(http_response_status));
    return;
  }

  if (end_stream) {
    // Trailers-only response.
    deliver_empty_initial_metadata();
    // Due to headers/trailers type differences we need to copy here. This is an uncommon case but
    // we can potentially optimize in the future.
    onTrailers(Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*headers));
    return;
  }

  // Normal response headers/Server initial metadata. end_stream is always false here, so the
  // received headers are forwarded as-is.
  if (!waiting_to_delete_on_remote_close_) {
    callbacks_.onReceiveInitialMetadata(std::move(headers));
  }
}
258

            
259
103266
// Decodes gRPC frames from `data` and forwards each message to the callbacks.
//
// The stream is failed (streamError) when:
//  - the decoder reports kResourceExhausted        -> ResourceExhausted,
//  - the decoder reports any other non-OK status   -> Internal,
//  - a non-empty frame carries non-default flags   -> Internal,
//  - the message callback returns false            -> Internal,
//  - the HTTP stream ends on a data frame, i.e. without trailers -> Unknown.
//
// Message callbacks are skipped while waiting_to_delete_on_remote_close_ is set.
void AsyncStreamImpl::onData(Buffer::Instance& data, bool end_stream) {
  decoded_frames_.clear();
  const auto decode_status = decoder_.decode(data, decoded_frames_);

  switch (decode_status.code()) {
  case absl::StatusCode::kOk:
    break;
  case absl::StatusCode::kResourceExhausted:
    streamError(Status::WellKnownGrpcStatus::ResourceExhausted);
    return;
  default:
    // Any other decoder error is reported as Internal.
    streamError(Status::WellKnownGrpcStatus::Internal);
    return;
  }

  for (auto& frame : decoded_frames_) {
    const bool has_unexpected_flags = frame.length_ > 0 && frame.flags_ != GRPC_FH_DEFAULT;
    if (has_unexpected_flags) {
      streamError(Status::WellKnownGrpcStatus::Internal);
      return;
    }

    if (waiting_to_delete_on_remote_close_) {
      continue;
    }

    // A frame without a payload is delivered as an empty buffer.
    auto payload = frame.data_ ? std::move(frame.data_) : std::make_unique<Buffer::OwnedImpl>();
    if (!callbacks_.onReceiveMessageRaw(std::move(payload))) {
      streamError(Status::WellKnownGrpcStatus::Internal);
      return;
    }
  }

  if (end_stream) {
    streamError(Status::WellKnownGrpcStatus::Unknown);
  }
}
293

            
294
// TODO(htuch): match Google gRPC base64 decoding behavior for *-bin headers, see
// https://github.com/envoyproxy/envoy/pull/2444#discussion_r163914459.
// Until https://github.com/envoyproxy/envoy/issues/39054 is resolved, "-bin" values are not
// decoded here.
298
784
// Handles the response trailers: delivers them as trailing metadata, reports the remote close
// with the gRPC status and message found in the trailers, and cleans up the stream.
//
// A missing grpc-status is reported as Unknown. The trailing-metadata callback is skipped while
// waiting_to_delete_on_remote_close_ is set.
void AsyncStreamImpl::onTrailers(Http::ResponseTrailerMapPtr&& trailers) {
  // Read everything needed from the trailers before ownership is handed to the callbacks.
  auto final_status = Common::getGrpcStatus(*trailers);
  if (!final_status.has_value()) {
    final_status = Status::WellKnownGrpcStatus::Unknown;
  }
  const std::string final_message = Common::getGrpcMessage(*trailers);

  if (!waiting_to_delete_on_remote_close_) {
    callbacks_.onReceiveTrailingMetadata(std::move(trailers));
  }

  notifyRemoteClose(final_status.value(), final_message);
  cleanup();
}
310

            
311
1113
// Fails the stream locally: delivers empty trailing metadata (unless
// waiting_to_delete_on_remote_close_ is set), reports the remote close with `grpc_status` and
// `message`, then resets the stream.
void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::string& message) {
  const bool notify_callbacks = !waiting_to_delete_on_remote_close_;
  if (notify_callbacks) {
    callbacks_.onReceiveTrailingMetadata(Http::ResponseTrailerMapImpl::create());
  }

  notifyRemoteClose(grpc_status, message);
  resetStream();
}
318

            
319
void AsyncStreamImpl::notifyRemoteClose(Grpc::Status::GrpcStatus status,
320
1970
                                        const std::string& message) {
321
1970
  current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(status));
322
1970
  if (status != Grpc::Status::WellKnownGrpcStatus::Ok) {
323
1565
    current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
324
1565
  }
325
1970
  current_span_->finishSpan();
326
1970
  if (!waiting_to_delete_on_remote_close_) {
327
1966
    callbacks_.onRemoteClose(status, enhancedGrpcMessage(message, stream_));
328
1966
  }
329
1970
}
330

            
331
4
// Intentionally empty: stream completion is already handled by the other callbacks
// (onHeaders / onData / onTrailers / onReset).
void AsyncStreamImpl::onComplete() {}
334

            
335
2352
void AsyncStreamImpl::onReset() {
336
2352
  if (http_reset_) {
337
1256
    return;
338
1256
  }
339

            
340
1096
  http_reset_ = true;
341
1096
  streamError(Status::WellKnownGrpcStatus::Internal);
342
1096
}
343

            
344
69584
// Sends one message on the stream: the gRPC frame header is prepended to `buffer` in place and
// the result is written to the HTTP stream.
//
// @param buffer     serialized message; modified by this call.
// @param end_stream true if this is the last message the client will send.
void AsyncStreamImpl::sendMessageRaw(Buffer::InstancePtr&& buffer, bool end_stream) {
  Buffer::Instance& framed_message = *buffer;
  Common::prependGrpcFrameHeader(framed_message);
  stream_->sendData(framed_message, end_stream);
}
348

            
349
423
void AsyncStreamImpl::closeStream() {
350
423
  Buffer::OwnedImpl empty_buffer;
351
423
  stream_->sendData(empty_buffer, true);
352
423
  current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
353
423
  current_span_->finishSpan();
354
423
}
355

            
356
1573
// Resetting the stream is the same as cleaning it up; see cleanup().
void AsyncStreamImpl::resetStream() {
  cleanup();
}
357

            
358
2359
void AsyncStreamImpl::cleanup() {
359
  // Unsubscribe the side stream watermark callbacks, if hasn't done so.
360
2359
  if (options_.sidestream_watermark_callbacks != nullptr) {
361
244
    stream_->removeWatermarkCallbacks();
362
244
    options_.sidestream_watermark_callbacks = nullptr;
363
244
  }
364

            
365
  // Do not reset if the stream is being cleaning up after server has half-closed.
366
2359
  if (!http_reset_ && !waiting_to_delete_on_remote_close_) {
367
1259
    http_reset_ = true;
368
1259
    stream_->reset();
369
1259
  }
370

            
371
  // This will destroy us, but only do so if we are actually in a list. This does not happen in
372
  // the immediate failure case.
373
2359
  if (LinkedObject<AsyncStreamImpl>::inserted()) {
374
2347
    ASSERT(dispatcher_->isThreadSafe());
375
2347
    dispatcher_->deferredDelete(
376
2347
        LinkedObject<AsyncStreamImpl>::removeFromList(parent_.active_streams_));
377
2347
  }
378
2359
  remote_close_timer_ = nullptr;
379
2359
}
380

            
381
6
void AsyncStreamImpl::waitForRemoteCloseAndDelete() {
382
6
  if (!waiting_to_delete_on_remote_close_) {
383
6
    waiting_to_delete_on_remote_close_ = true;
384

            
385
6
    if (options_.sidestream_watermark_callbacks != nullptr) {
386
      stream_->removeWatermarkCallbacks();
387
      options_.sidestream_watermark_callbacks = nullptr;
388
    }
389
6
    remote_close_timer_ = dispatcher_->createTimer([this] {
390
2
      waiting_to_delete_on_remote_close_ = false;
391
2
      cleanup();
392
2
    });
393
6
    remote_close_timer_->enableTimer(options_.remote_close_timeout);
394
6
  }
395
6
}
396

            
397
// Creates a unary request on top of AsyncStreamImpl. The request object acts as its own stream
// callbacks (`*this` is passed to the base class) and forwards the outcome to `callbacks`.
//
// A tracing span named "async <service>.<method> egress" is spawned from `parent_span` and
// tagged with the upstream cluster, the upstream address (configured authority, or the cluster
// name when no authority is configured) and the proxy component.
AsyncRequestImpl::AsyncRequestImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
                                   absl::string_view method_name, Buffer::InstancePtr&& request,
                                   RawAsyncRequestCallbacks& callbacks, Tracing::Span& parent_span,
                                   const Http::AsyncClient::RequestOptions& options)
    : AsyncStreamImpl(parent, service_full_name, method_name, *this, options),
      request_(std::move(request)), callbacks_(callbacks) {
  const auto& tags = Tracing::Tags::get();
  const auto& upstream_address =
      parent.host_name_.empty() ? parent.remote_cluster_name_ : parent.host_name_;

  current_span_ =
      parent_span.spawnChild(Tracing::EgressConfig::get(),
                             absl::StrCat("async ", service_full_name, ".", method_name, " egress"),
                             parent.time_source_.systemTime());
  current_span_->setTag(tags.UpstreamCluster, parent.remote_cluster_name_);
  current_span_->setTag(tags.UpstreamAddress, upstream_address);
  current_span_->setTag(tags.Component, tags.Proxy);
}
414

            
415
186
void AsyncRequestImpl::initialize(bool buffer_body_for_retry) {
416
186
  AsyncStreamImpl::initialize(buffer_body_for_retry);
417
186
  if (this->hasResetStream()) {
418
2
    return;
419
2
  }
420
184
  this->sendMessageRaw(std::move(request_), true);
421
184
}
422

            
423
1
void AsyncRequestImpl::cancel() {
424
1
  current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
425
1
  current_span_->finishSpan();
426
1
  this->resetStream();
427
1
}
428

            
429
59
// Returns the stream info of the underlying stream by delegating to AsyncStreamImpl.
const StreamInfo::StreamInfo& AsyncRequestImpl::streamInfo() const {
  const StreamInfo::StreamInfo& info = AsyncStreamImpl::streamInfo();
  return info;
}
432

            
433
1
void AsyncRequestImpl::detach() {
434
  // TODO(wbpcode): In most tracers the span will hold a reference to the tracer self
435
  // and it's possible that become a dangling reference for long time async request.
436
  // This require further PR to resolve.
437

            
438
1
  if (options_.sidestream_watermark_callbacks != nullptr) {
439
1
    stream_->removeWatermarkCallbacks();
440
1
    options_.sidestream_watermark_callbacks = nullptr;
441
1
  }
442
1
  options_.parent_span_ = nullptr;
443
1
  options_.parent_context.stream_info = nullptr;
444

            
445
1
  streamInfo().clearParentStreamInfo();
446
1
}
447

            
448
185
// Injects the request's tracing context into the outgoing metadata, then lets the user
// callbacks add or modify metadata.
void AsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
  Tracing::HttpTraceContext header_trace_context(metadata);
  Tracing::UpstreamContext upstream_trace_context(
      nullptr,                         // host_
      cluster_info_.get(),             // cluster_
      Tracing::ServiceType::EnvoyGrpc, // service_type_
      true                             // async_client_span_
  );

  current_span_->injectContext(header_trace_context, upstream_trace_context);
  callbacks_.onCreateInitialMetadata(metadata);
}
458

            
459
182
// Initial metadata is ignored for unary requests.
void AsyncRequestImpl::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) {
}
460

            
461
161
// Stores the response message until the stream closes; a later message replaces an earlier one.
//
// @return always true, i.e. the message is always accepted.
bool AsyncRequestImpl::onReceiveMessageRaw(Buffer::InstancePtr&& response) {
  constexpr bool message_accepted = true;
  response_ = std::move(response);
  return message_accepted;
}
465

            
466
184
// Trailing metadata is ignored for unary requests.
void AsyncRequestImpl::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) {
}
467

            
468
185
// Reports the outcome of the request to the user callbacks and finishes the request span.
//
//  - status Ok with a stored response  -> onSuccessRaw(response)
//  - status Ok without any response    -> onFailure(Internal, "")
//  - any other status                  -> onFailure(status, message)
//
// Both failure cases tag the span as an error before invoking the callback.
void AsyncRequestImpl::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) {
  const auto& tags = Tracing::Tags::get();
  current_span_->setTag(tags.GrpcStatusCode, std::to_string(status));

  const bool status_ok = status == Grpc::Status::WellKnownGrpcStatus::Ok;
  if (status_ok && response_ != nullptr) {
    callbacks_.onSuccessRaw(std::move(response_), *current_span_);
  } else {
    current_span_->setTag(tags.Error, tags.True);
    if (status_ok) {
      // The server reported success but never sent a response message.
      callbacks_.onFailure(Status::Internal, EMPTY_STRING, *current_span_);
    } else {
      callbacks_.onFailure(status, message, *current_span_);
    }
  }

  current_span_->finishSpan();
}
483

            
484
} // namespace Grpc
485
} // namespace Envoy