1
#pragma once
2

            
3
#include <memory>
4
#include <queue>
#include <utility>
5

            
6
#include "envoy/api/api.h"
7
#include "envoy/common/platform.h"
8
#include "envoy/config/core/v3/base.pb.h"
9
#include "envoy/config/core/v3/grpc_service.pb.h"
10
#include "envoy/grpc/async_client.h"
11
#include "envoy/stats/scope.h"
12
#include "envoy/stream_info/stream_info.h"
13
#include "envoy/thread/thread.h"
14
#include "envoy/thread_local/thread_local_object.h"
15
#include "envoy/tracing/tracer.h"
16

            
17
#include "source/common/common/linked_object.h"
18
#include "source/common/common/thread.h"
19
#include "source/common/common/thread_annotations.h"
20
#include "source/common/grpc/google_grpc_context.h"
21
#include "source/common/grpc/stat_names.h"
22
#include "source/common/grpc/typed_async_client.h"
23
#include "source/common/router/header_parser.h"
24
#include "source/common/stream_info/stream_info_impl.h"
25
#include "source/common/tracing/http_tracer_impl.h"
26

            
27
#include "absl/container/node_hash_set.h"
28
#include "grpcpp/generic/generic_stub.h"
29
#include "grpcpp/grpcpp.h"
30
#include "grpcpp/support/proto_buffer_writer.h"
31

            
32
namespace Envoy {
33
namespace Grpc {
34

            
35
// Forward declarations of the stream and request (sendRaw) implementations defined later in this
// header.
class GoogleAsyncStreamImpl;
class GoogleAsyncRequestImpl;

// Owning handle to a GoogleAsyncStreamImpl.
using GoogleAsyncStreamImplPtr = std::unique_ptr<GoogleAsyncStreamImpl>;
40

            
41
struct GoogleAsyncTag {
42
  // Operation defines tags that are handed to the gRPC AsyncReaderWriter for use in completion
43
  // notification for their namesake operations. Read* and Write* operations may be outstanding
44
  // simultaneously, but there will be no more than one operation of each type in-flight for a given
45
  // stream. Init and Finish will both be issued exclusively when no other operations are in-flight
46
  // for a stream. See
47
  // https://github.com/grpc/grpc/blob/master/include/grpc%2B%2B/impl/codegen/async_stream.h for
48
  // further insight into the semantics of the different gRPC client operations.
49
  enum Operation {
50
    // Initial stub call issued, waiting for initialization to complete.
51
    Init = 0,
52
    // Waiting for initial meta-data from server following Init completion.
53
    ReadInitialMetadata,
54
    // Waiting for response protobuf from server following ReadInitialMetadata completion.
55
    Read,
56
    // Waiting for write of request protobuf to server to complete.
57
    Write,
58
    // Waiting for write of request protobuf (EOS) __OR__ an EOS WritesDone to server to complete.
59
    WriteLast,
60
    // Waiting for final status. This must only be issued once all Read* and Write* operations have
61
    // completed.
62
    Finish,
63
  };
64

            
65
7428
  GoogleAsyncTag(GoogleAsyncStreamImpl& stream, Operation op) : stream_(stream), op_(op) {}
66

            
67
  GoogleAsyncStreamImpl& stream_;
68
  const Operation op_;
69
};
70

            
71
class GoogleAsyncClientThreadLocal : public ThreadLocal::ThreadLocalObject,
72
                                     Logger::Loggable<Logger::Id::grpc> {
73
public:
74
  GoogleAsyncClientThreadLocal(Api::Api& api);
75
  ~GoogleAsyncClientThreadLocal() override;
76

            
77
1238
  grpc::CompletionQueue& completionQueue() { return cq_; }
78

            
79
1234
  void registerStream(GoogleAsyncStreamImpl* stream) {
80
1234
    ASSERT(streams_.find(stream) == streams_.end());
81
1234
    streams_.insert(stream);
82
1234
  }
83

            
84
1234
  void unregisterStream(GoogleAsyncStreamImpl* stream) {
85
1234
    auto it = streams_.find(stream);
86
1234
    ASSERT(it != streams_.end());
87
1234
    streams_.erase(it);
88
1234
  }
89

            
90
private:
91
  void completionThread();
92

            
93
  // There is blanket google-grpc initialization in MainCommonBase, but that
94
  // doesn't cover unit tests. However, putting blanket coverage in ProcessWide
95
  // causes background threaded memory allocation in all unit tests making it
96
  // hard to measure memory. Thus we also initialize grpc using our idempotent
97
  // wrapper-class in classes that need it. See
98
  // https://github.com/envoyproxy/envoy/issues/8282 for details.
99
  GoogleGrpcContext google_grpc_context_;
100

            
101
  // The CompletionQueue for in-flight operations. This must precede completion_thread_ to ensure it
102
  // is constructed before the thread runs.
103
  grpc::CompletionQueue cq_;
104
  // The threading model for the Google gRPC C++ library is not directly compatible with Envoy's
105
  // siloed model. We resolve this by issuing non-blocking asynchronous
106
  // operations on the GoogleAsyncClientImpl silo thread, and then synchronously
107
  // blocking on a completion queue, cq_, on a distinct thread. When cq_ events
108
  // are delivered, we cross-post to the silo dispatcher to continue the
109
  // operation.
110
  //
111
  // We have an independent completion thread for each TLS silo (i.e. one per worker and
112
  // also one for the main thread).
113
  Thread::ThreadPtr completion_thread_;
114
  // Track all streams that are currently using this CQ, so we can notify them
115
  // on shutdown.
116
  absl::node_hash_set<GoogleAsyncStreamImpl*> streams_;
117
};
118

            
119
using GoogleAsyncClientThreadLocalPtr = std::unique_ptr<GoogleAsyncClientThreadLocal>;
120

            
121
// Google gRPC client stats. TODO(htuch): consider how a wider set of stats collected by the
122
// library, such as the census related ones, can be externalized as needed.
123
struct GoogleAsyncClientStats {
124
  // .streams_total
125
  Stats::Counter* streams_total_;
126
  // .streams_closed_<gRPC status code>
127
  std::array<Stats::Counter*, Status::WellKnownGrpcStatus::MaximumKnown + 1> streams_closed_;
128
};
129

            
130
// Interface to allow the gRPC stub to be mocked out by tests.
131
class GoogleStub {
132
public:
133
1090
  virtual ~GoogleStub() = default;
134

            
135
  // See grpc::PrepareCall().
136
  virtual std::unique_ptr<grpc::GenericClientAsyncReaderWriter>
137
  PrepareCall(grpc::ClientContext* context, const grpc::string& method,
138
              grpc::CompletionQueue* cq) PURE;
139
};
140

            
141
using GoogleStubSharedPtr = std::shared_ptr<GoogleStub>;
142

            
143
class GoogleGenericStub : public GoogleStub {
144
public:
145
1084
  GoogleGenericStub(std::shared_ptr<grpc::Channel> channel) : stub_(channel) {}
146

            
147
  std::unique_ptr<grpc::GenericClientAsyncReaderWriter>
148
  PrepareCall(grpc::ClientContext* context, const grpc::string& method,
149
1234
              grpc::CompletionQueue* cq) override {
150
1234
    return stub_.PrepareCall(context, method, cq);
151
1234
  }
152

            
153
private:
154
  grpc::GenericStub stub_;
155
};
156

            
157
// Interface to allow the gRPC stub creation to be mocked out by tests.
158
class GoogleStubFactory {
159
public:
160
8
  virtual ~GoogleStubFactory() = default;
161

            
162
  // Create a stub from a given channel.
163
  virtual GoogleStubSharedPtr createStub(std::shared_ptr<grpc::Channel> channel) PURE;
164
};
165

            
166
class GoogleGenericStubFactory : public GoogleStubFactory {
167
public:
168
1084
  GoogleStubSharedPtr createStub(std::shared_ptr<grpc::Channel> channel) override {
169
1084
    return std::make_shared<GoogleGenericStub>(channel);
170
1084
  }
171
};
172

            
173
// Google gRPC C++ client library implementation of Grpc::AsyncClient.
174
class GoogleAsyncClientImpl final : public RawAsyncClient, Logger::Loggable<Logger::Id::grpc> {
175
public:
176
  GoogleAsyncClientImpl(Event::Dispatcher& dispatcher, GoogleAsyncClientThreadLocal& tls,
177
                        GoogleStubFactory& stub_factory, Stats::ScopeSharedPtr scope,
178
                        const envoy::config::core::v3::GrpcService& config,
179
                        Server::Configuration::CommonFactoryContext& context,
180
                        const StatNames& stat_names);
181
  ~GoogleAsyncClientImpl() override;
182

            
183
  // Grpc::AsyncClient
184
  AsyncRequest* sendRaw(absl::string_view service_full_name, absl::string_view method_name,
185
                        Buffer::InstancePtr&& request, RawAsyncRequestCallbacks& callbacks,
186
                        Tracing::Span& parent_span,
187
                        const Http::AsyncClient::RequestOptions& options) override;
188
  RawAsyncStream* startRaw(absl::string_view service_full_name, absl::string_view method_name,
189
                           RawAsyncStreamCallbacks& callbacks,
190
                           const Http::AsyncClient::StreamOptions& options) override;
191
  absl::string_view destination() override { return target_uri_; }
192

            
193
413
  TimeSource& timeSource() { return dispatcher_.timeSource(); }
194
18
  uint64_t perStreamBufferLimitBytes() const { return per_stream_buffer_limit_bytes_; }
195

            
196
private:
197
  Event::Dispatcher& dispatcher_;
198
  GoogleAsyncClientThreadLocal& tls_;
199
  // This is shared with child streams, so that they can cleanup independent of
200
  // the client if it gets destructed. The streams need to wait for their tags
201
  // to drain from the CQ.
202
  GoogleStubSharedPtr stub_;
203
  std::list<GoogleAsyncStreamImplPtr> active_streams_;
204
  const std::string stat_prefix_;
205
  const std::string target_uri_;
206
  Stats::ScopeSharedPtr scope_;
207
  GoogleAsyncClientStats stats_;
208
  uint64_t per_stream_buffer_limit_bytes_;
209
  Router::HeaderParserPtr metadata_parser_;
210

            
211
  friend class GoogleAsyncClientThreadLocal;
212
  friend class GoogleAsyncRequestImpl;
213
  friend class GoogleAsyncStreamImpl;
214
};
215

            
216
class GoogleAsyncStreamImpl : public RawAsyncStream,
217
                              public Event::DeferredDeletable,
218
                              Logger::Loggable<Logger::Id::grpc>,
219
                              public LinkedObject<GoogleAsyncStreamImpl> {
220
public:
221
  GoogleAsyncStreamImpl(GoogleAsyncClientImpl& parent, absl::string_view service_full_name,
222
                        absl::string_view method_name, RawAsyncStreamCallbacks& callbacks,
223
                        const Http::AsyncClient::StreamOptions& options);
224
  ~GoogleAsyncStreamImpl() override;
225

            
226
  virtual void initialize(bool buffer_body_for_retry);
227

            
228
  // Grpc::RawAsyncStream
229
  void sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) override;
230
  void closeStream() override;
231
  void resetStream() override;
232
  void waitForRemoteCloseAndDelete() override;
233
  // While the Google-gRPC code doesn't use Envoy watermark buffers, the logical
234
  // analog is to make sure that the aren't too many bytes in the pending write
235
  // queue.
236
18
  bool isAboveWriteBufferHighWatermark() const override {
237
18
    return bytes_in_write_pending_queue_ > parent_.perStreamBufferLimitBytes();
238
18
  }
239
56
  const StreamInfo::StreamInfo& streamInfo() const override { return unused_stream_info_; }
240
8
  StreamInfo::StreamInfo& streamInfo() override { return unused_stream_info_; }
241

            
242
  // Google-gRPC code doesn't use Envoy watermark buffers, so the functions below are not used.
243
  void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks&) override {}
244
209
  void removeWatermarkCallbacks() override {}
245

            
246
protected:
247
1414
  bool callFailed() const { return call_failed_; }
248

            
249
private:
250
  // Process queued events in completed_ops_ with handleOpCompletion() on
251
  // GoogleAsyncClient silo thread.
252
  void onCompletedOps();
253
  // Handle Operation completion on GoogleAsyncClient silo thread. This is posted by
254
  // GoogleAsyncClientThreadLocal::completionThread() when a message is received on cq_.
255
  void handleOpCompletion(GoogleAsyncTag::Operation op, bool ok);
256
  // Convert from Google gRPC client std::multimap metadata to Envoy Http::HeaderMap.
257
  void metadataTranslate(const std::multimap<grpc::string_ref, grpc::string_ref>& grpc_metadata,
258
                         Http::HeaderMap& header_map);
259
  // Write the first PendingMessage in the write queue if non-empty.
260
  void writeQueued();
261
  // Deliver notification and update stats when the connection closes.
262
  void notifyRemoteClose(Status::GrpcStatus grpc_status,
263
                         Http::ResponseTrailerMapPtr trailing_metadata, const std::string& message);
264
  // Schedule stream for deferred deletion.
265
  void deferredDelete();
266
  // Cleanup and schedule stream for deferred deletion if no inflight
267
  // completions.
268
  void cleanup();
269

            
270
  // Pending serialized message on write queue. Only one Operation::Write is in-flight at any
271
  // point-in-time, so we queue pending writes here.
272
  struct PendingMessage {
273
    PendingMessage(Buffer::InstancePtr request, bool end_stream);
274
    // End-of-stream with no additional message.
275
212
    PendingMessage() = default;
276

            
277
    const absl::optional<grpc::ByteBuffer> buf_;
278
    const bool end_stream_{true};
279
  };
280

            
281
protected:
282
  GoogleAsyncTag init_tag_{*this, GoogleAsyncTag::Operation::Init};
283
  GoogleAsyncTag read_initial_metadata_tag_{*this, GoogleAsyncTag::Operation::ReadInitialMetadata};
284
  GoogleAsyncTag read_tag_{*this, GoogleAsyncTag::Operation::Read};
285
  GoogleAsyncTag write_tag_{*this, GoogleAsyncTag::Operation::Write};
286
  GoogleAsyncTag write_last_tag_{*this, GoogleAsyncTag::Operation::WriteLast};
287
  GoogleAsyncTag finish_tag_{*this, GoogleAsyncTag::Operation::Finish};
288

            
289
  GoogleAsyncClientImpl& parent_;
290
  GoogleAsyncClientThreadLocal& tls_;
291
  // Latch our own version of this reference, so that completionThread() doesn't
292
  // try and access via parent_, which might not exist in teardown. We assume
293
  // that the dispatcher lives longer than completionThread() life, which should
294
  // hold for the expected server object lifetimes.
295
  Event::Dispatcher& dispatcher_;
296
  // We hold a ref count on the stub_ to allow the stream to wait for its tags
297
  // to drain from the CQ on cleanup.
298
  GoogleStubSharedPtr stub_;
299
  std::string service_full_name_;
300
  std::string method_name_;
301
  RawAsyncStreamCallbacks& callbacks_;
302
  Http::AsyncClient::StreamOptions options_;
303
  grpc::ClientContext ctxt_;
304
  std::unique_ptr<grpc::GenericClientAsyncReaderWriter> rw_;
305
  std::queue<PendingMessage> write_pending_queue_;
306
  uint64_t bytes_in_write_pending_queue_{};
307
  grpc::ByteBuffer read_buf_;
308
  grpc::Status status_;
309
  // Has Operation::Init completed?
310
  bool call_initialized_{};
311
  // Did the stub Call fail? If this is true, no Operation::Init completion will ever occur.
312
  bool call_failed_{};
313
  // Is there an Operation::Write[Last] in-flight?
314
  bool write_pending_{};
315
  // Is an Operation::Finish in-flight?
316
  bool finish_pending_{};
317
  // Have we entered CQ draining state? If so, we're just waiting for all our
318
  // ops on the CQ to drain away before freeing the stream.
319
  bool draining_cq_{};
320
  bool waiting_to_delete_on_remote_close_{};
321
  // Count of the tags in-flight. This must hit zero before the stream can be
322
  // freed.
323
  uint32_t inflight_tags_{};
324

            
325
  Tracing::SpanPtr current_span_;
326
  // This is unused.
327
  StreamInfo::StreamInfoImpl unused_stream_info_;
328

            
329
  // Queue of completed (op, ok) passed from completionThread() to
330
  // handleOpCompletion().
331
  std::deque<std::pair<GoogleAsyncTag::Operation, bool>>
332
      completed_ops_ ABSL_GUARDED_BY(completed_ops_lock_);
333
  Thread::MutexBasicLockable completed_ops_lock_;
334
  Event::TimerPtr remote_close_timer_;
335

            
336
  friend class GoogleAsyncClientImpl;
337
  friend class GoogleAsyncClientThreadLocal;
338
};
339

            
340
class GoogleAsyncRequestImpl : public AsyncRequest,
341
                               public GoogleAsyncStreamImpl,
342
                               RawAsyncStreamCallbacks {
343
public:
344
  GoogleAsyncRequestImpl(GoogleAsyncClientImpl& parent, absl::string_view service_full_name,
345
                         absl::string_view method_name, Buffer::InstancePtr request,
346
                         RawAsyncRequestCallbacks& callbacks, Tracing::Span& parent_span,
347
                         const Http::AsyncClient::RequestOptions& options);
348

            
349
  void initialize(bool buffer_body_for_retry) override;
350

            
351
  // Grpc::AsyncRequest
352
  void cancel() override;
353
56
  const StreamInfo::StreamInfo& streamInfo() const override {
354
56
    return GoogleAsyncStreamImpl::streamInfo();
355
56
  }
356
  void detach() override;
357

            
358
private:
359
  using GoogleAsyncStreamImpl::streamInfo;
360
  // Grpc::RawAsyncStreamCallbacks
361
  void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
362
  void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override;
363
  bool onReceiveMessageRaw(Buffer::InstancePtr&& response) override;
364
  void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override;
365
  void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;
366

            
367
  Buffer::InstancePtr request_;
368
  RawAsyncRequestCallbacks& callbacks_;
369
  Tracing::SpanPtr current_span_;
370
  Buffer::InstancePtr response_;
371
};
372

            
373
} // namespace Grpc
374
} // namespace Envoy