Line data Source code
1 : #include "source/common/grpc/async_client_impl.h"
2 :
3 : #include "envoy/config/core/v3/grpc_service.pb.h"
4 :
5 : #include "source/common/buffer/zero_copy_input_stream_impl.h"
6 : #include "source/common/common/enum_to_int.h"
7 : #include "source/common/common/utility.h"
8 : #include "source/common/grpc/common.h"
9 : #include "source/common/http/header_map_impl.h"
10 : #include "source/common/http/utility.h"
11 :
12 : #include "absl/strings/str_cat.h"
13 :
14 : namespace Envoy {
15 : namespace Grpc {
16 :
17 : AsyncClientImpl::AsyncClientImpl(Upstream::ClusterManager& cm,
18 : const envoy::config::core::v3::GrpcService& config,
19 : TimeSource& time_source)
20 : : cm_(cm), remote_cluster_name_(config.envoy_grpc().cluster_name()),
21 : host_name_(config.envoy_grpc().authority()), time_source_(time_source),
22 : metadata_parser_(Router::HeaderParser::configure(
23 : config.initial_metadata(),
24 33 : envoy::config::core::v3::HeaderValueOption::OVERWRITE_IF_EXISTS_OR_ADD)) {}
25 :
26 33 : AsyncClientImpl::~AsyncClientImpl() {
27 33 : ASSERT(isThreadSafe());
28 33 : while (!active_streams_.empty()) {
29 0 : active_streams_.front()->resetStream();
30 0 : }
31 33 : }
32 :
33 : AsyncRequest* AsyncClientImpl::sendRaw(absl::string_view service_full_name,
34 : absl::string_view method_name, Buffer::InstancePtr&& request,
35 : RawAsyncRequestCallbacks& callbacks,
36 : Tracing::Span& parent_span,
37 0 : const Http::AsyncClient::RequestOptions& options) {
38 0 : ASSERT(isThreadSafe());
39 0 : auto* const async_request = new AsyncRequestImpl(
40 0 : *this, service_full_name, method_name, std::move(request), callbacks, parent_span, options);
41 0 : AsyncStreamImplPtr grpc_stream{async_request};
42 :
43 0 : grpc_stream->initialize(true);
44 0 : if (grpc_stream->hasResetStream()) {
45 0 : return nullptr;
46 0 : }
47 :
48 0 : LinkedList::moveIntoList(std::move(grpc_stream), active_streams_);
49 0 : return async_request;
50 0 : }
51 :
52 : RawAsyncStream* AsyncClientImpl::startRaw(absl::string_view service_full_name,
53 : absl::string_view method_name,
54 : RawAsyncStreamCallbacks& callbacks,
55 68 : const Http::AsyncClient::StreamOptions& options) {
56 68 : ASSERT(isThreadSafe());
57 68 : auto grpc_stream =
58 68 : std::make_unique<AsyncStreamImpl>(*this, service_full_name, method_name, callbacks, options);
59 :
60 68 : grpc_stream->initialize(options.buffer_body_for_retry);
61 68 : if (grpc_stream->hasResetStream()) {
62 0 : return nullptr;
63 0 : }
64 :
65 68 : LinkedList::moveIntoList(std::move(grpc_stream), active_streams_);
66 68 : return active_streams_.front().get();
67 68 : }
68 :
69 : AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
70 : absl::string_view method_name, RawAsyncStreamCallbacks& callbacks,
71 : const Http::AsyncClient::StreamOptions& options)
72 : : parent_(parent), service_full_name_(service_full_name), method_name_(method_name),
73 68 : callbacks_(callbacks), options_(options) {}
74 :
75 68 : void AsyncStreamImpl::initialize(bool buffer_body_for_retry) {
76 68 : const auto thread_local_cluster = parent_.cm_.getThreadLocalCluster(parent_.remote_cluster_name_);
77 68 : if (thread_local_cluster == nullptr) {
78 0 : callbacks_.onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, "Cluster not available");
79 0 : http_reset_ = true;
80 0 : return;
81 0 : }
82 :
83 68 : auto& http_async_client = thread_local_cluster->httpAsyncClient();
84 68 : dispatcher_ = &http_async_client.dispatcher();
85 68 : stream_ = http_async_client.start(*this, options_.setBufferBodyForRetry(buffer_body_for_retry));
86 68 : if (stream_ == nullptr) {
87 0 : callbacks_.onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, EMPTY_STRING);
88 0 : http_reset_ = true;
89 0 : return;
90 0 : }
91 :
92 : // TODO(htuch): match Google gRPC base64 encoding behavior for *-bin headers, see
93 : // https://github.com/envoyproxy/envoy/pull/2444#discussion_r163914459.
94 68 : headers_message_ = Common::prepareHeaders(
95 68 : parent_.host_name_.empty() ? parent_.remote_cluster_name_ : parent_.host_name_,
96 68 : service_full_name_, method_name_, options_.timeout);
97 : // Fill service-wide initial metadata.
98 : // TODO(cpakulski): Find a better way to access requestHeaders
99 : // request headers should not be stored in stream_info.
100 : // Maybe put it to parent_context?
101 : // Since request headers may be empty, consider using Envoy::OptRef.
102 68 : parent_.metadata_parser_->evaluateHeaders(headers_message_->headers(),
103 68 : options_.parent_context.stream_info);
104 :
105 68 : callbacks_.onCreateInitialMetadata(headers_message_->headers());
106 68 : stream_->sendHeaders(headers_message_->headers(), false);
107 68 : }
108 :
109 : // TODO(htuch): match Google gRPC base64 encoding behavior for *-bin headers, see
110 : // https://github.com/envoyproxy/envoy/pull/2444#discussion_r163914459.
111 68 : void AsyncStreamImpl::onHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
112 68 : const auto http_response_status = Http::Utility::getResponseStatus(*headers);
113 68 : const auto grpc_status = Common::getGrpcStatus(*headers);
114 68 : callbacks_.onReceiveInitialMetadata(end_stream ? Http::ResponseHeaderMapImpl::create()
115 68 : : std::move(headers));
116 68 : if (http_response_status != enumToInt(Http::Code::OK)) {
117 : // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md requires that
118 : // grpc-status be used if available.
119 0 : if (end_stream && grpc_status) {
120 : // Due to headers/trailers type differences we need to copy here. This is an uncommon case but
121 : // we can potentially optimize in the future.
122 :
123 : // TODO(mattklein123): clang-tidy is showing a use after move when passing to
124 : // onReceiveInitialMetadata() above. This looks like an actual bug that I will fix in a
125 : // follow up.
126 : // NOLINTNEXTLINE(bugprone-use-after-move)
127 0 : onTrailers(Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*headers));
128 0 : return;
129 0 : }
130 : // Status is translated via Utility::httpToGrpcStatus per
131 : // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
132 0 : streamError(Utility::httpToGrpcStatus(http_response_status));
133 0 : return;
134 0 : }
135 68 : if (end_stream) {
136 : // Due to headers/trailers type differences we need to copy here. This is an uncommon case but
137 : // we can potentially optimize in the future.
138 19 : onTrailers(Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*headers));
139 19 : }
140 68 : }
141 :
142 253 : void AsyncStreamImpl::onData(Buffer::Instance& data, bool end_stream) {
143 253 : decoded_frames_.clear();
144 253 : if (!decoder_.decode(data, decoded_frames_)) {
145 0 : streamError(Status::WellKnownGrpcStatus::Internal);
146 0 : return;
147 0 : }
148 :
149 253 : for (auto& frame : decoded_frames_) {
150 253 : if (frame.length_ > 0 && frame.flags_ != GRPC_FH_DEFAULT) {
151 0 : streamError(Status::WellKnownGrpcStatus::Internal);
152 0 : return;
153 0 : }
154 253 : if (!callbacks_.onReceiveMessageRaw(frame.data_ ? std::move(frame.data_)
155 253 : : std::make_unique<Buffer::OwnedImpl>())) {
156 0 : streamError(Status::WellKnownGrpcStatus::Internal);
157 0 : return;
158 0 : }
159 253 : }
160 :
161 253 : if (end_stream) {
162 0 : streamError(Status::WellKnownGrpcStatus::Unknown);
163 0 : }
164 253 : }
165 :
166 : // TODO(htuch): match Google gRPC base64 encoding behavior for *-bin headers, see
167 : // https://github.com/envoyproxy/envoy/pull/2444#discussion_r163914459.
168 25 : void AsyncStreamImpl::onTrailers(Http::ResponseTrailerMapPtr&& trailers) {
169 25 : auto grpc_status = Common::getGrpcStatus(*trailers);
170 25 : const std::string grpc_message = Common::getGrpcMessage(*trailers);
171 25 : callbacks_.onReceiveTrailingMetadata(std::move(trailers));
172 25 : if (!grpc_status) {
173 0 : grpc_status = Status::WellKnownGrpcStatus::Unknown;
174 0 : }
175 25 : callbacks_.onRemoteClose(grpc_status.value(), grpc_message);
176 25 : cleanup();
177 25 : }
178 :
179 28 : void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::string& message) {
180 28 : callbacks_.onReceiveTrailingMetadata(Http::ResponseTrailerMapImpl::create());
181 28 : callbacks_.onRemoteClose(grpc_status, message);
182 28 : resetStream();
183 28 : }
184 :
185 0 : void AsyncStreamImpl::onComplete() {
186 : // No-op since stream completion is handled within other callbacks.
187 0 : }
188 :
189 68 : void AsyncStreamImpl::onReset() {
190 68 : if (http_reset_) {
191 40 : return;
192 40 : }
193 :
194 28 : http_reset_ = true;
195 28 : streamError(Status::WellKnownGrpcStatus::Internal);
196 28 : }
197 :
198 416 : void AsyncStreamImpl::sendMessageRaw(Buffer::InstancePtr&& buffer, bool end_stream) {
199 416 : Common::prependGrpcFrameHeader(*buffer);
200 416 : stream_->sendData(*buffer, end_stream);
201 416 : }
202 :
203 15 : void AsyncStreamImpl::closeStream() {
204 15 : Buffer::OwnedImpl empty_buffer;
205 15 : stream_->sendData(empty_buffer, true);
206 15 : }
207 :
208 43 : void AsyncStreamImpl::resetStream() { cleanup(); }
209 :
210 68 : void AsyncStreamImpl::cleanup() {
211 68 : if (!http_reset_) {
212 40 : http_reset_ = true;
213 40 : stream_->reset();
214 40 : }
215 :
216 : // This will destroy us, but only do so if we are actually in a list. This does not happen in
217 : // the immediate failure case.
218 68 : if (LinkedObject<AsyncStreamImpl>::inserted()) {
219 68 : ASSERT(dispatcher_->isThreadSafe());
220 68 : dispatcher_->deferredDelete(
221 68 : LinkedObject<AsyncStreamImpl>::removeFromList(parent_.active_streams_));
222 68 : }
223 68 : }
224 :
225 : AsyncRequestImpl::AsyncRequestImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
226 : absl::string_view method_name, Buffer::InstancePtr&& request,
227 : RawAsyncRequestCallbacks& callbacks, Tracing::Span& parent_span,
228 : const Http::AsyncClient::RequestOptions& options)
229 : : AsyncStreamImpl(parent, service_full_name, method_name, *this, options),
230 0 : request_(std::move(request)), callbacks_(callbacks) {
231 :
232 0 : current_span_ =
233 0 : parent_span.spawnChild(Tracing::EgressConfig::get(),
234 0 : absl::StrCat("async ", service_full_name, ".", method_name, " egress"),
235 0 : parent.time_source_.systemTime());
236 0 : current_span_->setTag(Tracing::Tags::get().UpstreamCluster, parent.remote_cluster_name_);
237 0 : current_span_->setTag(Tracing::Tags::get().UpstreamAddress, parent.host_name_.empty()
238 0 : ? parent.remote_cluster_name_
239 0 : : parent.host_name_);
240 0 : current_span_->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy);
241 0 : }
242 :
243 0 : void AsyncRequestImpl::initialize(bool buffer_body_for_retry) {
244 0 : AsyncStreamImpl::initialize(buffer_body_for_retry);
245 0 : if (this->hasResetStream()) {
246 0 : return;
247 0 : }
248 0 : this->sendMessageRaw(std::move(request_), true);
249 0 : }
250 :
251 0 : void AsyncRequestImpl::cancel() {
252 0 : current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
253 0 : current_span_->finishSpan();
254 0 : this->resetStream();
255 0 : }
256 :
257 0 : void AsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
258 0 : Tracing::HttpTraceContext trace_context(metadata);
259 0 : current_span_->injectContext(trace_context, nullptr);
260 0 : callbacks_.onCreateInitialMetadata(metadata);
261 0 : }
262 :
263 0 : void AsyncRequestImpl::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) {}
264 :
265 0 : bool AsyncRequestImpl::onReceiveMessageRaw(Buffer::InstancePtr&& response) {
266 0 : response_ = std::move(response);
267 0 : return true;
268 0 : }
269 :
270 0 : void AsyncRequestImpl::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) {}
271 :
272 0 : void AsyncRequestImpl::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) {
273 0 : current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(status));
274 :
275 0 : if (status != Grpc::Status::WellKnownGrpcStatus::Ok) {
276 0 : current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
277 0 : callbacks_.onFailure(status, message, *current_span_);
278 0 : } else if (response_ == nullptr) {
279 0 : current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
280 0 : callbacks_.onFailure(Status::Internal, EMPTY_STRING, *current_span_);
281 0 : } else {
282 0 : callbacks_.onSuccessRaw(std::move(response_), *current_span_);
283 0 : }
284 :
285 0 : current_span_->finishSpan();
286 0 : }
287 :
288 : } // namespace Grpc
289 : } // namespace Envoy
|