Line data Source code
1 : #include "source/common/grpc/google_async_client_impl.h"
2 :
3 : #include "envoy/common/time.h"
4 : #include "envoy/config/core/v3/grpc_service.pb.h"
5 : #include "envoy/http/protocol.h"
6 : #include "envoy/stats/scope.h"
7 :
8 : #include "source/common/common/base64.h"
9 : #include "source/common/common/empty_string.h"
10 : #include "source/common/common/lock_guard.h"
11 : #include "source/common/common/utility.h"
12 : #include "source/common/config/datasource.h"
13 : #include "source/common/grpc/common.h"
14 : #include "source/common/grpc/google_grpc_creds_impl.h"
15 : #include "source/common/grpc/google_grpc_utils.h"
16 : #include "source/common/router/header_parser.h"
17 : #include "source/common/tracing/http_tracer_impl.h"
18 :
19 : #include "absl/strings/str_cat.h"
20 : #include "grpcpp/support/proto_buffer_reader.h"
21 :
22 : namespace Envoy {
23 : namespace Grpc {
namespace {
// Fallback per-stream buffer limit (1 MiB) applied when the google_grpc config does not set
// per_stream_buffer_limit_bytes.
constexpr int DefaultBufferLimitBytes = 1 << 20;
} // namespace
27 :
28 : GoogleAsyncClientThreadLocal::GoogleAsyncClientThreadLocal(Api::Api& api)
29 226 : : completion_thread_(api.threadFactory().createThread([this] { completionThread(); },
30 226 : Thread::Options{"GrpcGoogClient"})) {}
31 :
32 226 : GoogleAsyncClientThreadLocal::~GoogleAsyncClientThreadLocal() {
33 : // Force streams to shutdown and invoke TryCancel() to start the drain of
34 : // pending op. If we don't do this, Shutdown() below can jam on pending ops.
35 : // This is also required to satisfy the contract that once Shutdown is called,
36 : // streams no longer queue any additional tags.
37 228 : for (auto it = streams_.begin(); it != streams_.end();) {
38 : // resetStream() may result in immediate unregisterStream() and erase(),
39 : // which would invalidate the iterator for the current element, so make sure
40 : // we point to the next one first.
41 2 : (*it++)->resetStream();
42 2 : }
43 226 : cq_.Shutdown();
44 226 : ENVOY_LOG(debug, "Joining completionThread");
45 226 : completion_thread_->join();
46 226 : ENVOY_LOG(debug, "Joined completionThread");
47 : // Ensure that we have cleaned up all orphan streams, now that CQ is gone.
48 228 : while (!streams_.empty()) {
49 2 : (*streams_.begin())->onCompletedOps();
50 2 : }
51 226 : }
52 :
53 226 : void GoogleAsyncClientThreadLocal::completionThread() {
54 226 : ENVOY_LOG(debug, "completionThread running");
55 226 : void* tag;
56 226 : bool ok;
57 230 : while (cq_.Next(&tag, &ok)) {
58 4 : const auto& google_async_tag = *reinterpret_cast<GoogleAsyncTag*>(tag);
59 4 : const GoogleAsyncTag::Operation op = google_async_tag.op_;
60 4 : GoogleAsyncStreamImpl& stream = google_async_tag.stream_;
61 4 : ENVOY_LOG(trace, "completionThread CQ event {} {}", op, ok);
62 4 : Thread::LockGuard lock(stream.completed_ops_lock_);
63 :
64 : // It's an invariant that there must only be one pending post for arbitrary
65 : // length completed_ops_, otherwise we can race in stream destruction, where
66 : // we process multiple events in onCompletedOps() but have only partially
67 : // consumed the posts on the dispatcher.
68 : // TODO(htuch): This may result in unbounded processing on the silo thread
69 : // in onCompletedOps() in extreme cases, when we emplace_back() in
70 : // completionThread() at a high rate, consider bounding the length of such
71 : // sequences if this behavior becomes an issue.
72 4 : if (stream.completed_ops_.empty()) {
73 3 : stream.dispatcher_.post([&stream] { stream.onCompletedOps(); });
74 3 : }
75 4 : stream.completed_ops_.emplace_back(op, ok);
76 4 : }
77 226 : ENVOY_LOG(debug, "completionThread exiting");
78 226 : }
79 :
80 : GoogleAsyncClientImpl::GoogleAsyncClientImpl(Event::Dispatcher& dispatcher,
81 : GoogleAsyncClientThreadLocal& tls,
82 : GoogleStubFactory& stub_factory,
83 : Stats::ScopeSharedPtr scope,
84 : const envoy::config::core::v3::GrpcService& config,
85 : Api::Api& api, const StatNames& stat_names)
86 : : dispatcher_(dispatcher), tls_(tls), stat_prefix_(config.google_grpc().stat_prefix()),
87 : target_uri_(config.google_grpc().target_uri()), scope_(scope),
88 : per_stream_buffer_limit_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
89 : config.google_grpc(), per_stream_buffer_limit_bytes, DefaultBufferLimitBytes)),
90 : metadata_parser_(Router::HeaderParser::configure(
91 : config.initial_metadata(),
92 2 : envoy::config::core::v3::HeaderValueOption::OVERWRITE_IF_EXISTS_OR_ADD)) {
93 : // We rebuild the channel each time we construct the channel. It appears that the gRPC library is
94 : // smart enough to do connection pooling and reuse with identical channel args, so this should
95 : // have comparable overhead to what we are doing in Grpc::AsyncClientImpl, i.e. no expensive
96 : // new connection implied.
97 2 : std::shared_ptr<grpc::Channel> channel = GoogleGrpcUtils::createChannel(config, api);
98 : // Get state with try_to_connect = true to try connection at channel creation.
99 : // This is for initializing gRPC channel at channel creation. This GetState(true) is used to poke
100 : // the gRPC lb at channel creation, it doesn't have any effect no matter it succeeds or fails. But
101 : // it helps on initialization. Otherwise, the channel establishment still happens at the first
102 : // request, no matter when we create the channel.
103 2 : channel->GetState(true);
104 2 : stub_ = stub_factory.createStub(channel);
105 2 : scope_->counterFromStatName(stat_names.google_grpc_client_creation_).inc();
106 : // Initialize client stats.
107 : // TODO(jmarantz): Capture these names in async_client_manager_impl.cc and
108 : // pass in a struct of StatName objects so we don't have to take locks here.
109 2 : stats_.streams_total_ = &scope_->counterFromStatName(stat_names.streams_total_);
110 36 : for (uint32_t i = 0; i <= Status::WellKnownGrpcStatus::MaximumKnown; ++i) {
111 34 : stats_.streams_closed_[i] = &scope_->counterFromStatName(stat_names.streams_closed_[i]);
112 34 : }
113 2 : }
114 :
115 2 : GoogleAsyncClientImpl::~GoogleAsyncClientImpl() {
116 2 : ASSERT(isThreadSafe());
117 2 : ENVOY_LOG(debug, "Client teardown, resetting streams");
118 2 : while (!active_streams_.empty()) {
119 0 : active_streams_.front()->resetStream();
120 0 : }
121 2 : }
122 :
123 : AsyncRequest* GoogleAsyncClientImpl::sendRaw(absl::string_view service_full_name,
124 : absl::string_view method_name,
125 : Buffer::InstancePtr&& request,
126 : RawAsyncRequestCallbacks& callbacks,
127 : Tracing::Span& parent_span,
128 0 : const Http::AsyncClient::RequestOptions& options) {
129 0 : ASSERT(isThreadSafe());
130 0 : auto* const async_request = new GoogleAsyncRequestImpl(
131 0 : *this, service_full_name, method_name, std::move(request), callbacks, parent_span, options);
132 0 : GoogleAsyncStreamImplPtr grpc_stream{async_request};
133 :
134 0 : grpc_stream->initialize(true);
135 0 : if (grpc_stream->callFailed()) {
136 0 : return nullptr;
137 0 : }
138 :
139 0 : LinkedList::moveIntoList(std::move(grpc_stream), active_streams_);
140 0 : return async_request;
141 0 : }
142 :
143 : RawAsyncStream* GoogleAsyncClientImpl::startRaw(absl::string_view service_full_name,
144 : absl::string_view method_name,
145 : RawAsyncStreamCallbacks& callbacks,
146 2 : const Http::AsyncClient::StreamOptions& options) {
147 2 : ASSERT(isThreadSafe());
148 2 : auto grpc_stream = std::make_unique<GoogleAsyncStreamImpl>(*this, service_full_name, method_name,
149 2 : callbacks, options);
150 :
151 2 : grpc_stream->initialize(false);
152 2 : if (grpc_stream->callFailed()) {
153 0 : return nullptr;
154 0 : }
155 :
156 2 : LinkedList::moveIntoList(std::move(grpc_stream), active_streams_);
157 2 : return active_streams_.front().get();
158 2 : }
159 :
160 : GoogleAsyncStreamImpl::GoogleAsyncStreamImpl(GoogleAsyncClientImpl& parent,
161 : absl::string_view service_full_name,
162 : absl::string_view method_name,
163 : RawAsyncStreamCallbacks& callbacks,
164 : const Http::AsyncClient::StreamOptions& options)
165 : : parent_(parent), tls_(parent_.tls_), dispatcher_(parent_.dispatcher_), stub_(parent_.stub_),
166 : service_full_name_(service_full_name), method_name_(method_name), callbacks_(callbacks),
167 : options_(options), unused_stream_info_(Http::Protocol::Http2, dispatcher_.timeSource(),
168 2 : Network::ConnectionInfoProviderSharedPtr{}) {}
169 :
170 2 : GoogleAsyncStreamImpl::~GoogleAsyncStreamImpl() {
171 2 : ENVOY_LOG(debug, "GoogleAsyncStreamImpl destruct");
172 2 : }
173 :
174 : GoogleAsyncStreamImpl::PendingMessage::PendingMessage(Buffer::InstancePtr request, bool end_stream)
175 1 : : buf_(GoogleGrpcUtils::makeByteBuffer(std::move(request))), end_stream_(end_stream) {}
176 :
177 : // TODO(htuch): figure out how to propagate "this request should be buffered for
178 : // retry" bit to Google gRPC library.
179 2 : void GoogleAsyncStreamImpl::initialize(bool /*buffer_body_for_retry*/) {
180 2 : parent_.stats_.streams_total_->inc();
181 2 : gpr_timespec abs_deadline =
182 2 : options_.timeout
183 2 : ? gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
184 0 : gpr_time_from_millis(options_.timeout.value().count(), GPR_TIMESPAN))
185 2 : : gpr_inf_future(GPR_CLOCK_REALTIME);
186 2 : ctxt_.set_deadline(abs_deadline);
187 : // Fill service-wide initial metadata.
188 2 : auto initial_metadata = Http::RequestHeaderMapImpl::create();
189 : // TODO(cpakulski): Find a better way to access requestHeaders
190 : // request headers should not be stored in stream_info.
191 : // Maybe put it to parent_context?
192 2 : parent_.metadata_parser_->evaluateHeaders(*initial_metadata, options_.parent_context.stream_info);
193 2 : callbacks_.onCreateInitialMetadata(*initial_metadata);
194 2 : initial_metadata->iterate([this](const Http::HeaderEntry& header) {
195 0 : ctxt_.AddMetadata(std::string(header.key().getStringView()),
196 0 : std::string(header.value().getStringView()));
197 0 : return Http::HeaderMap::Iterate::Continue;
198 0 : });
199 : // Invoke stub call.
200 2 : rw_ = parent_.stub_->PrepareCall(&ctxt_, "/" + service_full_name_ + "/" + method_name_,
201 2 : &parent_.tls_.completionQueue());
202 2 : if (rw_ == nullptr) {
203 0 : notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, nullptr, EMPTY_STRING);
204 0 : call_failed_ = true;
205 0 : return;
206 0 : }
207 2 : parent_.tls_.registerStream(this);
208 2 : rw_->StartCall(&init_tag_);
209 2 : ++inflight_tags_;
210 2 : }
211 :
212 : void GoogleAsyncStreamImpl::notifyRemoteClose(Status::GrpcStatus grpc_status,
213 : Http::ResponseTrailerMapPtr trailing_metadata,
214 1 : const std::string& message) {
215 1 : if (grpc_status > Status::WellKnownGrpcStatus::MaximumKnown || grpc_status < 0) {
216 0 : ENVOY_LOG(error, "notifyRemoteClose invalid gRPC status code {}", grpc_status);
217 : // Set the grpc_status as InvalidCode but increment the Unknown stream to avoid out-of-range
218 : // crash..
219 0 : grpc_status = Status::WellKnownGrpcStatus::InvalidCode;
220 0 : parent_.stats_.streams_closed_[Status::WellKnownGrpcStatus::Unknown]->inc();
221 1 : } else {
222 1 : parent_.stats_.streams_closed_[grpc_status]->inc();
223 1 : }
224 1 : ENVOY_LOG(debug, "notifyRemoteClose {} {}", grpc_status, message);
225 1 : callbacks_.onReceiveTrailingMetadata(trailing_metadata ? std::move(trailing_metadata)
226 1 : : Http::ResponseTrailerMapImpl::create());
227 1 : callbacks_.onRemoteClose(grpc_status, message);
228 1 : }
229 :
230 1 : void GoogleAsyncStreamImpl::sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) {
231 1 : write_pending_queue_.emplace(std::move(request), end_stream);
232 1 : ENVOY_LOG(trace, "Queued message to write ({} bytes)",
233 1 : write_pending_queue_.back().buf_.value().Length());
234 1 : bytes_in_write_pending_queue_ += write_pending_queue_.back().buf_.value().Length();
235 1 : writeQueued();
236 1 : }
237 :
238 0 : void GoogleAsyncStreamImpl::closeStream() {
239 : // Empty EOS write queued.
240 0 : write_pending_queue_.emplace();
241 0 : writeQueued();
242 0 : }
243 :
244 3 : void GoogleAsyncStreamImpl::resetStream() {
245 3 : ENVOY_LOG(debug, "resetStream");
246 : // The gRPC API requires calling Finish() at the end of a stream, even
247 : // if the stream is cancelled.
248 3 : if (!finish_pending_) {
249 2 : finish_pending_ = true;
250 2 : rw_->Finish(&status_, &finish_tag_);
251 2 : ++inflight_tags_;
252 2 : }
253 3 : cleanup();
254 3 : }
255 :
256 1 : void GoogleAsyncStreamImpl::writeQueued() {
257 1 : if (!call_initialized_ || finish_pending_ || write_pending_ || write_pending_queue_.empty() ||
258 1 : draining_cq_) {
259 1 : return;
260 1 : }
261 0 : write_pending_ = true;
262 0 : const PendingMessage& msg = write_pending_queue_.front();
263 :
264 0 : if (!msg.buf_) {
265 0 : ASSERT(msg.end_stream_);
266 0 : rw_->WritesDone(&write_last_tag_);
267 0 : ++inflight_tags_;
268 0 : } else if (msg.end_stream_) {
269 0 : grpc::WriteOptions write_options;
270 0 : rw_->WriteLast(msg.buf_.value(), write_options, &write_last_tag_);
271 0 : ++inflight_tags_;
272 0 : } else {
273 0 : rw_->Write(msg.buf_.value(), &write_tag_);
274 0 : ++inflight_tags_;
275 0 : }
276 0 : ENVOY_LOG(trace, "Write op dispatched");
277 0 : }
278 :
279 3 : void GoogleAsyncStreamImpl::onCompletedOps() {
280 : // The items in completed_ops_ execute in the order they were originally added to the queue since
281 : // both the post callback scheduled by the completionThread and the deferred deletion of the
282 : // GoogleAsyncClientThreadLocal happen on the dispatcher thread.
283 3 : std::deque<std::pair<GoogleAsyncTag::Operation, bool>> completed_ops;
284 3 : {
285 3 : Thread::LockGuard lock(completed_ops_lock_);
286 3 : completed_ops = std::move(completed_ops_);
287 : // completed_ops_ should be empty after the move.
288 3 : ASSERT(completed_ops_.empty());
289 3 : }
290 :
291 7 : while (!completed_ops.empty()) {
292 4 : GoogleAsyncTag::Operation op;
293 4 : bool ok;
294 4 : std::tie(op, ok) = completed_ops.front();
295 4 : completed_ops.pop_front();
296 4 : handleOpCompletion(op, ok);
297 4 : }
298 3 : }
299 :
300 4 : void GoogleAsyncStreamImpl::handleOpCompletion(GoogleAsyncTag::Operation op, bool ok) {
301 4 : ENVOY_LOG(trace, "handleOpCompletion op={} ok={} inflight={}", op, ok, inflight_tags_);
302 4 : ASSERT(inflight_tags_ > 0);
303 4 : --inflight_tags_;
304 4 : if (draining_cq_) {
305 3 : if (inflight_tags_ == 0) {
306 2 : deferredDelete();
307 2 : }
308 : // Ignore op completions while draining CQ.
309 3 : return;
310 3 : }
311 : // Consider failure cases first.
312 1 : if (!ok) {
313 : // Early fails can be just treated as Internal.
314 1 : if (op == GoogleAsyncTag::Operation::Init ||
315 1 : op == GoogleAsyncTag::Operation::ReadInitialMetadata) {
316 1 : notifyRemoteClose(Status::WellKnownGrpcStatus::Internal, nullptr, EMPTY_STRING);
317 1 : resetStream();
318 1 : return;
319 1 : }
320 : // Remote server has closed, we can pick up some meaningful status.
321 : // TODO(htuch): We're assuming here that a failed Write/WriteLast operation will result in
322 : // stream termination, and pick up on the failed Read here. Confirm that this assumption is
323 : // valid.
324 0 : if (op == GoogleAsyncTag::Operation::Read) {
325 0 : finish_pending_ = true;
326 0 : rw_->Finish(&status_, &finish_tag_);
327 0 : ++inflight_tags_;
328 0 : }
329 0 : return;
330 1 : }
331 0 : switch (op) {
332 0 : case GoogleAsyncTag::Operation::Init: {
333 0 : ASSERT(ok);
334 0 : ASSERT(!call_initialized_);
335 0 : call_initialized_ = true;
336 0 : rw_->ReadInitialMetadata(&read_initial_metadata_tag_);
337 0 : ++inflight_tags_;
338 0 : writeQueued();
339 0 : break;
340 0 : }
341 0 : case GoogleAsyncTag::Operation::ReadInitialMetadata: {
342 0 : ASSERT(ok);
343 0 : ASSERT(call_initialized_);
344 0 : rw_->Read(&read_buf_, &read_tag_);
345 0 : ++inflight_tags_;
346 0 : Http::ResponseHeaderMapPtr initial_metadata = Http::ResponseHeaderMapImpl::create();
347 0 : metadataTranslate(ctxt_.GetServerInitialMetadata(), *initial_metadata);
348 0 : callbacks_.onReceiveInitialMetadata(std::move(initial_metadata));
349 0 : break;
350 0 : }
351 0 : case GoogleAsyncTag::Operation::Write: {
352 0 : ASSERT(ok);
353 0 : write_pending_ = false;
354 0 : bytes_in_write_pending_queue_ -= write_pending_queue_.front().buf_.value().Length();
355 0 : write_pending_queue_.pop();
356 0 : writeQueued();
357 0 : break;
358 0 : }
359 0 : case GoogleAsyncTag::Operation::WriteLast: {
360 0 : ASSERT(ok);
361 0 : write_pending_ = false;
362 0 : break;
363 0 : }
364 0 : case GoogleAsyncTag::Operation::Read: {
365 0 : ASSERT(ok);
366 0 : auto buffer = GoogleGrpcUtils::makeBufferInstance(read_buf_);
367 0 : if (!buffer || !callbacks_.onReceiveMessageRaw(std::move(buffer))) {
368 : // This is basically streamError in Grpc::AsyncClientImpl.
369 0 : notifyRemoteClose(Status::WellKnownGrpcStatus::Internal, nullptr, EMPTY_STRING);
370 0 : resetStream();
371 0 : break;
372 0 : }
373 0 : rw_->Read(&read_buf_, &read_tag_);
374 0 : ++inflight_tags_;
375 0 : break;
376 0 : }
377 0 : case GoogleAsyncTag::Operation::Finish: {
378 0 : ASSERT(finish_pending_);
379 0 : ENVOY_LOG(debug, "Finish with grpc-status code {}", status_.error_code());
380 0 : Http::ResponseTrailerMapPtr trailing_metadata = Http::ResponseTrailerMapImpl::create();
381 0 : metadataTranslate(ctxt_.GetServerTrailingMetadata(), *trailing_metadata);
382 0 : notifyRemoteClose(static_cast<Status::GrpcStatus>(status_.error_code()),
383 0 : std::move(trailing_metadata), status_.error_message());
384 0 : cleanup();
385 0 : break;
386 0 : }
387 0 : }
388 0 : }
389 :
390 : void GoogleAsyncStreamImpl::metadataTranslate(
391 : const std::multimap<grpc::string_ref, grpc::string_ref>& grpc_metadata,
392 0 : Http::HeaderMap& header_map) {
393 : // More painful copying, this time due to the mismatch in header
394 : // representation data structures in Envoy and Google gRPC.
395 0 : for (const auto& it : grpc_metadata) {
396 0 : auto key = Http::LowerCaseString(std::string(it.first.data(), it.first.size()));
397 0 : if (absl::EndsWith(key.get(), "-bin")) {
398 0 : auto value = Base64::encode(it.second.data(), it.second.size());
399 0 : header_map.addCopy(key, value);
400 0 : continue;
401 0 : }
402 0 : header_map.addCopy(key, std::string(it.second.data(), it.second.size()));
403 0 : }
404 0 : }
405 :
406 2 : void GoogleAsyncStreamImpl::deferredDelete() {
407 2 : ENVOY_LOG(debug, "Deferred delete");
408 2 : tls_.unregisterStream(this);
409 : // We only get here following cleanup(), which has performed a
410 : // remoteFromList(), resulting in self-ownership of the object's memory.
411 : // Hence, it is safe here to create a unique_ptr to this and transfer
412 : // ownership to dispatcher_.deferredDelete(). After this call, no further
413 : // methods may be invoked on this object.
414 2 : dispatcher_.deferredDelete(GoogleAsyncStreamImplPtr(this));
415 2 : }
416 :
417 3 : void GoogleAsyncStreamImpl::cleanup() {
418 3 : ENVOY_LOG(debug, "Stream cleanup with {} in-flight tags", inflight_tags_);
419 : // We can get here if the client has already issued resetStream() and, while
420 : // this is in progress, the destructor runs.
421 3 : if (draining_cq_) {
422 1 : ENVOY_LOG(debug, "Cleanup already in progress");
423 1 : return;
424 1 : }
425 2 : draining_cq_ = true;
426 2 : ctxt_.TryCancel();
427 2 : if (LinkedObject<GoogleAsyncStreamImpl>::inserted()) {
428 : // We take ownership of our own memory at this point.
429 2 : LinkedObject<GoogleAsyncStreamImpl>::removeFromList(parent_.active_streams_).release();
430 2 : if (inflight_tags_ == 0) {
431 0 : deferredDelete();
432 0 : }
433 2 : }
434 2 : }
435 :
436 : GoogleAsyncRequestImpl::GoogleAsyncRequestImpl(
437 : GoogleAsyncClientImpl& parent, absl::string_view service_full_name,
438 : absl::string_view method_name, Buffer::InstancePtr request, RawAsyncRequestCallbacks& callbacks,
439 : Tracing::Span& parent_span, const Http::AsyncClient::RequestOptions& options)
440 : : GoogleAsyncStreamImpl(parent, service_full_name, method_name, *this, options),
441 0 : request_(std::move(request)), callbacks_(callbacks) {
442 0 : current_span_ =
443 0 : parent_span.spawnChild(Tracing::EgressConfig::get(),
444 0 : absl::StrCat("async ", service_full_name, ".", method_name, " egress"),
445 0 : parent.timeSource().systemTime());
446 0 : current_span_->setTag(Tracing::Tags::get().UpstreamCluster, parent.stat_prefix_);
447 0 : current_span_->setTag(Tracing::Tags::get().UpstreamAddress, parent.target_uri_);
448 0 : current_span_->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy);
449 0 : }
450 :
451 0 : void GoogleAsyncRequestImpl::initialize(bool buffer_body_for_retry) {
452 0 : GoogleAsyncStreamImpl::initialize(buffer_body_for_retry);
453 0 : if (callFailed()) {
454 0 : return;
455 0 : }
456 0 : sendMessageRaw(std::move(request_), true);
457 0 : }
458 :
459 0 : void GoogleAsyncRequestImpl::cancel() {
460 0 : current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
461 0 : current_span_->finishSpan();
462 0 : resetStream();
463 0 : }
464 :
465 0 : void GoogleAsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
466 0 : Tracing::HttpTraceContext trace_context(metadata);
467 0 : current_span_->injectContext(trace_context, nullptr);
468 0 : callbacks_.onCreateInitialMetadata(metadata);
469 0 : }
470 :
471 0 : void GoogleAsyncRequestImpl::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) {}
472 :
473 0 : bool GoogleAsyncRequestImpl::onReceiveMessageRaw(Buffer::InstancePtr&& response) {
474 0 : response_ = std::move(response);
475 0 : return true;
476 0 : }
477 :
478 0 : void GoogleAsyncRequestImpl::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) {}
479 :
480 : void GoogleAsyncRequestImpl::onRemoteClose(Grpc::Status::GrpcStatus status,
481 0 : const std::string& message) {
482 0 : current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(status));
483 :
484 0 : if (status != Grpc::Status::WellKnownGrpcStatus::Ok) {
485 0 : current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
486 0 : callbacks_.onFailure(status, message, *current_span_);
487 0 : } else if (response_ == nullptr) {
488 0 : current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
489 0 : callbacks_.onFailure(Status::Internal, EMPTY_STRING, *current_span_);
490 0 : } else {
491 0 : callbacks_.onSuccessRaw(std::move(response_), *current_span_);
492 0 : }
493 :
494 0 : current_span_->finishSpan();
495 0 : }
496 :
497 : } // namespace Grpc
498 : } // namespace Envoy
|