Line data Source code
1 : #pragma once 2 : 3 : #include <chrono> 4 : 5 : #include "envoy/buffer/buffer.h" 6 : #include "envoy/common/pure.h" 7 : #include "envoy/grpc/status.h" 8 : #include "envoy/http/async_client.h" 9 : #include "envoy/http/header_map.h" 10 : #include "envoy/stream_info/stream_info.h" 11 : #include "envoy/tracing/tracer.h" 12 : 13 : #include "source/common/common/assert.h" 14 : #include "source/common/protobuf/protobuf.h" 15 : 16 : #include "absl/types/optional.h" 17 : 18 : namespace Envoy { 19 : namespace Grpc { 20 : 21 : /** 22 : * An in-flight gRPC unary RPC. 23 : */ 24 : class AsyncRequest { 25 : public: 26 1 : virtual ~AsyncRequest() = default; 27 : 28 : /** 29 : * Signals that the request should be cancelled. No further callbacks will be invoked. 30 : */ 31 : virtual void cancel() PURE; 32 : }; 33 : 34 : /** 35 : * An in-flight gRPC stream. 36 : */ 37 : class RawAsyncStream { 38 : public: 39 70 : virtual ~RawAsyncStream() = default; 40 : 41 : /** 42 : * Send request message to the stream. 43 : * @param request serialized message. 44 : * @param end_stream close the stream locally. No further methods may be invoked on the stream 45 : * object, but callbacks may still be received until the stream is closed 46 : * remotely. 47 : */ 48 : virtual void sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) PURE; 49 : 50 : /** 51 : * Close the stream locally and send an empty DATA frame to the remote. No further methods may be 52 : * invoked on the stream object, but callbacks may still be received until the stream is closed 53 : * remotely. 54 : */ 55 : virtual void closeStream() PURE; 56 : 57 : /** 58 : * Close the stream locally and remotely (as needed). No further methods may be invoked on the 59 : * stream object and no further callbacks will be invoked. 60 : */ 61 : virtual void resetStream() PURE; 62 : 63 : /*** 64 : * @returns if the stream has enough buffered outbound data to be over the configured buffer 65 : * limits 66 : */ 67 : virtual bool isAboveWriteBufferHighWatermark() const PURE; 68 : 69 : /** 70 : * @returns the stream info object associated with this stream. 71 : */ 72 : virtual const StreamInfo::StreamInfo& streamInfo() const PURE; 73 : }; 74 : 75 : class RawAsyncRequestCallbacks { 76 : public: 77 0 : virtual ~RawAsyncRequestCallbacks() = default; 78 : 79 : /** 80 : * Called when populating the headers to send with initial metadata. 81 : * @param metadata initial metadata reference. 82 : */ 83 : virtual void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) PURE; 84 : 85 : /** 86 : * Called when the async gRPC request succeeds. No further callbacks will be invoked. 87 : * @param response the gRPC response bytes. 88 : * @param span a tracing span to fill with extra tags. 89 : */ 90 : virtual void onSuccessRaw(Buffer::InstancePtr&& response, Tracing::Span& span) PURE; 91 : 92 : /** 93 : * Called when the async gRPC request fails. No further callbacks will be invoked. 94 : * @param status the gRPC status. 95 : * @param message the gRPC status message or empty string if not present. 96 : * @param span a tracing span to fill with extra tags. 97 : */ 98 : virtual void onFailure(Status::GrpcStatus status, const std::string& message, 99 : Tracing::Span& span) PURE; 100 : }; 101 : 102 : /** 103 : * Notifies caller of async gRPC stream status. 104 : * Note the gRPC stream is full-duplex, even if the local to remote stream has been ended by 105 : * AsyncStream.close(), AsyncStreamCallbacks can continue to receive events until the remote 106 : * to local stream is closed (onRemoteClose), and vice versa. Once the stream is closed remotely, no 107 : * further callbacks will be invoked. 108 : */ 109 : class RawAsyncStreamCallbacks { 110 : public: 111 70 : virtual ~RawAsyncStreamCallbacks() = default; 112 : 113 : /** 114 : * Called when populating the headers to send with initial metadata. 115 : * @param metadata initial metadata reference. 116 : */ 117 : virtual void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) PURE; 118 : 119 : /** 120 : * Called when initial metadata is received. This will be called with empty metadata on a 121 : * trailers-only response, followed by onReceiveTrailingMetadata() with the trailing metadata. 122 : * @param metadata initial metadata reference. 123 : */ 124 : virtual void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) PURE; 125 : 126 : /** 127 : * Called when an async gRPC message is received. 128 : * @param response the gRPC message. 129 : * @return bool which is true if the message well formed and false otherwise which will cause 130 : the stream to shutdown with an INTERNAL error. 131 : */ 132 : virtual bool onReceiveMessageRaw(Buffer::InstancePtr&& response) PURE; 133 : 134 : /** 135 : * Called when trailing metadata is received. This will also be called on non-Ok grpc-status 136 : * stream termination. 137 : * @param metadata trailing metadata reference. 138 : */ 139 : virtual void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) PURE; 140 : 141 : /** 142 : * Called when the remote closes or an error occurs on the gRPC stream. The stream is 143 : * considered remotely closed after this invocation and no further callbacks will be 144 : * invoked. In addition, no further stream operations are permitted. 145 : * @param status the gRPC status. 146 : * @param message the gRPC status message or empty string if not present. 147 : */ 148 : virtual void onRemoteClose(Status::GrpcStatus status, const std::string& message) PURE; 149 : }; 150 : 151 : /** 152 : * Supports sending gRPC requests and receiving responses asynchronously. This can be used to 153 : * implement either plain gRPC or streaming gRPC calls. 154 : */ 155 : class RawAsyncClient { 156 : public: 157 35 : virtual ~RawAsyncClient() = default; 158 : 159 : /** 160 : * Start a gRPC unary RPC asynchronously. 161 : * @param service_full_name full name of the service (i.e. service_method.service()->full_name()). 162 : * @param method_name name of the method (i.e. service_method.name()). 163 : * @param request serialized message. 164 : * @param callbacks the callbacks to be notified of RPC status. 165 : * @param parent_span the current parent tracing context. 166 : * @param options the data struct to control the request sending. 167 : * @return a request handle or nullptr if no request could be started. NOTE: In this case 168 : * onFailure() has already been called inline. The client owns the request and the 169 : * handle should just be used to cancel. 170 : */ 171 : virtual AsyncRequest* sendRaw(absl::string_view service_full_name, absl::string_view method_name, 172 : Buffer::InstancePtr&& request, RawAsyncRequestCallbacks& callbacks, 173 : Tracing::Span& parent_span, 174 : const Http::AsyncClient::RequestOptions& options) PURE; 175 : 176 : /** 177 : * Start a gRPC stream asynchronously. 178 : * TODO(mattklein123): Determine if tracing should be added to streaming requests. 179 : * @param service_full_name full name of the service (i.e. service_method.service()->full_name()). 180 : * @param method_name name of the method (i.e. service_method.name()). 181 : * @param callbacks the callbacks to be notified of stream status. 182 : * @param options the data struct to control the stream. 183 : * @return a stream handle or nullptr if no stream could be started. NOTE: In this case 184 : * onRemoteClose() has already been called inline. The client owns the stream and 185 : * the handle can be used to send more messages or finish the stream. It is expected that 186 : * closeStream() is invoked by the caller to notify the client that the stream resources 187 : * may be reclaimed. 188 : */ 189 : virtual RawAsyncStream* startRaw(absl::string_view service_full_name, 190 : absl::string_view method_name, 191 : RawAsyncStreamCallbacks& callbacks, 192 : const Http::AsyncClient::StreamOptions& options) PURE; 193 : 194 : /** 195 : * Returns the name of the cluster, or other destination/target, of the client. 196 : */ 197 : virtual absl::string_view destination() PURE; 198 : 199 : protected: 200 : // The lifetime of RawAsyncClient must be in the same thread. 201 0 : bool isThreadSafe() { return thread_id_ == std::this_thread::get_id(); } 202 : std::thread::id thread_id_{std::this_thread::get_id()}; 203 : }; 204 : 205 : using RawAsyncClientPtr = std::unique_ptr<RawAsyncClient>; 206 : using RawAsyncClientSharedPtr = std::shared_ptr<RawAsyncClient>; 207 : 208 : } // namespace Grpc 209 : } // namespace Envoy