1
#include "source/common/grpc/async_client_impl.h"
2

            
3
#include "envoy/config/core/v3/grpc_service.pb.h"
4

            
5
#include "source/common/buffer/zero_copy_input_stream_impl.h"
6
#include "source/common/common/enum_to_int.h"
7
#include "source/common/common/utility.h"
8
#include "source/common/grpc/common.h"
9
#include "source/common/http/header_map_impl.h"
10
#include "source/common/http/utility.h"
11

            
12
#include "absl/strings/str_cat.h"
13

            
14
namespace Envoy {
15
namespace Grpc {
16
namespace {
17
std::string enhancedGrpcMessage(const std::string& original_message,
18
2024
                                const Http::AsyncClient::Stream* stream) {
19
2024
  const auto& http_response_code_details = (stream && stream->streamInfo().responseCodeDetails())
20
2024
                                               ? *stream->streamInfo().responseCodeDetails()
21
2024
                                               : EMPTY_STRING;
22
2024
  return original_message.empty() ? http_response_code_details
23
2024
         : http_response_code_details.empty()
24
402
             ? original_message
25
402
             : absl::StrCat(original_message, "{", http_response_code_details, "}");
26
2024
}
27

            
28
2414
// Replaces the value of every request header whose name ends in "-bin" with its base64 escaped
// form. Replacement values are collected during iteration and applied afterwards, so the header
// map is never modified from inside the iterate() callback.
void base64EscapeBinHeaders(Http::RequestHeaderMap& headers) {
  absl::flat_hash_map<absl::string_view, std::string> encoded_values;
  headers.iterate([&encoded_values](const Http::HeaderEntry& entry) {
    const absl::string_view name = entry.key().getStringView();
    if (absl::EndsWith(name, "-bin")) {
      encoded_values.emplace(name, absl::Base64Escape(entry.value().getStringView()));
    }
    return Http::HeaderMap::Iterate::Continue;
  });

  for (const auto& replacement : encoded_values) {
    // Build the lookup key before the original entry (which backs the string view) is removed.
    Http::LowerCaseString name(replacement.first);
    headers.remove(name);
    headers.addCopy(name, replacement.second);
  }
}
43
} // namespace
44

            
45
// Factory for AsyncClientImpl. The constructor reports failures through an out-parameter
// status; that status is surfaced here so callers receive either a usable client or the error.
absl::StatusOr<std::unique_ptr<AsyncClientImpl>>
AsyncClientImpl::create(const envoy::config::core::v3::GrpcService& config,
                        Server::Configuration::CommonFactoryContext& context) {
  absl::Status creation_status = absl::OkStatus();
  std::unique_ptr<AsyncClientImpl> client{new AsyncClientImpl(config, context, creation_status)};
  // On failure the partially constructed client is destroyed when `client` goes out of scope.
  RETURN_IF_NOT_OK(creation_status);
  return client;
}
54

            
55
// Builds the client from the `envoy_grpc` portion of the GrpcService config. Any failure while
// parsing the initial metadata or the retry policy is written to `creation_status` and
// construction stops early; callers must check that status before using the object.
AsyncClientImpl::AsyncClientImpl(const envoy::config::core::v3::GrpcService& config,
                                 Server::Configuration::CommonFactoryContext& context,
                                 absl::Status& creation_status)
    : max_recv_message_length_(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.envoy_grpc(), max_receive_message_length, 0)),
      skip_envoy_headers_(config.envoy_grpc().skip_envoy_headers()), cm_(context.clusterManager()),
      remote_cluster_name_(config.envoy_grpc().cluster_name()),
      host_name_(config.envoy_grpc().authority()), time_source_(context.timeSource()) {
  // Service-wide initial metadata; configured values overwrite existing headers or are added.
  auto metadata_parser_or_error = Router::HeaderParser::configure(
      config.initial_metadata(),
      envoy::config::core::v3::HeaderValueOption::OVERWRITE_IF_EXISTS_OR_ADD);
  SET_AND_RETURN_IF_NOT_OK(metadata_parser_or_error.status(), creation_status);

  // Optional client-level retry policy, converted from the core proto to the route form.
  if (config.has_retry_policy()) {
    auto route_retry_policy =
        Http::Utility::convertCoreToRouteRetryPolicy(config.retry_policy(), "");
    auto retry_policy_or_error = Router::RetryPolicyImpl::create(
        route_retry_policy, ProtobufMessage::getNullValidationVisitor(), context);
    SET_AND_RETURN_IF_NOT_OK(retry_policy_or_error.status(), creation_status);
    retry_policy_ = std::move(*retry_policy_or_error);
  }

  metadata_parser_ = std::move(*metadata_parser_or_error);
}
78

            
79
1777
// Resets every stream that is still active. resetStream() runs cleanup(), which removes the
// stream from active_streams_, so the list shrinks on each iteration until it is empty.
AsyncClientImpl::~AsyncClientImpl() {
  ASSERT(isThreadSafe());
  while (!active_streams_.empty()) {
    auto& stream = active_streams_.front();
    stream->resetStream();
  }
}
85

            
86
// Starts a unary request. Returns a handle owned by this client (via active_streams_), or
// nullptr when the request was already reset during initialization; in that case the request
// object is destroyed before returning.
AsyncRequest* AsyncClientImpl::sendRaw(absl::string_view service_full_name,
                                       absl::string_view method_name, Buffer::InstancePtr&& request,
                                       RawAsyncRequestCallbacks& callbacks,
                                       Tracing::Span& parent_span,
                                       const Http::AsyncClient::RequestOptions& options) {
  ASSERT(isThreadSafe());
  // Keep the typed pointer for the return value; ownership lives in `owned_stream`.
  auto* const request_handle = new AsyncRequestImpl(
      *this, service_full_name, method_name, std::move(request), callbacks, parent_span, options);
  AsyncStreamImplPtr owned_stream{request_handle};

  owned_stream->initialize(true);
  if (!owned_stream->hasResetStream()) {
    LinkedList::moveIntoList(std::move(owned_stream), active_streams_);
    return request_handle;
  }

  return nullptr;
}
104

            
105
// Starts a streaming call. Returns the stream, which is owned by active_streams_, or nullptr
// when the stream was already reset during initialization.
RawAsyncStream* AsyncClientImpl::startRaw(absl::string_view service_full_name,
                                          absl::string_view method_name,
                                          RawAsyncStreamCallbacks& callbacks,
                                          const Http::AsyncClient::StreamOptions& options) {
  ASSERT(isThreadSafe());
  auto new_stream =
      std::make_unique<AsyncStreamImpl>(*this, service_full_name, method_name, callbacks, options);

  new_stream->initialize(options.buffer_body_for_retry);
  if (!new_stream->hasResetStream()) {
    LinkedList::moveIntoList(std::move(new_stream), active_streams_);
    // The stream that was just moved in is read back from the front of the list.
    return active_streams_.front().get();
  }

  return nullptr;
}
121

            
122
// Captures the call parameters, merges client-level defaults into the per-stream options,
// configures the frame decoder limit and sets up the tracing span for the call.
AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
                                 absl::string_view method_name, RawAsyncStreamCallbacks& callbacks,
                                 const Http::AsyncClient::StreamOptions& options)
    : parent_(parent), service_full_name_(service_full_name), method_name_(method_name),
      callbacks_(callbacks), options_(options) {
  // Fall back to the client's retry policy when the caller did not provide one for this stream.
  const bool has_stream_retry_policy =
      options.retry_policy.has_value() || options.parsed_retry_policy != nullptr;
  if (!has_stream_retry_policy && parent_.retryPolicy() != nullptr) {
    options_.setRetryPolicy(parent_.retryPolicy());
  }

  // The client-level `skip_envoy_headers_` setting is applied only to options that are still
  // enabled on the stream; an option the caller already disabled stays disabled.
  const bool send_envoy_headers = !parent_.skip_envoy_headers_;
  if (options.send_internal) {
    options_.setSendInternal(send_envoy_headers);
  }
  if (options.send_xff) {
    options_.setSendXff(send_envoy_headers);
  }

  // Limit the size of received frames to the configured maximum message length.
  decoder_.setMaxFrameLength(parent_.max_recv_message_length_);

  if (options.parent_span_ == nullptr) {
    // No parent span was supplied, so tracing calls on this stream go to a null span.
    current_span_ = std::make_unique<Tracing::NullSpan>();
  } else {
    const std::string child_span_name =
        options.child_span_name_.empty()
            ? absl::StrCat("async ", service_full_name, ".", method_name, " egress")
            : options.child_span_name_;

    current_span_ = options.parent_span_->spawnChild(Tracing::EgressConfig::get(), child_span_name,
                                                     parent.time_source_.systemTime());
    const auto& tags = Tracing::Tags::get();
    // The configured authority is reported as the upstream address, else the cluster name.
    const auto& upstream_address =
        parent.host_name_.empty() ? parent.remote_cluster_name_ : parent.host_name_;
    current_span_->setTag(tags.UpstreamCluster, parent.remote_cluster_name_);
    current_span_->setTag(tags.UpstreamAddress, upstream_address);
    current_span_->setTag(tags.Component, tags.Proxy);
  }

  // An explicit sampling decision from the caller overrides the span's own.
  if (options.sampled_.has_value()) {
    current_span_->setSampled(options.sampled_.value());
  }
}
166

            
167
2483
// Invokes the test-only deletion hook, if one was installed through the stream options.
AsyncStreamImpl::~AsyncStreamImpl() {
  if (!options_.on_delete_callback_for_test_only) {
    return;
  }
  options_.on_delete_callback_for_test_only();
}
172

            
173
2483
void AsyncStreamImpl::initialize(bool buffer_body_for_retry) {
174
2483
  const auto thread_local_cluster = parent_.cm_.getThreadLocalCluster(parent_.remote_cluster_name_);
175
2483
  if (thread_local_cluster == nullptr) {
176
66
    notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, "Cluster not available");
177
66
    http_reset_ = true;
178
66
    return;
179
66
  }
180

            
181
2417
  cluster_info_ = thread_local_cluster->info();
182
2417
  auto& http_async_client = thread_local_cluster->httpAsyncClient();
183
2417
  dispatcher_ = &http_async_client.dispatcher();
184
2417
  stream_ = http_async_client.start(*this, options_.setBufferBodyForRetry(buffer_body_for_retry));
185
2417
  if (stream_ == nullptr) {
186
3
    notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, EMPTY_STRING);
187
3
    http_reset_ = true;
188
3
    return;
189
3
  }
190

            
191
2414
  if (options_.sidestream_watermark_callbacks != nullptr) {
192
660
    stream_->setWatermarkCallbacks(*options_.sidestream_watermark_callbacks);
193
660
  }
194

            
195
2414
  headers_message_ = Common::prepareHeaders(
196
2414
      parent_.host_name_.empty() ? parent_.remote_cluster_name_ : parent_.host_name_,
197
2414
      service_full_name_, method_name_, options_.timeout);
198
  // Fill service-wide initial metadata.
199
  // TODO(cpakulski): Find a better way to access requestHeaders
200
  // request headers should not be stored in stream_info.
201
  // Maybe put it to parent_context?
202
  // Since request headers may be empty, consider using Envoy::OptRef.
203
2414
  parent_.metadata_parser_->evaluateHeaders(headers_message_->headers(),
204
2414
                                            options_.parent_context.stream_info);
205

            
206
2414
  Tracing::HttpTraceContext trace_context(headers_message_->headers());
207
2414
  Tracing::UpstreamContext upstream_context(nullptr,                         // host_
208
2414
                                            cluster_info_.get(),             // cluster_
209
2414
                                            Tracing::ServiceType::EnvoyGrpc, // service_type_
210
2414
                                            true                             // async_client_span_
211
2414
  );
212
2414
  current_span_->injectContext(trace_context, upstream_context);
213
2414
  callbacks_.onCreateInitialMetadata(headers_message_->headers());
214
  // base64 encode on "-bin" metadata.
215
2414
  base64EscapeBinHeaders(headers_message_->headers());
216
2414
  stream_->sendHeaders(headers_message_->headers(), false);
217
2414
}
218

            
219
// TODO(htuch): match Google gRPC base64 decoding behavior for *-bin headers, see
220
// https://github.com/envoyproxy/envoy/pull/2444#discussion_r163914459.
221
// Pending on https://github.com/envoyproxy/envoy/issues/39054, we are not doing "-bin" decoding
222
// right now.
223
2393
// Handles the HTTP response headers of the call.
//
// - Non-200 HTTP status: the callbacks receive empty initial metadata. If the response is
//   trailers-only and carries a grpc-status, the headers are processed as trailers; otherwise
//   the stream fails with the gRPC status derived from the HTTP status.
// - 200 with end_stream (trailers-only): the callbacks receive empty initial metadata and the
//   headers are processed as trailers.
// - 200 without end_stream: the headers are handed to the callbacks as initial metadata.
//
// No callback is invoked while waiting_to_delete_on_remote_close_ is set, matching every other
// callback invocation in this class (onData, onTrailers, streamError, notifyRemoteClose).
void AsyncStreamImpl::onHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
  const auto http_response_status = Http::Utility::getResponseStatus(*headers);
  const auto grpc_status = Common::getGrpcStatus(*headers);

  if (http_response_status != enumToInt(Http::Code::OK)) {
    if (!waiting_to_delete_on_remote_close_) {
      callbacks_.onReceiveInitialMetadata(Http::ResponseHeaderMapImpl::create());
    }
    // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md requires that
    // grpc-status be used if available.
    if (end_stream && grpc_status) {
      // Trailers-only response.
      // Due to headers/trailers type differences we need to copy here. This is an uncommon case but
      // we can potentially optimize in the future.
      onTrailers(Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*headers));
      return;
    }
    // Status is translated via Utility::httpToGrpcStatus per
    // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
    streamError(Utility::httpToGrpcStatus(http_response_status));
    return;
  }

  if (end_stream) {
    // Trailers-only response.
    if (!waiting_to_delete_on_remote_close_) {
      callbacks_.onReceiveInitialMetadata(Http::ResponseHeaderMapImpl::create());
    }
    // Due to headers/trailers type differences we need to copy here. This is an uncommon case but
    // we can potentially optimize in the future.
    onTrailers(Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*headers));
    return;
  }

  // Normal response headers/Server initial metadata. end_stream is known to be false here, so
  // the headers are always forwarded as-is.
  if (!waiting_to_delete_on_remote_close_) {
    callbacks_.onReceiveInitialMetadata(std::move(headers));
  }
}
258

            
259
103287
// Decodes gRPC frames from the received body data and forwards each message to the callbacks.
// Decode failures, frames with non-default flags, messages rejected by the callbacks and a body
// that ends the stream (i.e. no trailers) all terminate the stream with an error.
void AsyncStreamImpl::onData(Buffer::Instance& data, bool end_stream) {
  decoded_frames_.clear();
  const auto decode_status = decoder_.decode(data, decoded_frames_);

  // A kResourceExhausted decode status is reported as gRPC ResourceExhausted; any other
  // failure, including error types decode() may grow in the future, is reported as Internal.
  if (!decode_status.ok()) {
    streamError(decode_status.code() == absl::StatusCode::kResourceExhausted
                    ? Status::WellKnownGrpcStatus::ResourceExhausted
                    : Status::WellKnownGrpcStatus::Internal);
    return;
  }

  for (auto& frame : decoded_frames_) {
    // Non-empty frames must carry the default flags.
    const bool unexpected_flags = frame.length_ > 0 && frame.flags_ != GRPC_FH_DEFAULT;
    if (unexpected_flags) {
      streamError(Status::WellKnownGrpcStatus::Internal);
      return;
    }
    // While waiting for deletion the callbacks are no longer invoked; the frame is dropped.
    if (waiting_to_delete_on_remote_close_) {
      continue;
    }
    // A frame without a payload buffer is delivered as an empty buffer.
    Buffer::InstancePtr payload =
        frame.data_ ? std::move(frame.data_) : std::make_unique<Buffer::OwnedImpl>();
    if (!callbacks_.onReceiveMessageRaw(std::move(payload))) {
      streamError(Status::WellKnownGrpcStatus::Internal);
      return;
    }
  }

  // The stream ended on a data frame, so no trailers (and no grpc-status) will arrive.
  if (end_stream) {
    streamError(Status::WellKnownGrpcStatus::Unknown);
  }
}
293

            
294
// TODO(htuch): match Google gRPC base64 decoding behavior for *-bin headers, see
295
// https://github.com/envoyproxy/envoy/pull/2444#discussion_r163914459.
296
// Pending on https://github.com/envoyproxy/envoy/issues/39054, we are not doing "-bin" decoding
297
// right now.
298
796
// Handles the response trailers: forwards them to the callbacks, reports the final gRPC status
// (Unknown when the trailers carry none) and cleans the stream up.
void AsyncStreamImpl::onTrailers(Http::ResponseTrailerMapPtr&& trailers) {
  // Read the status and message first; the trailers may be moved into the callback below.
  const auto parsed_status = Common::getGrpcStatus(*trailers);
  const std::string grpc_message = Common::getGrpcMessage(*trailers);

  if (!waiting_to_delete_on_remote_close_) {
    callbacks_.onReceiveTrailingMetadata(std::move(trailers));
  }

  const auto final_status = parsed_status.value_or(Status::WellKnownGrpcStatus::Unknown);
  notifyRemoteClose(final_status, grpc_message);
  cleanup();
}
310

            
311
1163
// Terminates the stream with the given status: delivers empty trailing metadata (unless the
// stream is waiting to be deleted), reports the remote close and then resets the stream.
void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::string& message) {
  const bool callbacks_active = !waiting_to_delete_on_remote_close_;
  if (callbacks_active) {
    callbacks_.onReceiveTrailingMetadata(Http::ResponseTrailerMapImpl::create());
  }
  notifyRemoteClose(grpc_status, message);
  resetStream();
}
318

            
319
void AsyncStreamImpl::notifyRemoteClose(Grpc::Status::GrpcStatus status,
320
2028
                                        const std::string& message) {
321
2028
  current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(status));
322
2028
  if (status != Grpc::Status::WellKnownGrpcStatus::Ok) {
323
1623
    current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
324
1623
  }
325
2028
  current_span_->finishSpan();
326
2028
  if (!waiting_to_delete_on_remote_close_) {
327
2024
    callbacks_.onRemoteClose(status, enhancedGrpcMessage(message, stream_));
328
2024
  }
329
2028
}
330

            
331
4
// Intentionally empty: completion of the stream is already dealt with by the other callbacks.
void AsyncStreamImpl::onComplete() {}
334

            
335
2407
void AsyncStreamImpl::onReset() {
336
2407
  if (http_reset_) {
337
1261
    return;
338
1261
  }
339

            
340
1146
  http_reset_ = true;
341
1146
  streamError(Status::WellKnownGrpcStatus::Internal);
342
1146
}
343

            
344
69728
// Frames the serialized message with a gRPC frame header (in place) and writes it to the HTTP
// stream, optionally ending the stream.
void AsyncStreamImpl::sendMessageRaw(Buffer::InstancePtr&& buffer, bool end_stream) {
  Buffer::Instance& message = *buffer;
  Common::prependGrpcFrameHeader(message);
  stream_->sendData(message, end_stream);
}
348

            
349
420
void AsyncStreamImpl::closeStream() {
350
420
  Buffer::OwnedImpl empty_buffer;
351
420
  stream_->sendData(empty_buffer, true);
352
420
  current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
353
420
  current_span_->finishSpan();
354
420
}
355

            
356
1616
// Resetting a stream is the same as cleaning it up; see cleanup() for the details.
void AsyncStreamImpl::resetStream() {
  cleanup();
}
357

            
358
2414
void AsyncStreamImpl::cleanup() {
359
  // Unsubscribe the side stream watermark callbacks, if hasn't done so.
360
2414
  if (options_.sidestream_watermark_callbacks != nullptr) {
361
244
    stream_->removeWatermarkCallbacks();
362
244
    options_.sidestream_watermark_callbacks = nullptr;
363
244
  }
364

            
365
  // Do not reset if the stream is being cleaning up after server has half-closed.
366
2414
  if (!http_reset_ && !waiting_to_delete_on_remote_close_) {
367
1264
    http_reset_ = true;
368
1264
    stream_->reset();
369
1264
  }
370

            
371
  // This will destroy us, but only do so if we are actually in a list. This does not happen in
372
  // the immediate failure case.
373
2414
  if (LinkedObject<AsyncStreamImpl>::inserted()) {
374
2402
    ASSERT(dispatcher_->isThreadSafe());
375
2402
    dispatcher_->deferredDelete(
376
2402
        LinkedObject<AsyncStreamImpl>::removeFromList(parent_.active_streams_));
377
2402
  }
378
2414
  remote_close_timer_ = nullptr;
379
2414
}
380

            
381
6
void AsyncStreamImpl::waitForRemoteCloseAndDelete() {
382
6
  if (!waiting_to_delete_on_remote_close_) {
383
6
    waiting_to_delete_on_remote_close_ = true;
384

            
385
6
    if (options_.sidestream_watermark_callbacks != nullptr) {
386
      stream_->removeWatermarkCallbacks();
387
      options_.sidestream_watermark_callbacks = nullptr;
388
    }
389
6
    remote_close_timer_ = dispatcher_->createTimer([this] {
390
2
      waiting_to_delete_on_remote_close_ = false;
391
2
      cleanup();
392
2
    });
393
6
    remote_close_timer_->enableTimer(options_.remote_close_timeout);
394
6
  }
395
6
}
396

            
397
// A unary request is a stream that acts as its own stream callbacks (`*this` is passed to the
// base class) and forwards the outcome to the request callbacks. The span set up by the base
// class is replaced with a child of `parent_span`.
AsyncRequestImpl::AsyncRequestImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
                                   absl::string_view method_name, Buffer::InstancePtr&& request,
                                   RawAsyncRequestCallbacks& callbacks, Tracing::Span& parent_span,
                                   const Http::AsyncClient::RequestOptions& options)
    : AsyncStreamImpl(parent, service_full_name, method_name, *this, options),
      request_(std::move(request)), callbacks_(callbacks) {

  current_span_ =
      parent_span.spawnChild(Tracing::EgressConfig::get(),
                             absl::StrCat("async ", service_full_name, ".", method_name, " egress"),
                             parent.time_source_.systemTime());

  const auto& tags = Tracing::Tags::get();
  // The configured authority is reported as the upstream address, else the cluster name.
  const auto& upstream_address =
      parent.host_name_.empty() ? parent.remote_cluster_name_ : parent.host_name_;
  current_span_->setTag(tags.UpstreamCluster, parent.remote_cluster_name_);
  current_span_->setTag(tags.UpstreamAddress, upstream_address);
  current_span_->setTag(tags.Component, tags.Proxy);
}
414

            
415
186
void AsyncRequestImpl::initialize(bool buffer_body_for_retry) {
416
186
  AsyncStreamImpl::initialize(buffer_body_for_retry);
417
186
  if (this->hasResetStream()) {
418
2
    return;
419
2
  }
420
184
  this->sendMessageRaw(std::move(request_), true);
421
184
}
422

            
423
1
void AsyncRequestImpl::cancel() {
424
1
  current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
425
1
  current_span_->finishSpan();
426
1
  this->resetStream();
427
1
}
428

            
429
59
// Exposes the stream info of the underlying stream; simply delegates to the base class.
const StreamInfo::StreamInfo& AsyncRequestImpl::streamInfo() const {
  const StreamInfo::StreamInfo& info = AsyncStreamImpl::streamInfo();
  return info;
}
432

            
433
1
void AsyncRequestImpl::detach() {
434
  // TODO(wbpcode): In most tracers the span will hold a reference to the tracer self
435
  // and it's possible that become a dangling reference for long time async request.
436
  // This require further PR to resolve.
437

            
438
1
  if (options_.sidestream_watermark_callbacks != nullptr) {
439
1
    stream_->removeWatermarkCallbacks();
440
1
    options_.sidestream_watermark_callbacks = nullptr;
441
1
  }
442
1
  options_.parent_span_ = nullptr;
443
1
  options_.parent_context.stream_info = nullptr;
444

            
445
1
  streamInfo().clearParentStreamInfo();
446
1
}
447

            
448
185
// Injects the request span's tracing context into the outgoing metadata, then lets the request
// callbacks add their own metadata.
void AsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
  Tracing::HttpTraceContext header_trace_context(metadata);
  Tracing::UpstreamContext upstream_trace_context(
      nullptr,                         // host_
      cluster_info_.get(),             // cluster_
      Tracing::ServiceType::EnvoyGrpc, // service_type_
      true                             // async_client_span_
  );
  current_span_->injectContext(header_trace_context, upstream_trace_context);
  callbacks_.onCreateInitialMetadata(metadata);
}
458

            
459
182
// Initial metadata from the server is ignored for unary requests.
void AsyncRequestImpl::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) {
}
460

            
461
161
// Stores the response message until the remote close is reported; a later message replaces an
// earlier one. Always accepts the message.
bool AsyncRequestImpl::onReceiveMessageRaw(Buffer::InstancePtr&& response) {
  response_ = std::move(response);
  constexpr bool accepted = true;
  return accepted;
}
465

            
466
184
// Trailing metadata from the server is ignored for unary requests.
void AsyncRequestImpl::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) {
}
467

            
468
185
// Reports the outcome of the unary request to the request callbacks and finishes the span.
// The request succeeds only when the status is Ok and a response message was received; an Ok
// status without a response is reported as an Internal failure with an empty message.
void AsyncRequestImpl::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) {
  const auto& tags = Tracing::Tags::get();
  current_span_->setTag(tags.GrpcStatusCode, std::to_string(status));

  const bool status_ok = status == Grpc::Status::WellKnownGrpcStatus::Ok;
  if (status_ok && response_ != nullptr) {
    callbacks_.onSuccessRaw(std::move(response_), *current_span_);
  } else {
    current_span_->setTag(tags.Error, tags.True);
    if (status_ok) {
      callbacks_.onFailure(Status::Internal, EMPTY_STRING, *current_span_);
    } else {
      callbacks_.onFailure(status, message, *current_span_);
    }
  }

  current_span_->finishSpan();
}
483

            
484
} // namespace Grpc
485
} // namespace Envoy