1
#include "source/common/grpc/google_async_client_impl.h"
2

            
3
#include "envoy/common/time.h"
4
#include "envoy/config/core/v3/grpc_service.pb.h"
5
#include "envoy/http/protocol.h"
6
#include "envoy/stats/scope.h"
7

            
8
#include "source/common/common/base64.h"
9
#include "source/common/common/empty_string.h"
10
#include "source/common/common/lock_guard.h"
11
#include "source/common/common/utility.h"
12
#include "source/common/config/datasource.h"
13
#include "source/common/grpc/common.h"
14
#include "source/common/grpc/google_grpc_creds_impl.h"
15
#include "source/common/grpc/google_grpc_utils.h"
16
#include "source/common/router/header_parser.h"
17
#include "source/common/tracing/http_tracer_impl.h"
18

            
19
#include "absl/strings/str_cat.h"
20
#include "grpcpp/support/proto_buffer_reader.h"
21

            
22
namespace Envoy {
23
namespace Grpc {
24
namespace {
// Fallback for per_stream_buffer_limit_bytes when the google_grpc config leaves it unset (1 MiB).
// The anonymous namespace already gives internal linkage, so no `static` is needed.
constexpr int DefaultBufferLimitBytes = 1024 * 1024;
} // namespace
27

            
28
GoogleAsyncClientThreadLocal::GoogleAsyncClientThreadLocal(Api::Api& api)
29
21892
    : completion_thread_(api.threadFactory().createThread([this] { completionThread(); },
30
21892
                                                          Thread::Options{"GrpcGoogClient"})) {}
31

            
32
21892
// Tears down the completion queue: resets every registered stream, shuts the queue down, joins
// the completion thread and finally flushes whatever completions are still queued on streams.
GoogleAsyncClientThreadLocal::~GoogleAsyncClientThreadLocal() {
  // Every stream must be reset (which issues TryCancel()) before Shutdown(): otherwise
  // Shutdown() can jam on pending ops, and the contract that no tags are queued once Shutdown()
  // has been called would be violated.
  auto next = streams_.begin();
  while (next != streams_.end()) {
    // resetStream() may immediately unregisterStream() and erase the current element, so step
    // the iterator past it before making the call.
    const auto current = next++;
    (*current)->resetStream();
  }

  cq_.Shutdown();
  ENVOY_LOG(debug, "Joining completionThread");
  completion_thread_->join();
  ENVOY_LOG(debug, "Joined completionThread");

  // With the CQ gone, drain the completed ops of any orphan streams until none is registered.
  while (!streams_.empty()) {
    const auto orphan = streams_.begin();
    (*orphan)->onCompletedOps();
  }
}
52

            
53
21892
void GoogleAsyncClientThreadLocal::completionThread() {
54
21892
  ENVOY_LOG(debug, "completionThread running");
55
21892
  void* tag;
56
21892
  bool ok;
57
95708
  while (cq_.Next(&tag, &ok)) {
58
73816
    const auto& google_async_tag = *reinterpret_cast<GoogleAsyncTag*>(tag);
59
73816
    const GoogleAsyncTag::Operation op = google_async_tag.op_;
60
73816
    GoogleAsyncStreamImpl& stream = google_async_tag.stream_;
61
73816
    ENVOY_LOG(trace, "completionThread CQ event {} {}", static_cast<int>(op), ok);
62
73816
    Thread::LockGuard lock(stream.completed_ops_lock_);
63

            
64
    // It's an invariant that there must only be one pending post for arbitrary
65
    // length completed_ops_, otherwise we can race in stream destruction, where
66
    // we process multiple events in onCompletedOps() but have only partially
67
    // consumed the posts on the dispatcher.
68
    // TODO(htuch): This may result in unbounded processing on the silo thread
69
    // in onCompletedOps() in extreme cases, when we emplace_back() in
70
    // completionThread() at a high rate, consider bounding the length of such
71
    // sequences if this behavior becomes an issue.
72
73816
    if (stream.completed_ops_.empty()) {
73
72657
      stream.dispatcher_.post([&stream] { stream.onCompletedOps(); });
74
72657
    }
75
73816
    stream.completed_ops_.emplace_back(op, ok);
76
73816
  }
77
21892
  ENVOY_LOG(debug, "completionThread exiting");
78
21892
}
79

            
80
// Builds a Google gRPC async client from `config`: creates the channel and stub, parses the
// configured initial metadata and resolves the stat counters used by its streams.
//
// `scope` is taken by value and moved into scope_; every later use goes through scope_.
// Router::HeaderParser::configure() failures are surfaced through THROW_OR_RETURN_VALUE.
GoogleAsyncClientImpl::GoogleAsyncClientImpl(Event::Dispatcher& dispatcher,
                                             GoogleAsyncClientThreadLocal& tls,
                                             GoogleStubFactory& stub_factory,
                                             Stats::ScopeSharedPtr scope,
                                             const envoy::config::core::v3::GrpcService& config,
                                             Server::Configuration::CommonFactoryContext& context,
                                             const StatNames& stat_names)
    : dispatcher_(dispatcher), tls_(tls), stat_prefix_(config.google_grpc().stat_prefix()),
      target_uri_(config.google_grpc().target_uri()), scope_(std::move(scope)),
      per_stream_buffer_limit_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config.google_grpc(), per_stream_buffer_limit_bytes, DefaultBufferLimitBytes)),
      metadata_parser_(THROW_OR_RETURN_VALUE(
          Router::HeaderParser::configure(
              config.initial_metadata(),
              envoy::config::core::v3::HeaderValueOption::OVERWRITE_IF_EXISTS_OR_ADD),
          Router::HeaderParserPtr)) {
  // We rebuild the channel each time we construct the client. It appears that the gRPC library is
  // smart enough to do connection pooling and reuse with identical channel args, so this should
  // have comparable overhead to what we are doing in Grpc::AsyncClientImpl, i.e. no expensive
  // new connection implied.
  std::shared_ptr<grpc::Channel> channel = GoogleGrpcUtils::createChannel(config, context);
  // Get state with try_to_connect = true to try connection at channel creation.
  // This GetState(true) only pokes the gRPC lb so that the channel starts initializing now; its
  // result is ignored and it has no effect whether it succeeds or fails. Without it, channel
  // establishment would happen at the first request, no matter when the channel was created.
  channel->GetState(true);
  stub_ = stub_factory.createStub(channel);
  scope_->counterFromStatName(stat_names.google_grpc_client_creation_).inc();
  // Initialize client stats.
  // TODO(jmarantz): Capture these names in async_client_manager_impl.cc and
  // pass in a struct of StatName objects so we don't have to take locks here.
  stats_.streams_total_ = &scope_->counterFromStatName(stat_names.streams_total_);
  // One "streams closed" counter per well-known gRPC status, indexed by status code.
  for (uint32_t i = 0; i <= Status::WellKnownGrpcStatus::MaximumKnown; ++i) {
    stats_.streams_closed_[i] = &scope_->counterFromStatName(stat_names.streams_closed_[i]);
  }
}
117

            
118
1088
// Resets every stream still owned by this client. The loop relies on resetStream() removing the
// stream from active_streams_ (see GoogleAsyncStreamImpl::cleanup()).
GoogleAsyncClientImpl::~GoogleAsyncClientImpl() {
  ASSERT(isThreadSafe());
  ENVOY_LOG(debug, "Client teardown, resetting streams");
  while (!active_streams_.empty()) {
    auto* const head = active_streams_.front().get();
    head->resetStream();
  }
}
125

            
126
// Starts a unary request. Returns a handle to the in-flight request, or nullptr when the call
// could not be started (the stream object is destroyed before returning in that case).
AsyncRequest* GoogleAsyncClientImpl::sendRaw(absl::string_view service_full_name,
                                             absl::string_view method_name,
                                             Buffer::InstancePtr&& request,
                                             RawAsyncRequestCallbacks& callbacks,
                                             Tracing::Span& parent_span,
                                             const Http::AsyncClient::RequestOptions& options) {
  ASSERT(isThreadSafe());
  // Keep the typed pointer for the return value; ownership lives in `owner`.
  auto* const request_impl = new GoogleAsyncRequestImpl(
      *this, service_full_name, method_name, std::move(request), callbacks, parent_span, options);
  GoogleAsyncStreamImplPtr owner{request_impl};

  owner->initialize(true);
  if (!owner->callFailed()) {
    LinkedList::moveIntoList(std::move(owner), active_streams_);
    return request_impl;
  }
  return nullptr;
}
145

            
146
// Starts a streaming call. Returns the new stream (owned by active_streams_), or nullptr when the
// call could not be started.
RawAsyncStream* GoogleAsyncClientImpl::startRaw(absl::string_view service_full_name,
                                                absl::string_view method_name,
                                                RawAsyncStreamCallbacks& callbacks,
                                                const Http::AsyncClient::StreamOptions& options) {
  ASSERT(isThreadSafe());
  auto new_stream = std::make_unique<GoogleAsyncStreamImpl>(*this, service_full_name, method_name,
                                                            callbacks, options);

  new_stream->initialize(false);
  if (!new_stream->callFailed()) {
    LinkedList::moveIntoList(std::move(new_stream), active_streams_);
    return active_streams_.front().get();
  }
  return nullptr;
}
162

            
163
GoogleAsyncStreamImpl::GoogleAsyncStreamImpl(GoogleAsyncClientImpl& parent,
164
                                             absl::string_view service_full_name,
165
                                             absl::string_view method_name,
166
                                             RawAsyncStreamCallbacks& callbacks,
167
                                             const Http::AsyncClient::StreamOptions& options)
168
1238
    : parent_(parent), tls_(parent_.tls_), dispatcher_(parent_.dispatcher_), stub_(parent_.stub_),
169
1238
      service_full_name_(service_full_name), method_name_(method_name), callbacks_(callbacks),
170
1238
      options_(options), unused_stream_info_(Http::Protocol::Http2, dispatcher_.timeSource(),
171
1238
                                             Network::ConnectionInfoProviderSharedPtr{},
172
1238
                                             StreamInfo::FilterState::LifeSpan::FilterChain) {
173
  // TODO(cainelli): add a common library for tracing tags between gRPC implementations.
174
1238
  if (options.parent_span_ != nullptr) {
175
237
    const std::string child_span_name =
176
237
        options.child_span_name_.empty()
177
237
            ? absl::StrCat("async ", service_full_name, ".", method_name, " egress")
178
237
            : options.child_span_name_;
179
237
    current_span_ = options.parent_span_->spawnChild(Tracing::EgressConfig::get(), child_span_name,
180
237
                                                     parent.timeSource().systemTime());
181
237
    current_span_->setTag(Tracing::Tags::get().UpstreamCluster, parent.stat_prefix_);
182
237
    current_span_->setTag(Tracing::Tags::get().UpstreamAddress, parent.target_uri_);
183
237
    current_span_->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy);
184
1236
  } else {
185
1001
    current_span_ = std::make_unique<Tracing::NullSpan>();
186
1001
  }
187

            
188
1238
  if (options.sampled_.has_value()) {
189
1003
    current_span_->setSampled(options.sampled_.value());
190
1003
  }
191
1238
}
192

            
193
1238
// Logs the destruction and fires the test-only deletion hook when one was configured.
GoogleAsyncStreamImpl::~GoogleAsyncStreamImpl() {
  ENVOY_LOG(debug, "GoogleAsyncStreamImpl destruct");
  auto& delete_hook = options_.on_delete_callback_for_test_only;
  if (delete_hook) {
    delete_hook();
  }
}
199

            
200
// Wraps an outgoing message: the Envoy buffer is converted into a gRPC byte buffer up front and
// the end-of-stream flag is stored alongside it.
GoogleAsyncStreamImpl::PendingMessage::PendingMessage(Buffer::InstancePtr request, bool end_stream)
    : buf_(GoogleGrpcUtils::makeByteBuffer(std::move(request))),
      end_stream_(end_stream) {}
202

            
203
// TODO(htuch): figure out how to propagate "this request should be buffered for
204
// retry" bit to Google gRPC library.
205
1238
void GoogleAsyncStreamImpl::initialize(bool /*buffer_body_for_retry*/) {
206
1238
  parent_.stats_.streams_total_->inc();
207
1238
  gpr_timespec abs_deadline =
208
1238
      options_.timeout
209
1238
          ? gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
210
144
                         gpr_time_from_millis(options_.timeout.value().count(), GPR_TIMESPAN))
211
1238
          : gpr_inf_future(GPR_CLOCK_REALTIME);
212
1238
  ctxt_.set_deadline(abs_deadline);
213
  // Fill service-wide initial metadata.
214
1238
  auto initial_metadata = Http::RequestHeaderMapImpl::create();
215
  // TODO(cpakulski): Find a better way to access requestHeaders
216
  // request headers should not be stored in stream_info.
217
  // Maybe put it to parent_context?
218
1238
  parent_.metadata_parser_->evaluateHeaders(*initial_metadata, options_.parent_context.stream_info);
219
1238
  Tracing::HttpTraceContext trace_context(*initial_metadata);
220
1238
  Tracing::UpstreamContext upstream_context(nullptr,                          // host_
221
1238
                                            nullptr,                          // cluster_
222
1238
                                            Tracing::ServiceType::GoogleGrpc, // service_type_
223
1238
                                            true                              // async_client_span_
224
1238
  );
225
1238
  current_span_->injectContext(trace_context, upstream_context);
226
1238
  callbacks_.onCreateInitialMetadata(*initial_metadata);
227
1238
  initial_metadata->iterate([this](const Http::HeaderEntry& header) {
228
25
    ctxt_.AddMetadata(std::string(header.key().getStringView()),
229
25
                      std::string(header.value().getStringView()));
230
25
    return Http::HeaderMap::Iterate::Continue;
231
25
  });
232
  // Invoke stub call.
233
1238
  rw_ = parent_.stub_->PrepareCall(&ctxt_, "/" + service_full_name_ + "/" + method_name_,
234
1238
                                   &parent_.tls_.completionQueue());
235
1238
  if (rw_ == nullptr) {
236
4
    notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, nullptr, EMPTY_STRING);
237
4
    call_failed_ = true;
238
4
    return;
239
4
  }
240
1234
  parent_.tls_.registerStream(this);
241
1234
  rw_->StartCall(&init_tag_);
242
1234
  ++inflight_tags_;
243
1234
}
244

            
245
void GoogleAsyncStreamImpl::notifyRemoteClose(Status::GrpcStatus grpc_status,
246
                                              Http::ResponseTrailerMapPtr trailing_metadata,
247
1002
                                              const std::string& message) {
248
1002
  if (grpc_status > Status::WellKnownGrpcStatus::MaximumKnown || grpc_status < 0) {
249
1
    ENVOY_LOG(error, "notifyRemoteClose invalid gRPC status code {}", grpc_status);
250
    // Set the grpc_status as InvalidCode but increment the Unknown stream to avoid out-of-range
251
    // crash..
252
1
    grpc_status = Status::WellKnownGrpcStatus::InvalidCode;
253
1
    parent_.stats_.streams_closed_[Status::WellKnownGrpcStatus::Unknown]->inc();
254
1001
  } else {
255
1001
    parent_.stats_.streams_closed_[grpc_status]->inc();
256
1001
  }
257
1002
  ENVOY_LOG(debug, "notifyRemoteClose {} {}", grpc_status, message);
258
1002
  current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(grpc_status));
259
1002
  if (grpc_status != Status::WellKnownGrpcStatus::Ok) {
260
814
    current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
261
814
  }
262
1002
  current_span_->finishSpan();
263
1002
  if (!waiting_to_delete_on_remote_close_) {
264
998
    callbacks_.onReceiveTrailingMetadata(
265
998
        trailing_metadata ? std::move(trailing_metadata) : Http::ResponseTrailerMapImpl::create());
266
998
    callbacks_.onRemoteClose(grpc_status, message);
267
998
  }
268
1002
}
269

            
270
65289
// Queues a message for writing, accounts for its size and tries to dispatch the queue.
void GoogleAsyncStreamImpl::sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) {
  write_pending_queue_.emplace(std::move(request), end_stream);
  const auto queued_bytes = write_pending_queue_.back().buf_.value().Length();
  ENVOY_LOG(trace, "Queued message to write ({} bytes)", queued_bytes);
  bytes_in_write_pending_queue_ += queued_bytes;
  writeQueued();
}
277

            
278
212
void GoogleAsyncStreamImpl::closeStream() {
279
  // Empty EOS write queued.
280
212
  current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
281
212
  current_span_->finishSpan();
282

            
283
212
  write_pending_queue_.emplace();
284
212
  writeQueued();
285
212
}
286

            
287
376
void GoogleAsyncStreamImpl::resetStream() {
288
376
  ENVOY_LOG(debug, "resetStream");
289
  // The gRPC API requires calling Finish() at the end of a stream, even
290
  // if the stream is cancelled.
291
376
  if (!finish_pending_) {
292
367
    finish_pending_ = true;
293
367
    rw_->Finish(&status_, &finish_tag_);
294
367
    ++inflight_tags_;
295
367
  }
296
376
  cleanup();
297
376
}
298

            
299
6
void GoogleAsyncStreamImpl::waitForRemoteCloseAndDelete() {
300
6
  if (!waiting_to_delete_on_remote_close_) {
301
6
    waiting_to_delete_on_remote_close_ = true;
302
6
    remote_close_timer_ = dispatcher_.createTimer([this] { resetStream(); });
303
6
    remote_close_timer_->enableTimer(options_.remote_close_timeout);
304
6
  }
305
6
}
306

            
307
131506
void GoogleAsyncStreamImpl::writeQueued() {
308
131506
  if (!call_initialized_ || finish_pending_ || write_pending_ || write_pending_queue_.empty() ||
309
131506
      draining_cq_) {
310
66201
    return;
311
66201
  }
312
65305
  write_pending_ = true;
313
65305
  const PendingMessage& msg = write_pending_queue_.front();
314

            
315
65305
  if (!msg.buf_) {
316
209
    ASSERT(msg.end_stream_);
317
209
    rw_->WritesDone(&write_last_tag_);
318
209
    ++inflight_tags_;
319
65096
  } else if (msg.end_stream_) {
320
158
    grpc::WriteOptions write_options;
321
158
    rw_->WriteLast(msg.buf_.value(), write_options, &write_last_tag_);
322
158
    ++inflight_tags_;
323
65083
  } else {
324
64938
    rw_->Write(msg.buf_.value(), &write_tag_);
325
64938
    ++inflight_tags_;
326
64938
  }
327
65305
  ENVOY_LOG(trace, "Write op dispatched");
328
65305
}
329

            
330
72657
void GoogleAsyncStreamImpl::onCompletedOps() {
331
  // The items in completed_ops_ execute in the order they were originally added to the queue since
332
  // both the post callback scheduled by the completionThread and the deferred deletion of the
333
  // GoogleAsyncClientThreadLocal happen on the dispatcher thread.
334
72657
  std::deque<std::pair<GoogleAsyncTag::Operation, bool>> completed_ops;
335
72657
  {
336
72657
    Thread::LockGuard lock(completed_ops_lock_);
337
72657
    completed_ops = std::move(completed_ops_);
338
    // completed_ops_ should be empty after the move.
339
72657
    ASSERT(completed_ops_.empty());
340
72657
  }
341

            
342
146473
  while (!completed_ops.empty()) {
343
73816
    GoogleAsyncTag::Operation op;
344
73816
    bool ok;
345
73816
    std::tie(op, ok) = completed_ops.front();
346
73816
    completed_ops.pop_front();
347
73816
    handleOpCompletion(op, ok);
348
73816
  }
349
72657
}
350

            
351
73816
// Handles one completion-queue event for this stream (called from onCompletedOps()).
//   op - the operation whose tag completed.
//   ok - the success flag reported by the completion queue for that tag.
// Every call retires one in-flight tag. While the stream is draining, events are only counted and
// the stream deletes itself when the last tag has been retired.
void GoogleAsyncStreamImpl::handleOpCompletion(GoogleAsyncTag::Operation op, bool ok) {
  ENVOY_LOG(trace, "handleOpCompletion op={} ok={} inflight={}", static_cast<int>(op), ok,
            inflight_tags_);
  ASSERT(inflight_tags_ > 0);
  --inflight_tags_;

  if (draining_cq_) {
    // Op completions are otherwise ignored while draining the CQ.
    if (inflight_tags_ == 0) {
      deferredDelete();
    }
    return;
  }

  // Failure cases first.
  if (!ok) {
    switch (op) {
    case GoogleAsyncTag::Operation::Init:
    case GoogleAsyncTag::Operation::ReadInitialMetadata:
      // Early fails can be just treated as Internal.
      notifyRemoteClose(Status::WellKnownGrpcStatus::Internal, nullptr, EMPTY_STRING);
      resetStream();
      break;
    case GoogleAsyncTag::Operation::Read:
      // Remote server has closed, we can pick up some meaningful status.
      // TODO(htuch): We're assuming here that a failed Write/WriteLast operation will result in
      // stream termination, and pick up on the failed Read here. Confirm that this assumption is
      // valid.
      finish_pending_ = true;
      rw_->Finish(&status_, &finish_tag_);
      ++inflight_tags_;
      break;
    default:
      // Other failed ops are ignored here (see the TODO above).
      break;
    }
    return;
  }

  switch (op) {
  case GoogleAsyncTag::Operation::Init: {
    ASSERT(ok);
    ASSERT(!call_initialized_);
    call_initialized_ = true;
    rw_->ReadInitialMetadata(&read_initial_metadata_tag_);
    ++inflight_tags_;
    // Messages queued before the call was initialized can go out now.
    writeQueued();
    break;
  }
  case GoogleAsyncTag::Operation::ReadInitialMetadata: {
    ASSERT(ok);
    ASSERT(call_initialized_);
    // Start the first read before handing the headers to the callbacks.
    rw_->Read(&read_buf_, &read_tag_);
    ++inflight_tags_;
    Http::ResponseHeaderMapPtr headers = Http::ResponseHeaderMapImpl::create();
    metadataTranslate(ctxt_.GetServerInitialMetadata(), *headers);
    if (!waiting_to_delete_on_remote_close_) {
      callbacks_.onReceiveInitialMetadata(std::move(headers));
    }
    break;
  }
  case GoogleAsyncTag::Operation::Write: {
    ASSERT(ok);
    write_pending_ = false;
    // The head of the queue is the message that was just written.
    bytes_in_write_pending_queue_ -= write_pending_queue_.front().buf_.value().Length();
    write_pending_queue_.pop();
    writeQueued();
    break;
  }
  case GoogleAsyncTag::Operation::WriteLast: {
    ASSERT(ok);
    write_pending_ = false;
    break;
  }
  case GoogleAsyncTag::Operation::Read: {
    ASSERT(ok);
    auto message = GoogleGrpcUtils::makeBufferInstance(read_buf_);
    // A message counts as accepted when it converted successfully and either nobody is listening
    // any more or the callbacks accepted it.
    const bool accepted =
        message != nullptr && (waiting_to_delete_on_remote_close_ ||
                               callbacks_.onReceiveMessageRaw(std::move(message)));
    if (accepted) {
      rw_->Read(&read_buf_, &read_tag_);
      ++inflight_tags_;
    } else {
      // This is basically streamError in Grpc::AsyncClientImpl.
      notifyRemoteClose(Status::WellKnownGrpcStatus::Internal, nullptr, EMPTY_STRING);
      resetStream();
    }
    break;
  }
  case GoogleAsyncTag::Operation::Finish: {
    ASSERT(finish_pending_);
    ENVOY_LOG(debug, "Finish with grpc-status code {}", static_cast<int>(status_.error_code()));
    Http::ResponseTrailerMapPtr trailers = Http::ResponseTrailerMapImpl::create();
    metadataTranslate(ctxt_.GetServerTrailingMetadata(), *trailers);
    notifyRemoteClose(static_cast<Status::GrpcStatus>(status_.error_code()), std::move(trailers),
                      status_.error_message());
    cleanup();
    break;
  }
  }
}
444

            
445
void GoogleAsyncStreamImpl::metadataTranslate(
446
    const std::multimap<grpc::string_ref, grpc::string_ref>& grpc_metadata,
447
1958
    Http::HeaderMap& header_map) {
448
  // More painful copying, this time due to the mismatch in header
449
  // representation data structures in Envoy and Google gRPC.
450
1958
  for (const auto& it : grpc_metadata) {
451
7
    auto key = Http::LowerCaseString(std::string(it.first.data(), it.first.size()));
452
7
    if (absl::EndsWith(key.get(), "-bin")) {
453
2
      auto value = Base64::encode(it.second.data(), it.second.size());
454
2
      header_map.addCopy(key, value);
455
2
      continue;
456
2
    }
457
5
    header_map.addCopy(key, std::string(it.second.data(), it.second.size()));
458
5
  }
459
1958
}
460

            
461
1234
void GoogleAsyncStreamImpl::deferredDelete() {
462
1234
  ENVOY_LOG(debug, "Deferred delete");
463
1234
  tls_.unregisterStream(this);
464
  // We only get here following cleanup(), which has performed a
465
  // remoteFromList(), resulting in self-ownership of the object's memory.
466
  // Hence, it is safe here to create a unique_ptr to this and transfer
467
  // ownership to dispatcher_.deferredDelete(). After this call, no further
468
  // methods may be invoked on this object.
469
1234
  dispatcher_.deferredDelete(GoogleAsyncStreamImplPtr(this));
470
1234
}
471

            
472
1243
void GoogleAsyncStreamImpl::cleanup() {
473
1243
  ENVOY_LOG(debug, "Stream cleanup with {} in-flight tags", inflight_tags_);
474
  // We can get here if the client has already issued resetStream() and, while
475
  // this is in progress, the destructor runs.
476
1243
  if (draining_cq_) {
477
9
    ENVOY_LOG(debug, "Cleanup already in progress");
478
9
    return;
479
9
  }
480
1234
  draining_cq_ = true;
481
1234
  ctxt_.TryCancel();
482
1234
  if (LinkedObject<GoogleAsyncStreamImpl>::inserted()) {
483
    // We take ownership of our own memory at this point.
484
1234
    LinkedObject<GoogleAsyncStreamImpl>::removeFromList(parent_.active_streams_).release();
485
1234
    if (inflight_tags_ == 0) {
486
867
      deferredDelete();
487
867
    }
488
1234
  }
489
1234
  remote_close_timer_ = nullptr;
490
1234
}
491

            
492
// Unary request built on top of the stream implementation: the request acts as its own stream
// callbacks and replaces the span set up by the base class with a child of `parent_span`.
GoogleAsyncRequestImpl::GoogleAsyncRequestImpl(
    GoogleAsyncClientImpl& parent, absl::string_view service_full_name,
    absl::string_view method_name, Buffer::InstancePtr request, RawAsyncRequestCallbacks& callbacks,
    Tracing::Span& parent_span, const Http::AsyncClient::RequestOptions& options)
    : GoogleAsyncStreamImpl(parent, service_full_name, method_name, *this, options),
      request_(std::move(request)), callbacks_(callbacks) {
  const std::string span_name =
      absl::StrCat("async ", service_full_name, ".", method_name, " egress");
  current_span_ = parent_span.spawnChild(Tracing::EgressConfig::get(), span_name,
                                         parent.timeSource().systemTime());

  const auto& tags = Tracing::Tags::get();
  current_span_->setTag(tags.UpstreamCluster, parent.stat_prefix_);
  current_span_->setTag(tags.UpstreamAddress, parent.target_uri_);
  current_span_->setTag(tags.Component, tags.Proxy);
}
506

            
507
176
void GoogleAsyncRequestImpl::initialize(bool buffer_body_for_retry) {
508
176
  GoogleAsyncStreamImpl::initialize(buffer_body_for_retry);
509
176
  if (callFailed()) {
510
1
    return;
511
1
  }
512
175
  sendMessageRaw(std::move(request_), true);
513
175
}
514

            
515
2
void GoogleAsyncRequestImpl::cancel() {
516
2
  current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
517
2
  current_span_->finishSpan();
518
2
  resetStream();
519
2
}
520

            
521
1
void GoogleAsyncRequestImpl::detach() {
522
  // TODO(wbpcode): In most tracers the span will hold a reference to the tracer self
523
  // and it's possible that become a dangling reference for long time async request.
524
  // This require further PR to resolve.
525

            
526
1
  options_.sidestream_watermark_callbacks = nullptr;
527
1
  options_.parent_span_ = nullptr;
528
1
  options_.parent_context.stream_info = nullptr;
529

            
530
1
  streamInfo().clearParentStreamInfo();
531
1
}
532

            
533
176
// Injects the request span's context into the outgoing metadata, then forwards the metadata to
// the request callbacks.
void GoogleAsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
  Tracing::HttpTraceContext trace_context(metadata);
  Tracing::UpstreamContext upstream_context(nullptr,                          // host_
                                            nullptr,                          // cluster_
                                            Tracing::ServiceType::GoogleGrpc, // service_type_
                                            true                              // async_client_span_
  );
  current_span_->injectContext(trace_context, upstream_context);

  callbacks_.onCreateInitialMetadata(metadata);
}
543

            
544
154
// Intentionally empty: the initial response metadata is not used by a unary request.
void GoogleAsyncRequestImpl::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) {
}
545

            
546
151
// Stores the response message until onRemoteClose() reports the outcome; always accepts it.
bool GoogleAsyncRequestImpl::onReceiveMessageRaw(Buffer::InstancePtr&& response) {
  response_ = std::move(response);
  constexpr bool accepted = true;
  return accepted;
}
550

            
551
171
// Intentionally empty: the trailing metadata is not used by a unary request.
void GoogleAsyncRequestImpl::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) {
}
552

            
553
void GoogleAsyncRequestImpl::onRemoteClose(Grpc::Status::GrpcStatus status,
554
171
                                           const std::string& message) {
555
171
  current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(status));
556

            
557
171
  if (status != Grpc::Status::WellKnownGrpcStatus::Ok) {
558
19
    current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
559
19
    callbacks_.onFailure(status, message, *current_span_);
560
153
  } else if (response_ == nullptr) {
561
1
    current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
562
1
    callbacks_.onFailure(Status::Internal, EMPTY_STRING, *current_span_);
563
151
  } else {
564
151
    callbacks_.onSuccessRaw(std::move(response_), *current_span_);
565
151
  }
566

            
567
171
  current_span_->finishSpan();
568
171
}
569

            
570
} // namespace Grpc
571
} // namespace Envoy