Line data Source code
1 : #pragma once 2 : 3 : #include <chrono> 4 : #include <memory> 5 : 6 : #include "envoy/buffer/buffer.h" 7 : #include "envoy/config/route/v3/route_components.pb.h" 8 : #include "envoy/event/dispatcher.h" 9 : #include "envoy/http/filter.h" 10 : #include "envoy/http/header_map.h" 11 : #include "envoy/http/message.h" 12 : #include "envoy/stream_info/stream_info.h" 13 : #include "envoy/tracing/tracer.h" 14 : 15 : #include "source/common/protobuf/protobuf.h" 16 : 17 : #include "absl/types/optional.h" 18 : 19 : namespace Envoy { 20 : namespace Router { 21 : class FilterConfig; 22 : } 23 : namespace Http { 24 : 25 : /** 26 : * Supports sending an HTTP request message and receiving a response asynchronously. 27 : */ 28 : class AsyncClient { 29 : public: 30 : /** 31 : * An in-flight HTTP request. 32 : */ 33 : class Request { 34 : public: 35 23 : virtual ~Request() = default; 36 : 37 : /** 38 : * Signals that the request should be cancelled. 39 : */ 40 : virtual void cancel() PURE; 41 : }; 42 : 43 : /** 44 : * Async Client failure reasons. 45 : */ 46 : enum class FailureReason { 47 : // The stream has been reset. 48 : Reset 49 : }; 50 : 51 : /** 52 : * Notifies caller of async HTTP request status. 53 : * 54 : * To support a use case where a caller makes multiple requests in parallel, 55 : * individual callback methods provide request context corresponding to that response. 56 : */ 57 : class Callbacks { 58 : public: 59 178 : virtual ~Callbacks() = default; 60 : 61 : /** 62 : * Called when the async HTTP request succeeds. 63 : * @param request request handle. 64 : * NOTE: request handle is passed for correlation purposes only, e.g. 65 : * for client code to be able to exclude that handle from a list of 66 : * requests in progress. 67 : * @param response the HTTP response 68 : */ 69 : virtual void onSuccess(const Request& request, ResponseMessagePtr&& response) PURE; 70 : 71 : /** 72 : * Called when the async HTTP request fails. 73 : * @param request request handle. 74 : * NOTE: request handle is passed for correlation purposes only, e.g. 75 : * for client code to be able to exclude that handle from a list of 76 : * requests in progress. 77 : * @param reason failure reason 78 : */ 79 : virtual void onFailure(const Request& request, FailureReason reason) PURE; 80 : 81 : /** 82 : * Called before finalizing upstream span when the request is complete or reset. 83 : * @param span a tracing span to fill with extra tags. 84 : * @param response_headers the response headers. 85 : */ 86 : virtual void onBeforeFinalizeUpstreamSpan(Envoy::Tracing::Span& span, 87 : const Http::ResponseHeaderMap* response_headers) PURE; 88 : }; 89 : 90 : /** 91 : * Notifies caller of async HTTP stream status. 92 : * Note the HTTP stream is full-duplex, even if the local to remote stream has been ended 93 : * by Stream.sendHeaders/sendData with end_stream=true or sendTrailers, 94 : * StreamCallbacks can continue to receive events until the remote to local stream is closed, 95 : * and vice versa. 96 : */ 97 : class StreamCallbacks { 98 : public: 99 68 : virtual ~StreamCallbacks() = default; 100 : 101 : /** 102 : * Called when all headers get received on the async HTTP stream. 103 : * @param headers the headers received 104 : * @param end_stream whether the response is header only 105 : */ 106 : virtual void onHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) PURE; 107 : 108 : /** 109 : * Called when a data frame get received on the async HTTP stream. 110 : * This can be invoked multiple times if the data get streamed. 111 : * @param data the data received 112 : * @param end_stream whether the data is the last data frame 113 : */ 114 : virtual void onData(Buffer::Instance& data, bool end_stream) PURE; 115 : 116 : /** 117 : * Called when all trailers get received on the async HTTP stream. 118 : * @param trailers the trailers received. 119 : */ 120 : virtual void onTrailers(ResponseTrailerMapPtr&& trailers) PURE; 121 : 122 : /** 123 : * Called when both the local and remote have gracefully closed the stream. 124 : * Useful for asymmetric cases where end_stream may not be bidirectionally observable. 125 : * Note this is NOT called on stream reset. 126 : */ 127 : virtual void onComplete() PURE; 128 : 129 : /** 130 : * Called when the async HTTP stream is reset. 131 : */ 132 : virtual void onReset() PURE; 133 : }; 134 : 135 : using StreamDestructorCallbacks = std::function<void()>; 136 : 137 : /** 138 : * An in-flight HTTP stream. 139 : */ 140 : class Stream { 141 : public: 142 68 : virtual ~Stream() = default; 143 : 144 : /*** 145 : * Send headers to the stream. This method cannot be invoked more than once and 146 : * need to be called before sendData. 147 : * @param headers supplies the headers to send. 148 : * @param end_stream supplies whether this is a header only request. 149 : */ 150 : virtual void sendHeaders(RequestHeaderMap& headers, bool end_stream) PURE; 151 : 152 : /*** 153 : * Send data to the stream. This method can be invoked multiple times if it get streamed. 154 : * To end the stream without data, call this method with empty buffer. 155 : * @param data supplies the data to send. 156 : * @param end_stream supplies whether this is the last data. 157 : */ 158 : virtual void sendData(Buffer::Instance& data, bool end_stream) PURE; 159 : 160 : /*** 161 : * Send trailers. This method cannot be invoked more than once, and implicitly ends the stream. 162 : * @param trailers supplies the trailers to send. 163 : */ 164 : virtual void sendTrailers(RequestTrailerMap& trailers) PURE; 165 : 166 : /*** 167 : * Reset the stream. 168 : */ 169 : virtual void reset() PURE; 170 : 171 : /*** 172 : * Register callback to be called on stream destruction. This callback must persist beyond the 173 : * lifetime of the stream or be unregistered via removeDestructorCallback. If there's already a 174 : * destructor callback registered, this method will ASSERT-fail. 175 : */ 176 : virtual void setDestructorCallback(StreamDestructorCallbacks callback) PURE; 177 : 178 : /*** 179 : * Remove previously set destructor callback. Calling this without having previously set a 180 : * Destructor callback will ASSERT-fail. 181 : */ 182 : virtual void removeDestructorCallback() PURE; 183 : 184 : /*** 185 : * Register a callback to be called when high/low write buffer watermark events occur on the 186 : * stream. This callback must persist beyond the lifetime of the stream or be unregistered via 187 : * removeWatermarkCallbacks. If there's already a watermark callback registered, this method 188 : * will ASSERT-fail. 189 : */ 190 : virtual void setWatermarkCallbacks(DecoderFilterWatermarkCallbacks& callbacks) PURE; 191 : 192 : /*** 193 : * Remove previously set watermark callbacks. 194 : */ 195 : virtual void removeWatermarkCallbacks() PURE; 196 : 197 : /*** 198 : * @returns if the stream has enough buffered outbound data to be over the configured buffer 199 : * limits 200 : */ 201 : virtual bool isAboveWriteBufferHighWatermark() const PURE; 202 : 203 : /*** 204 : * @returns the stream info object associated with the stream. 205 : */ 206 : virtual const StreamInfo::StreamInfo& streamInfo() const PURE; 207 : }; 208 : 209 : /*** 210 : * An in-flight HTTP request to which additional data and trailers can be sent, as well as resets 211 : * and other stream-cancelling events. Must be terminated by sending trailers or data with 212 : * end_stream. 213 : */ 214 : class OngoingRequest : public virtual Request, public virtual Stream { 215 : public: 216 : /*** 217 : * Takes ownership of trailers, and sends it to the underlying stream. 218 : * @param trailers owned trailers to pass to upstream. 219 : */ 220 : virtual void captureAndSendTrailers(RequestTrailerMapPtr&& trailers) PURE; 221 : }; 222 : 223 241 : virtual ~AsyncClient() = default; 224 : 225 : /** 226 : * A context from the caller of an async client. 227 : */ 228 : struct ParentContext { 229 : const StreamInfo::StreamInfo* stream_info{nullptr}; 230 : }; 231 : 232 : /** 233 : * A structure to hold the options for AsyncStream object. 234 : */ 235 : struct StreamOptions { 236 0 : StreamOptions& setTimeout(const absl::optional<std::chrono::milliseconds>& v) { 237 0 : timeout = v; 238 0 : return *this; 239 0 : } 240 3 : StreamOptions& setTimeout(const std::chrono::milliseconds& v) { 241 3 : timeout = v; 242 3 : return *this; 243 3 : } 244 68 : StreamOptions& setBufferBodyForRetry(bool v) { 245 68 : buffer_body_for_retry = v; 246 68 : return *this; 247 68 : } 248 0 : StreamOptions& setSendXff(bool v) { 249 0 : send_xff = v; 250 0 : return *this; 251 0 : } 252 : StreamOptions& setHashPolicy( 253 0 : const Protobuf::RepeatedPtrField<envoy::config::route::v3::RouteAction::HashPolicy>& v) { 254 0 : hash_policy = v; 255 0 : return *this; 256 0 : } 257 40 : StreamOptions& setParentContext(const ParentContext& v) { 258 40 : parent_context = v; 259 40 : return *this; 260 40 : } 261 : // Set dynamic metadata of async stream. If a metadata record with filter name 'envoy.lb' is 262 : // provided, metadata match criteria of async stream route will be overridden by the metadata 263 : // and then used by the subset load balancer. 264 0 : StreamOptions& setMetadata(const envoy::config::core::v3::Metadata& m) { 265 0 : metadata = m; 266 0 : return *this; 267 0 : } 268 : 269 : // Set buffer restriction and accounting for the stream. 270 0 : StreamOptions& setBufferAccount(const Buffer::BufferMemoryAccountSharedPtr& account) { 271 0 : account_ = account; 272 0 : return *this; 273 0 : } 274 0 : StreamOptions& setBufferLimit(uint32_t limit) { 275 0 : buffer_limit_ = limit; 276 0 : return *this; 277 0 : } 278 : 279 : // this should be done with setBufferedBodyForRetry=true ? 280 0 : StreamOptions& setRetryPolicy(const envoy::config::route::v3::RetryPolicy& p) { 281 0 : retry_policy = p; 282 0 : return *this; 283 0 : } 284 0 : StreamOptions& setFilterConfig(Router::FilterConfig& config) { 285 0 : filter_config_ = config; 286 0 : return *this; 287 0 : } 288 : 289 0 : StreamOptions& setIsShadow(bool s) { 290 0 : is_shadow = s; 291 0 : return *this; 292 0 : } 293 : 294 : // For gmock test 295 0 : bool operator==(const StreamOptions& src) const { 296 0 : return timeout == src.timeout && buffer_body_for_retry == src.buffer_body_for_retry && 297 0 : send_xff == src.send_xff; 298 0 : } 299 : 300 : // The timeout supplies the stream timeout, measured since when the frame with 301 : // end_stream flag is sent until when the first frame is received. 302 : absl::optional<std::chrono::milliseconds> timeout; 303 : 304 : // The buffer_body_for_retry specifies whether the streamed body will be buffered so that 305 : // it can be retried. In general, this should be set to false for a true stream. However, 306 : // streaming is also used in certain cases such as gRPC unary calls, where retry can 307 : // still be useful. 308 : bool buffer_body_for_retry{false}; 309 : 310 : // If true, x-forwarded-for header will be added. 311 : bool send_xff{true}; 312 : 313 : // Provides the hash policy for hashing load balancing strategies. 314 : Protobuf::RepeatedPtrField<envoy::config::route::v3::RouteAction::HashPolicy> hash_policy; 315 : 316 : // Provides parent context. Currently, this holds stream info from the caller. 317 : ParentContext parent_context; 318 : 319 : envoy::config::core::v3::Metadata metadata; 320 : 321 : // Buffer memory account for tracking bytes. 322 : Buffer::BufferMemoryAccountSharedPtr account_{nullptr}; 323 : 324 : absl::optional<uint32_t> buffer_limit_; 325 : 326 : absl::optional<envoy::config::route::v3::RetryPolicy> retry_policy; 327 : 328 : OptRef<Router::FilterConfig> filter_config_; 329 : 330 : bool is_shadow{false}; 331 : }; 332 : 333 : /** 334 : * A structure to hold the options for AsyncRequest object. 335 : */ 336 : struct RequestOptions : public StreamOptions { 337 0 : RequestOptions& setTimeout(const absl::optional<std::chrono::milliseconds>& v) { 338 0 : StreamOptions::setTimeout(v); 339 0 : return *this; 340 0 : } 341 3 : RequestOptions& setTimeout(const std::chrono::milliseconds& v) { 342 3 : StreamOptions::setTimeout(v); 343 3 : return *this; 344 3 : } 345 0 : RequestOptions& setBufferBodyForRetry(bool v) { 346 0 : StreamOptions::setBufferBodyForRetry(v); 347 0 : return *this; 348 0 : } 349 0 : RequestOptions& setSendXff(bool v) { 350 0 : StreamOptions::setSendXff(v); 351 0 : return *this; 352 0 : } 353 : RequestOptions& setHashPolicy( 354 0 : const Protobuf::RepeatedPtrField<envoy::config::route::v3::RouteAction::HashPolicy>& v) { 355 0 : StreamOptions::setHashPolicy(v); 356 0 : return *this; 357 0 : } 358 0 : RequestOptions& setParentContext(const ParentContext& v) { 359 0 : StreamOptions::setParentContext(v); 360 0 : return *this; 361 0 : } 362 0 : RequestOptions& setMetadata(const envoy::config::core::v3::Metadata& m) { 363 0 : StreamOptions::setMetadata(m); 364 0 : return *this; 365 0 : } 366 0 : RequestOptions& setRetryPolicy(const envoy::config::route::v3::RetryPolicy& p) { 367 0 : StreamOptions::setRetryPolicy(p); 368 0 : return *this; 369 0 : } 370 0 : RequestOptions& setIsShadow(bool s) { 371 0 : StreamOptions::setIsShadow(s); 372 0 : return *this; 373 0 : } 374 3 : RequestOptions& setParentSpan(Tracing::Span& parent_span) { 375 3 : parent_span_ = &parent_span; 376 3 : return *this; 377 3 : } 378 3 : RequestOptions& setChildSpanName(const std::string& child_span_name) { 379 3 : child_span_name_ = child_span_name; 380 3 : return *this; 381 3 : } 382 0 : RequestOptions& setSampled(absl::optional<bool> sampled) { 383 0 : sampled_ = sampled; 384 0 : return *this; 385 0 : } 386 0 : RequestOptions& setBufferAccount(const Buffer::BufferMemoryAccountSharedPtr& account) { 387 0 : account_ = account; 388 0 : return *this; 389 0 : } 390 0 : RequestOptions& setBufferLimit(uint32_t limit) { 391 0 : buffer_limit_ = limit; 392 0 : return *this; 393 0 : } 394 : 395 : // For gmock test 396 0 : bool operator==(const RequestOptions& src) const { 397 0 : return StreamOptions::operator==(src) && parent_span_ == src.parent_span_ && 398 0 : child_span_name_ == src.child_span_name_ && sampled_ == src.sampled_; 399 0 : } 400 : 401 : // The parent span that child spans are created under to trace egress requests/responses. 402 : // If not set, requests will not be traced. 403 : Tracing::Span* parent_span_{nullptr}; 404 : // The name to give to the child span that represents the async http request. 405 : // If left empty and parent_span_ is set, then the default name will have the cluster name. 406 : // Only used if parent_span_ is set. 407 : std::string child_span_name_{""}; 408 : // Sampling decision for the tracing span. The span is sampled by default. 409 : absl::optional<bool> sampled_{true}; 410 : }; 411 : 412 : /** 413 : * Send an HTTP request asynchronously 414 : * @param request the request to send. 415 : * @param callbacks the callbacks to be notified of request status. 416 : * @param options the data struct to control the request sending. 417 : * @return a request handle or nullptr if no request could be created. NOTE: In this case 418 : * onFailure() has already been called inline. The client owns the request and the 419 : * handle should just be used to cancel. 420 : */ 421 : 422 : virtual Request* send(RequestMessagePtr&& request, Callbacks& callbacks, 423 : const RequestOptions& options) PURE; 424 : 425 : /** 426 : * Starts a new OngoingRequest asynchronously with the given headers. 427 : * 428 : * @param request_headers headers to send. 429 : * @param callbacks the callbacks to be notified of request status. 430 : * @param options the data struct to control the request sending. 431 : * @return a request handle or nullptr if no request could be created. See note attached to 432 : * `send`. Calling startRequest will not trigger end stream. For header-only requests, `send` 433 : * should be called instead. 434 : */ 435 : virtual OngoingRequest* startRequest(RequestHeaderMapPtr&& request_headers, Callbacks& callbacks, 436 : const RequestOptions& options) PURE; 437 : 438 : /** 439 : * Start an HTTP stream asynchronously, without an associated HTTP request. 440 : * @param callbacks the callbacks to be notified of stream status. 441 : * @param options the data struct to control the stream. 442 : * @return a stream handle or nullptr if no stream could be started. NOTE: In this case 443 : * onResetStream() has already been called inline. The client owns the stream and 444 : * the handle can be used to send more messages or close the stream. 445 : */ 446 : virtual Stream* start(StreamCallbacks& callbacks, const StreamOptions& options) PURE; 447 : 448 : /** 449 : * @return Event::Dispatcher& the dispatcher backing this client. 450 : */ 451 : virtual Event::Dispatcher& dispatcher() PURE; 452 : }; 453 : 454 : using AsyncClientPtr = std::unique_ptr<AsyncClient>; 455 : 456 : } // namespace Http 457 : } // namespace Envoy