Line data Source code
1 : #pragma once
2 :
#include <array>
#include <deque>
#include <list>
#include <map>
#include <memory>
#include <queue>
#include <string>
#include <utility>

#include "envoy/api/api.h"
#include "envoy/common/platform.h"
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/grpc/async_client.h"
#include "envoy/stats/scope.h"
#include "envoy/stream_info/stream_info.h"
#include "envoy/thread/thread.h"
#include "envoy/thread_local/thread_local_object.h"
#include "envoy/tracing/tracer.h"

#include "source/common/common/linked_object.h"
#include "source/common/common/thread.h"
#include "source/common/common/thread_annotations.h"
#include "source/common/grpc/google_grpc_context.h"
#include "source/common/grpc/stat_names.h"
#include "source/common/grpc/typed_async_client.h"
#include "source/common/router/header_parser.h"
#include "source/common/stream_info/stream_info_impl.h"
#include "source/common/tracing/http_tracer_impl.h"

#include "absl/container/node_hash_set.h"
#include "grpcpp/generic/generic_stub.h"
#include "grpcpp/grpcpp.h"
#include "grpcpp/support/proto_buffer_writer.h"
31 :
32 : namespace Envoy {
33 : namespace Grpc {
34 :
class GoogleAsyncStreamImpl;
class GoogleAsyncRequestImpl;

// Owning handle to a stream; GoogleAsyncClientImpl keeps its active streams in a list of these.
using GoogleAsyncStreamImplPtr = std::unique_ptr<GoogleAsyncStreamImpl>;

// Completion-queue tag. It records which stream an event belongs to and which of that stream's
// operations completed. A stream owns one tag per Operation, and the tags are handed to the gRPC
// AsyncReaderWriter as completion-notification tags.
struct GoogleAsyncTag {
  // Operations whose completion is reported through a tag.
  //
  // For a given stream, at most one operation of each kind is in flight. Read* and Write*
  // operations may overlap each other. Init and Finish are only issued when no other operation
  // is outstanding. The semantics of the underlying gRPC client calls are described in
  // https://github.com/grpc/grpc/blob/master/include/grpcpp/support/async_stream.h.
  enum Operation {
    // Stub call issued; waiting for initialization to complete.
    Init = 0,
    // Waiting for the server's initial metadata after Init completes.
    ReadInitialMetadata = 1,
    // Waiting for a response protobuf after ReadInitialMetadata completes.
    Read = 2,
    // Waiting for a request protobuf write to complete.
    Write = 3,
    // Waiting for the final (EOS) request write, or for an EOS WritesDone, to complete.
    WriteLast = 4,
    // Waiting for the final status. Only issued once every Read* and Write* has completed.
    Finish = 5,
  };

  GoogleAsyncTag(GoogleAsyncStreamImpl& stream, Operation op) : stream_{stream}, op_{op} {}

  // The stream this tag belongs to.
  GoogleAsyncStreamImpl& stream_;
  // The operation this tag reports; fixed at construction.
  const Operation op_;
};
70 :
71 : class GoogleAsyncClientThreadLocal : public ThreadLocal::ThreadLocalObject,
72 : Logger::Loggable<Logger::Id::grpc> {
73 : public:
74 : GoogleAsyncClientThreadLocal(Api::Api& api);
75 : ~GoogleAsyncClientThreadLocal() override;
76 :
77 2 : grpc::CompletionQueue& completionQueue() { return cq_; }
78 :
79 2 : void registerStream(GoogleAsyncStreamImpl* stream) {
80 2 : ASSERT(streams_.find(stream) == streams_.end());
81 2 : streams_.insert(stream);
82 2 : }
83 :
84 2 : void unregisterStream(GoogleAsyncStreamImpl* stream) {
85 2 : auto it = streams_.find(stream);
86 2 : ASSERT(it != streams_.end());
87 2 : streams_.erase(it);
88 2 : }
89 :
90 : private:
91 : void completionThread();
92 :
93 : // There is blanket google-grpc initialization in MainCommonBase, but that
94 : // doesn't cover unit tests. However, putting blanket coverage in ProcessWide
95 : // causes background threaded memory allocation in all unit tests making it
96 : // hard to measure memory. Thus we also initialize grpc using our idempotent
97 : // wrapper-class in classes that need it. See
98 : // https://github.com/envoyproxy/envoy/issues/8282 for details.
99 : GoogleGrpcContext google_grpc_context_;
100 :
101 : // The CompletionQueue for in-flight operations. This must precede completion_thread_ to ensure it
102 : // is constructed before the thread runs.
103 : grpc::CompletionQueue cq_;
104 : // The threading model for the Google gRPC C++ library is not directly compatible with Envoy's
105 : // siloed model. We resolve this by issuing non-blocking asynchronous
106 : // operations on the GoogleAsyncClientImpl silo thread, and then synchronously
107 : // blocking on a completion queue, cq_, on a distinct thread. When cq_ events
108 : // are delivered, we cross-post to the silo dispatcher to continue the
109 : // operation.
110 : //
111 : // We have an independent completion thread for each TLS silo (i.e. one per worker and
112 : // also one for the main thread).
113 : Thread::ThreadPtr completion_thread_;
114 : // Track all streams that are currently using this CQ, so we can notify them
115 : // on shutdown.
116 : absl::node_hash_set<GoogleAsyncStreamImpl*> streams_;
117 : };
118 :
119 : using GoogleAsyncClientThreadLocalPtr = std::unique_ptr<GoogleAsyncClientThreadLocal>;
120 :
121 : // Google gRPC client stats. TODO(htuch): consider how a wider set of stats collected by the
122 : // library, such as the census related ones, can be externalized as needed.
123 : struct GoogleAsyncClientStats {
124 : // .streams_total
125 : Stats::Counter* streams_total_;
126 : // .streams_closed_<gRPC status code>
127 : std::array<Stats::Counter*, Status::WellKnownGrpcStatus::MaximumKnown + 1> streams_closed_;
128 : };
129 :
130 : // Interface to allow the gRPC stub to be mocked out by tests.
131 : class GoogleStub {
132 : public:
133 2 : virtual ~GoogleStub() = default;
134 :
135 : // See grpc::PrepareCall().
136 : virtual std::unique_ptr<grpc::GenericClientAsyncReaderWriter>
137 : PrepareCall(grpc::ClientContext* context, const grpc::string& method,
138 : grpc::CompletionQueue* cq) PURE;
139 : };
140 :
141 : using GoogleStubSharedPtr = std::shared_ptr<GoogleStub>;
142 :
143 : class GoogleGenericStub : public GoogleStub {
144 : public:
145 2 : GoogleGenericStub(std::shared_ptr<grpc::Channel> channel) : stub_(channel) {}
146 :
147 : std::unique_ptr<grpc::GenericClientAsyncReaderWriter>
148 : PrepareCall(grpc::ClientContext* context, const grpc::string& method,
149 2 : grpc::CompletionQueue* cq) override {
150 2 : return stub_.PrepareCall(context, method, cq);
151 2 : }
152 :
153 : private:
154 : grpc::GenericStub stub_;
155 : };
156 :
157 : // Interface to allow the gRPC stub creation to be mocked out by tests.
158 : class GoogleStubFactory {
159 : public:
160 2 : virtual ~GoogleStubFactory() = default;
161 :
162 : // Create a stub from a given channel.
163 : virtual GoogleStubSharedPtr createStub(std::shared_ptr<grpc::Channel> channel) PURE;
164 : };
165 :
166 : class GoogleGenericStubFactory : public GoogleStubFactory {
167 : public:
168 2 : GoogleStubSharedPtr createStub(std::shared_ptr<grpc::Channel> channel) override {
169 2 : return std::make_shared<GoogleGenericStub>(channel);
170 2 : }
171 : };
172 :
173 : // Google gRPC C++ client library implementation of Grpc::AsyncClient.
174 : class GoogleAsyncClientImpl final : public RawAsyncClient, Logger::Loggable<Logger::Id::grpc> {
175 : public:
176 : GoogleAsyncClientImpl(Event::Dispatcher& dispatcher, GoogleAsyncClientThreadLocal& tls,
177 : GoogleStubFactory& stub_factory, Stats::ScopeSharedPtr scope,
178 : const envoy::config::core::v3::GrpcService& config, Api::Api& api,
179 : const StatNames& stat_names);
180 : ~GoogleAsyncClientImpl() override;
181 :
182 : // Grpc::AsyncClient
183 : AsyncRequest* sendRaw(absl::string_view service_full_name, absl::string_view method_name,
184 : Buffer::InstancePtr&& request, RawAsyncRequestCallbacks& callbacks,
185 : Tracing::Span& parent_span,
186 : const Http::AsyncClient::RequestOptions& options) override;
187 : RawAsyncStream* startRaw(absl::string_view service_full_name, absl::string_view method_name,
188 : RawAsyncStreamCallbacks& callbacks,
189 : const Http::AsyncClient::StreamOptions& options) override;
190 0 : absl::string_view destination() override { return target_uri_; }
191 :
192 0 : TimeSource& timeSource() { return dispatcher_.timeSource(); }
193 0 : uint64_t perStreamBufferLimitBytes() const { return per_stream_buffer_limit_bytes_; }
194 :
195 : private:
196 : Event::Dispatcher& dispatcher_;
197 : GoogleAsyncClientThreadLocal& tls_;
198 : // This is shared with child streams, so that they can cleanup independent of
199 : // the client if it gets destructed. The streams need to wait for their tags
200 : // to drain from the CQ.
201 : GoogleStubSharedPtr stub_;
202 : std::list<GoogleAsyncStreamImplPtr> active_streams_;
203 : const std::string stat_prefix_;
204 : const std::string target_uri_;
205 : Stats::ScopeSharedPtr scope_;
206 : GoogleAsyncClientStats stats_;
207 : uint64_t per_stream_buffer_limit_bytes_;
208 : Router::HeaderParserPtr metadata_parser_;
209 :
210 : friend class GoogleAsyncClientThreadLocal;
211 : friend class GoogleAsyncRequestImpl;
212 : friend class GoogleAsyncStreamImpl;
213 : };
214 :
215 : class GoogleAsyncStreamImpl : public RawAsyncStream,
216 : public Event::DeferredDeletable,
217 : Logger::Loggable<Logger::Id::grpc>,
218 : public LinkedObject<GoogleAsyncStreamImpl> {
219 : public:
220 : GoogleAsyncStreamImpl(GoogleAsyncClientImpl& parent, absl::string_view service_full_name,
221 : absl::string_view method_name, RawAsyncStreamCallbacks& callbacks,
222 : const Http::AsyncClient::StreamOptions& options);
223 : ~GoogleAsyncStreamImpl() override;
224 :
225 : virtual void initialize(bool buffer_body_for_retry);
226 :
227 : // Grpc::RawAsyncStream
228 : void sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) override;
229 : void closeStream() override;
230 : void resetStream() override;
231 : // While the Google-gRPC code doesn't use Envoy watermark buffers, the logical
232 : // analog is to make sure that the aren't too many bytes in the pending write
233 : // queue.
234 0 : bool isAboveWriteBufferHighWatermark() const override {
235 0 : return bytes_in_write_pending_queue_ > parent_.perStreamBufferLimitBytes();
236 0 : }
237 0 : const StreamInfo::StreamInfo& streamInfo() const override { return unused_stream_info_; }
238 :
239 : protected:
240 2 : bool callFailed() const { return call_failed_; }
241 :
242 : private:
243 : // Process queued events in completed_ops_ with handleOpCompletion() on
244 : // GoogleAsyncClient silo thread.
245 : void onCompletedOps();
246 : // Handle Operation completion on GoogleAsyncClient silo thread. This is posted by
247 : // GoogleAsyncClientThreadLocal::completionThread() when a message is received on cq_.
248 : void handleOpCompletion(GoogleAsyncTag::Operation op, bool ok);
249 : // Convert from Google gRPC client std::multimap metadata to Envoy Http::HeaderMap.
250 : void metadataTranslate(const std::multimap<grpc::string_ref, grpc::string_ref>& grpc_metadata,
251 : Http::HeaderMap& header_map);
252 : // Write the first PendingMessage in the write queue if non-empty.
253 : void writeQueued();
254 : // Deliver notification and update stats when the connection closes.
255 : void notifyRemoteClose(Status::GrpcStatus grpc_status,
256 : Http::ResponseTrailerMapPtr trailing_metadata, const std::string& message);
257 : // Schedule stream for deferred deletion.
258 : void deferredDelete();
259 : // Cleanup and schedule stream for deferred deletion if no inflight
260 : // completions.
261 : void cleanup();
262 :
263 : // Pending serialized message on write queue. Only one Operation::Write is in-flight at any
264 : // point-in-time, so we queue pending writes here.
265 : struct PendingMessage {
266 : PendingMessage(Buffer::InstancePtr request, bool end_stream);
267 : // End-of-stream with no additional message.
268 0 : PendingMessage() = default;
269 :
270 : const absl::optional<grpc::ByteBuffer> buf_{};
271 : const bool end_stream_{true};
272 : };
273 :
274 : GoogleAsyncTag init_tag_{*this, GoogleAsyncTag::Operation::Init};
275 : GoogleAsyncTag read_initial_metadata_tag_{*this, GoogleAsyncTag::Operation::ReadInitialMetadata};
276 : GoogleAsyncTag read_tag_{*this, GoogleAsyncTag::Operation::Read};
277 : GoogleAsyncTag write_tag_{*this, GoogleAsyncTag::Operation::Write};
278 : GoogleAsyncTag write_last_tag_{*this, GoogleAsyncTag::Operation::WriteLast};
279 : GoogleAsyncTag finish_tag_{*this, GoogleAsyncTag::Operation::Finish};
280 :
281 : GoogleAsyncClientImpl& parent_;
282 : GoogleAsyncClientThreadLocal& tls_;
283 : // Latch our own version of this reference, so that completionThread() doesn't
284 : // try and access via parent_, which might not exist in teardown. We assume
285 : // that the dispatcher lives longer than completionThread() life, which should
286 : // hold for the expected server object lifetimes.
287 : Event::Dispatcher& dispatcher_;
288 : // We hold a ref count on the stub_ to allow the stream to wait for its tags
289 : // to drain from the CQ on cleanup.
290 : GoogleStubSharedPtr stub_;
291 : std::string service_full_name_;
292 : std::string method_name_;
293 : RawAsyncStreamCallbacks& callbacks_;
294 : const Http::AsyncClient::StreamOptions& options_;
295 : grpc::ClientContext ctxt_;
296 : std::unique_ptr<grpc::GenericClientAsyncReaderWriter> rw_;
297 : std::queue<PendingMessage> write_pending_queue_;
298 : uint64_t bytes_in_write_pending_queue_{};
299 : grpc::ByteBuffer read_buf_;
300 : grpc::Status status_;
301 : // Has Operation::Init completed?
302 : bool call_initialized_{};
303 : // Did the stub Call fail? If this is true, no Operation::Init completion will ever occur.
304 : bool call_failed_{};
305 : // Is there an Operation::Write[Last] in-flight?
306 : bool write_pending_{};
307 : // Is an Operation::Finish in-flight?
308 : bool finish_pending_{};
309 : // Have we entered CQ draining state? If so, we're just waiting for all our
310 : // ops on the CQ to drain away before freeing the stream.
311 : bool draining_cq_{};
312 : // Count of the tags in-flight. This must hit zero before the stream can be
313 : // freed.
314 : uint32_t inflight_tags_{};
315 :
316 : // This is unused.
317 : StreamInfo::StreamInfoImpl unused_stream_info_;
318 :
319 : // Queue of completed (op, ok) passed from completionThread() to
320 : // handleOpCompletion().
321 : std::deque<std::pair<GoogleAsyncTag::Operation, bool>>
322 : completed_ops_ ABSL_GUARDED_BY(completed_ops_lock_);
323 : Thread::MutexBasicLockable completed_ops_lock_;
324 :
325 : friend class GoogleAsyncClientImpl;
326 : friend class GoogleAsyncClientThreadLocal;
327 : };
328 :
329 : class GoogleAsyncRequestImpl : public AsyncRequest,
330 : public GoogleAsyncStreamImpl,
331 : RawAsyncStreamCallbacks {
332 : public:
333 : GoogleAsyncRequestImpl(GoogleAsyncClientImpl& parent, absl::string_view service_full_name,
334 : absl::string_view method_name, Buffer::InstancePtr request,
335 : RawAsyncRequestCallbacks& callbacks, Tracing::Span& parent_span,
336 : const Http::AsyncClient::RequestOptions& options);
337 :
338 : void initialize(bool buffer_body_for_retry) override;
339 :
340 : // Grpc::AsyncRequest
341 : void cancel() override;
342 :
343 : private:
344 : // Grpc::RawAsyncStreamCallbacks
345 : void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
346 : void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override;
347 : bool onReceiveMessageRaw(Buffer::InstancePtr&& response) override;
348 : void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override;
349 : void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;
350 :
351 : Buffer::InstancePtr request_;
352 : RawAsyncRequestCallbacks& callbacks_;
353 : Tracing::SpanPtr current_span_;
354 : Buffer::InstancePtr response_;
355 : };
356 :
357 : } // namespace Grpc
358 : } // namespace Envoy
|