1
#pragma once
2

            
3
#include <chrono>
4

            
5
#include "envoy/buffer/buffer.h"
6
#include "envoy/common/pure.h"
7
#include "envoy/grpc/status.h"
8
#include "envoy/http/async_client.h"
9
#include "envoy/http/header_map.h"
10
#include "envoy/stream_info/stream_info.h"
11
#include "envoy/tracing/tracer.h"
12

            
13
#include "source/common/common/assert.h"
14
#include "source/common/protobuf/protobuf.h"
15

            
16
#include "absl/types/optional.h"
17

            
18
namespace Envoy {
19
namespace Grpc {
20

            
21
/**
22
 * An in-flight gRPC unary RPC.
23
 */
24
class AsyncRequest {
25
public:
26
1083
  virtual ~AsyncRequest() = default;
27

            
28
  /**
29
   * Signals that the request should be cancelled. No further callbacks will be invoked.
30
   */
31
  virtual void cancel() PURE;
32

            
33
  /**
34
   * Returns the underlying stream info.
35
   */
36
  virtual const StreamInfo::StreamInfo& streamInfo() const PURE;
37

            
38
  /**
39
   * Detach the pending request. This is used for the case where we send a side
40
   * request but never cancel it even if the related downstream main request is
41
   * completed.
42
   *
43
   * This will will clean up all context associated with downstream request like
44
   * downstream stream info, parent tracing span, and so on, to avoid potential
45
   * dangling references.
46
   *
47
   * NOTE: the callbacks that registered to take the response will be kept to do
48
   * some clean up or operations when response arrives. The caller is responsible
49
   * for ensuring that the callbacks have enough lifetime.
50
   */
51
  virtual void detach() PURE;
52
};
53

            
54
/**
55
 * An in-flight gRPC stream.
56
 */
57
class RawAsyncStream {
58
public:
59
4426
  virtual ~RawAsyncStream() = default;
60

            
61
  /**
62
   * Send request message to the stream.
63
   * @param request serialized message.
64
   * @param end_stream close the stream locally. No further methods may be invoked on the stream
65
   *                   object, but callbacks may still be received until the stream is closed
66
   *                   remotely.
67
   */
68
  virtual void sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) PURE;
69

            
70
  /**
71
   * Close the stream locally and send an empty DATA frame to the remote. No further methods may be
72
   * invoked on the stream object, but callbacks may still be received until the stream is closed
73
   * remotely.
74
   */
75
  virtual void closeStream() PURE;
76

            
77
  /**
78
   * Close the stream locally and remotely (as needed). No further methods may be invoked on the
79
   * stream object and no further callbacks will be invoked.
80
   */
81
  virtual void resetStream() PURE;
82

            
83
  /**
84
   * Wait for the server to half-close its stream and then delete the RawAsyncStream object. No
85
   * further methods may be invoked on the stream object and no further callbacks will be invoked.
86
   * The server is expected to half-close within the interval specific in the StreamOptions,
87
   * otherwise the stream is reset.
88
   */
89
  virtual void waitForRemoteCloseAndDelete() PURE;
90

            
91
  /***
92
   * @returns if the stream has enough buffered outbound data to be over the configured buffer
93
   * limits
94
   */
95
  virtual bool isAboveWriteBufferHighWatermark() const PURE;
96

            
97
  /**
98
   * @returns the stream info object associated with this stream.
99
   */
100
  virtual const StreamInfo::StreamInfo& streamInfo() const PURE;
101
  virtual StreamInfo::StreamInfo& streamInfo() PURE;
102

            
103
  /***
104
   * Register a callback to be called when high/low write buffer watermark events occur on the
105
   * stream. This callback must persist beyond the lifetime of the stream or be unregistered via
106
   * removeWatermarkCallbacks. If there's already a watermark callback registered, this method
107
   * will trigger ENVOY_BUG.
108
   */
109
  virtual void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) PURE;
110

            
111
  /***
112
   * Remove previously set watermark callbacks. If there's no watermark callback registered, this
113
   * method will trigger ENVOY_BUG.
114
   */
115
  virtual void removeWatermarkCallbacks() PURE;
116
};
117

            
118
class RawAsyncRequestCallbacks {
119
public:
120
612
  virtual ~RawAsyncRequestCallbacks() = default;
121

            
122
  /**
123
   * Called when populating the headers to send with initial metadata.
124
   * @param metadata initial metadata reference.
125
   */
126
  virtual void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) PURE;
127

            
128
  /**
129
   * Called when the async gRPC request succeeds. No further callbacks will be invoked.
130
   * @param response the gRPC response bytes.
131
   * @param span a tracing span to fill with extra tags.
132
   */
133
  virtual void onSuccessRaw(Buffer::InstancePtr&& response, Tracing::Span& span) PURE;
134

            
135
  /**
136
   * Called when the async gRPC request fails. No further callbacks will be invoked.
137
   * @param status the gRPC status.
138
   * @param message the gRPC status message or empty string if not present.
139
   * @param span a tracing span to fill with extra tags.
140
   */
141
  virtual void onFailure(Status::GrpcStatus status, const std::string& message,
142
                         Tracing::Span& span) PURE;
143
};
144

            
145
/**
146
 * Notifies caller of async gRPC stream status.
147
 * Note the gRPC stream is full-duplex, even if the local to remote stream has been ended by
148
 * AsyncStream.close(), AsyncStreamCallbacks can continue to receive events until the remote
149
 * to local stream is closed (onRemoteClose), and vice versa. Once the stream is closed remotely, no
150
 * further callbacks will be invoked.
151
 */
152
class RawAsyncStreamCallbacks {
153
public:
154
3954
  virtual ~RawAsyncStreamCallbacks() = default;
155

            
156
  /**
157
   * Called when populating the headers to send with initial metadata.
158
   * @param metadata initial metadata reference.
159
   */
160
  virtual void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) PURE;
161

            
162
  /**
163
   * Called when initial metadata is received. This will be called with empty metadata on a
164
   * trailers-only response, followed by onReceiveTrailingMetadata() with the trailing metadata.
165
   * @param metadata initial metadata reference.
166
   */
167
  virtual void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) PURE;
168

            
169
  /**
170
   * Called when an async gRPC message is received.
171
   * @param response the gRPC message.
172
   * @return bool which is true if the message well formed and false otherwise which will cause
173
              the stream to shutdown with an INTERNAL error.
174
   */
175
  virtual bool onReceiveMessageRaw(Buffer::InstancePtr&& response) PURE;
176

            
177
  /**
178
   * Called when trailing metadata is received. This will also be called on non-Ok grpc-status
179
   * stream termination.
180
   * @param metadata trailing metadata reference.
181
   */
182
  virtual void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) PURE;
183

            
184
  /**
185
   * Called when the remote closes or an error occurs on the gRPC stream. The stream is
186
   * considered remotely closed after this invocation and no further callbacks will be
187
   * invoked. In addition, no further stream operations are permitted.
188
   * @param status the gRPC status.
189
   * @param message the gRPC status message or empty string if not present.
190
   */
191
  virtual void onRemoteClose(Status::GrpcStatus status, const std::string& message) PURE;
192
};
193

            
194
/**
195
 * Supports sending gRPC requests and receiving responses asynchronously. This can be used to
196
 * implement either plain gRPC or streaming gRPC calls.
197
 */
198
class RawAsyncClient {
199
public:
200
3488
  virtual ~RawAsyncClient() = default;
201

            
202
  /**
203
   * Start a gRPC unary RPC asynchronously.
204
   * @param service_full_name full name of the service (i.e. service_method.service()->full_name()).
205
   * @param method_name name of the method (i.e. service_method.name()).
206
   * @param request serialized message.
207
   * @param callbacks the callbacks to be notified of RPC status.
208
   * @param parent_span the current parent tracing context.
209
   * @param options the data struct to control the request sending.
210
   * @return a request handle or nullptr if no request could be started. NOTE: In this case
211
   *         onFailure() has already been called inline. The client owns the request and the
212
   *         handle should just be used to cancel.
213
   */
214
  virtual AsyncRequest* sendRaw(absl::string_view service_full_name, absl::string_view method_name,
215
                                Buffer::InstancePtr&& request, RawAsyncRequestCallbacks& callbacks,
216
                                Tracing::Span& parent_span,
217
                                const Http::AsyncClient::RequestOptions& options) PURE;
218

            
219
  /**
220
   * Start a gRPC stream asynchronously.
221
   * @param service_full_name full name of the service (i.e. service_method.service()->full_name()).
222
   * @param method_name name of the method (i.e. service_method.name()).
223
   * @param callbacks the callbacks to be notified of stream status.
224
   * @param options the data struct to control the stream.
225
   * @return a stream handle or nullptr if no stream could be started. NOTE: In this case
226
   *         onRemoteClose() has already been called inline. The client owns the stream and
227
   *         the handle can be used to send more messages or finish the stream. It is expected that
228
   *         closeStream() is invoked by the caller to notify the client that the stream resources
229
   *         may be reclaimed.
230
   */
231
  virtual RawAsyncStream* startRaw(absl::string_view service_full_name,
232
                                   absl::string_view method_name,
233
                                   RawAsyncStreamCallbacks& callbacks,
234
                                   const Http::AsyncClient::StreamOptions& options) PURE;
235

            
236
  /**
237
   * Returns the name of the cluster, or other destination/target, of the client.
238
   */
239
  virtual absl::string_view destination() PURE;
240

            
241
protected:
242
  // The lifetime of RawAsyncClient must be in the same thread.
243
  bool isThreadSafe() { return thread_id_ == std::this_thread::get_id(); }
244
  std::thread::id thread_id_{std::this_thread::get_id()};
245
};
246

            
247
using RawAsyncClientPtr = std::unique_ptr<RawAsyncClient>;
248
using RawAsyncClientSharedPtr = std::shared_ptr<RawAsyncClient>;
249

            
250
} // namespace Grpc
251
} // namespace Envoy