1
#pragma once
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7

            
8
#include "envoy/buffer/buffer.h"
9
#include "envoy/config/route/v3/route_components.pb.h"
10
#include "envoy/event/dispatcher.h"
11
#include "envoy/http/filter.h"
12
#include "envoy/http/header_map.h"
13
#include "envoy/http/message.h"
14
#include "envoy/stream_info/filter_state.h"
15
#include "envoy/stream_info/stream_info.h"
16
#include "envoy/tracing/tracer.h"
17
#include "envoy/upstream/load_balancer.h"
18

            
19
#include "source/common/protobuf/protobuf.h"
20

            
21
#include "absl/types/optional.h"
22

            
23
namespace Envoy {
24
namespace Router {
// Forward declaration: this header only needs the shared-pointer alias below (stored in
// Http::AsyncClient::StreamOptions::filter_config_), not the full FilterConfig definition.
class FilterConfig;
using FilterConfigSharedPtr = std::shared_ptr<FilterConfig>;
} // namespace Router
28
namespace Http {
29

            
30
/**
31
 * Callbacks for sidestream connection (from http async client) watermark limits.
32
 */
33
class SidestreamWatermarkCallbacks {
34
public:
35
96692
  virtual ~SidestreamWatermarkCallbacks() = default;
36

            
37
  /**
38
   * Called when the sidestream connection or stream goes over its high watermark. Note that this
39
   * may be called separately for both the stream going over and the connection going over. It
40
   * is the responsibility of the sidestreamWatermarkCallbacks implementation to handle unwinding
41
   * multiple high and low watermark calls.
42
   */
43
  virtual void onSidestreamAboveHighWatermark() PURE;
44

            
45
  /**
46
   * Called when the sidestream connection or stream goes from over its high watermark to under its
47
   * low watermark. As with onSidestreamAboveHighWatermark above, this may be called independently
48
   * when both the stream and the connection go under the low watermark limit, and the callee must
49
   * ensure that the flow of data does not resume until all callers which were above their high
50
   * watermarks have gone below.
51
   */
52
  virtual void onSidestreamBelowLowWatermark() PURE;
53

            
54
  /**
55
    Sidestream subscribes to downstream watermark events on the downstream stream and downstream
56
    connection.
57
  */
58
  virtual void addDownstreamWatermarkCallbacks(Http::DownstreamWatermarkCallbacks& callbacks) PURE;
59
  /**
60
    Sidestream stop subscribing to watermark events on the downstream stream and downstream
61
    connection.
62
   */
63
  virtual void
64
  removeDownstreamWatermarkCallbacks(Http::DownstreamWatermarkCallbacks& callbacks) PURE;
65
};
66

            
67
/**
 * Supports sending an HTTP request message and receiving a response asynchronously.
 */
70
class AsyncClient {
71
public:
72
  /**
73
   * An in-flight HTTP request.
74
   */
75
  class Request {
76
  public:
77
878
    virtual ~Request() = default;
78

            
79
    /**
80
     * Signals that the request should be cancelled.
81
     */
82
    virtual void cancel() PURE;
83
  };
84

            
85
  /**
   * Async Client failure reasons, reported to Callbacks::onFailure().
   */
  enum class FailureReason {
    // The stream has been reset.
    Reset,
    // The stream exceeds the response buffer limit.
    ExceedResponseBufferLimit
  };
94

            
95
  /**
96
   * Notifies caller of async HTTP request status.
97
   *
98
   * To support a use case where a caller makes multiple requests in parallel,
99
   * individual callback methods provide request context corresponding to that response.
100
   */
101
  class Callbacks {
102
  public:
103
16766
    virtual ~Callbacks() = default;
104

            
105
    /**
106
     * Called when the async HTTP request succeeds.
107
     * @param request  request handle.
108
     *                 NOTE: request handle is passed for correlation purposes only, e.g.
109
     *                 for client code to be able to exclude that handle from a list of
110
     *                 requests in progress.
111
     * @param response the HTTP response
112
     */
113
    virtual void onSuccess(const Request& request, ResponseMessagePtr&& response) PURE;
114

            
115
    /**
116
     * Called when the async HTTP request fails.
117
     * @param request request handle.
118
     *                NOTE: request handle is passed for correlation purposes only, e.g.
119
     *                for client code to be able to exclude that handle from a list of
120
     *                requests in progress.
121
     * @param reason  failure reason
122
     */
123
    virtual void onFailure(const Request& request, FailureReason reason) PURE;
124

            
125
    /**
126
     * Called before finalizing upstream span when the request is complete or reset.
127
     * @param span a tracing span to fill with extra tags.
128
     * @param response_headers the response headers.
129
     */
130
    virtual void onBeforeFinalizeUpstreamSpan(Envoy::Tracing::Span& span,
131
                                              const Http::ResponseHeaderMap* response_headers) PURE;
132
  };
133

            
134
  /**
135
   * Notifies caller of async HTTP stream status.
136
   * Note the HTTP stream is full-duplex, even if the local to remote stream has been ended
137
   * by Stream.sendHeaders/sendData with end_stream=true or sendTrailers,
138
   * StreamCallbacks can continue to receive events until the remote to local stream is closed,
139
   * and vice versa.
140
   */
141
  class StreamCallbacks {
142
  public:
143
3391
    virtual ~StreamCallbacks() = default;
144

            
145
    /**
146
     * Called when all headers get received on the async HTTP stream.
147
     * @param headers the headers received
148
     * @param end_stream whether the response is header only
149
     */
150
    virtual void onHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) PURE;
151

            
152
    /**
153
     * Called when a data frame get received on the async HTTP stream.
154
     * This can be invoked multiple times if the data get streamed.
155
     * @param data the data received
156
     * @param end_stream whether the data is the last data frame
157
     */
158
    virtual void onData(Buffer::Instance& data, bool end_stream) PURE;
159

            
160
    /**
161
     * Called when all trailers get received on the async HTTP stream.
162
     * @param trailers the trailers received.
163
     */
164
    virtual void onTrailers(ResponseTrailerMapPtr&& trailers) PURE;
165

            
166
    /**
167
     * Called when both the local and remote have gracefully closed the stream.
168
     * Useful for asymmetric cases where end_stream may not be bidirectionally observable.
169
     * Note this is NOT called on stream reset.
170
     */
171
    virtual void onComplete() PURE;
172

            
173
    /**
174
     * Called when the async HTTP stream is reset.
175
     */
176
    virtual void onReset() PURE;
177
  };
178

            
179
  // Callback invoked on stream destruction; registered via Stream::setDestructorCallback().
  using StreamDestructorCallbacks = std::function<void()>;
180

            
181
  /**
182
   * An in-flight HTTP stream.
183
   */
184
  class Stream {
185
  public:
186
3214
    virtual ~Stream() = default;
187

            
188
    /***
189
     * Send headers to the stream. This method cannot be invoked more than once and
190
     * need to be called before sendData.
191
     * @param headers supplies the headers to send.
192
     * @param end_stream supplies whether this is a header only request.
193
     */
194
    virtual void sendHeaders(RequestHeaderMap& headers, bool end_stream) PURE;
195

            
196
    /***
197
     * Send data to the stream. This method can be invoked multiple times if it get streamed.
198
     * To end the stream without data, call this method with empty buffer.
199
     * @param data supplies the data to send.
200
     * @param end_stream supplies whether this is the last data.
201
     */
202
    virtual void sendData(Buffer::Instance& data, bool end_stream) PURE;
203

            
204
    /***
205
     * Send trailers. This method cannot be invoked more than once, and implicitly ends the stream.
206
     * @param trailers supplies the trailers to send.
207
     */
208
    virtual void sendTrailers(RequestTrailerMap& trailers) PURE;
209

            
210
    /***
211
     * Reset the stream.
212
     */
213
    virtual void reset() PURE;
214

            
215
    /***
216
     * Register callback to be called on stream destruction. This callback must persist beyond the
217
     * lifetime of the stream or be unregistered via removeDestructorCallback. If there's already a
218
     * destructor callback registered, this method will ASSERT-fail.
219
     */
220
    virtual void setDestructorCallback(StreamDestructorCallbacks callback) PURE;
221

            
222
    /***
223
     * Remove previously set destructor callback. Calling this without having previously set a
224
     * Destructor callback will ASSERT-fail.
225
     */
226
    virtual void removeDestructorCallback() PURE;
227

            
228
    /***
229
     * Register a callback to be called when high/low write buffer watermark events occur on the
230
     * stream. This callback must persist beyond the lifetime of the stream or be unregistered via
231
     * removeWatermarkCallbacks. If there's already a watermark callback registered, this method
232
     * will trigger ENVOY_BUG.
233
     */
234
    virtual void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) PURE;
235

            
236
    /***
237
     * Remove previously set watermark callbacks. If there's no watermark callback registered, this
238
     * method will trigger ENVOY_BUG.
239
     */
240
    virtual void removeWatermarkCallbacks() PURE;
241

            
242
    /***
243
     * @returns if the stream has enough buffered outbound data to be over the configured buffer
244
     * limits
245
     */
246
    virtual bool isAboveWriteBufferHighWatermark() const PURE;
247

            
248
    /***
249
     * @returns the stream info object associated with the stream.
250
     */
251
    virtual const StreamInfo::StreamInfo& streamInfo() const PURE;
252
    virtual StreamInfo::StreamInfo& streamInfo() PURE;
253
  };
254

            
255
  /***
256
   * An in-flight HTTP request to which additional data and trailers can be sent, as well as resets
257
   * and other stream-cancelling events. Must be terminated by sending trailers or data with
258
   * end_stream.
259
   */
260
  class OngoingRequest : public virtual Request, public virtual Stream {
261
  public:
262
    /***
263
     * Takes ownership of trailers, and sends it to the underlying stream.
264
     * @param trailers owned trailers to pass to upstream.
265
     */
266
    virtual void captureAndSendTrailers(RequestTrailerMapPtr&& trailers) PURE;
267
  };
268

            
269
45337
  // Virtual so that implementations are destroyed correctly through an AsyncClient pointer
  // (for example via AsyncClientPtr).
  virtual ~AsyncClient() = default;
270

            
271
  /**
272
   * A context from the caller of an async client.
273
   */
274
  struct ParentContext {
275
    const StreamInfo::StreamInfo* stream_info{nullptr};
276
  };
277

            
278
  /**
279
   * A structure to hold the options for AsyncStream object.
280
   */
281
  struct StreamOptions {
282
421
    StreamOptions& setTimeout(const absl::optional<std::chrono::milliseconds>& v) {
283
421
      timeout = v;
284
421
      return *this;
285
421
    }
286
832
    StreamOptions& setTimeout(const std::chrono::milliseconds& v) {
287
832
      timeout = v;
288
832
      return *this;
289
832
    }
290
3539
    StreamOptions& setBufferBodyForRetry(bool v) {
291
3539
      buffer_body_for_retry = v;
292
3539
      return *this;
293
3539
    }
294
2770
    StreamOptions& setSendXff(bool v) {
295
2770
      send_xff = v;
296
2770
      return *this;
297
2770
    }
298
2550
    StreamOptions& setSendInternal(bool v) {
299
2550
      send_internal = v;
300
2550
      return *this;
301
2550
    }
302
    StreamOptions& setHashPolicy(
303
106
        const Protobuf::RepeatedPtrField<envoy::config::route::v3::RouteAction::HashPolicy>& v) {
304
106
      hash_policy = v;
305
106
      return *this;
306
106
    }
307
1615
    StreamOptions& setParentContext(const ParentContext& v) {
308
1615
      parent_context = v;
309
1615
      return *this;
310
1615
    }
311
    // Set dynamic metadata of async stream. If a metadata record with filter name 'envoy.lb' is
312
    // provided, metadata match criteria of async stream route will be overridden by the metadata
313
    // and then used by the subset load balancer.
314
68
    StreamOptions& setMetadata(const envoy::config::core::v3::Metadata& m) {
315
68
      metadata = m;
316
68
      return *this;
317
68
    }
318

            
319
    // Set FilterState on async stream allowing upstream filters to access it.
320
1
    StreamOptions& setFilterState(Envoy::StreamInfo::FilterStateSharedPtr fs) {
321
1
      filter_state = fs;
322
1
      return *this;
323
1
    }
324

            
325
    // Set buffer restriction and accounting for the stream.
326
93
    StreamOptions& setBufferAccount(const Buffer::BufferMemoryAccountSharedPtr& account) {
327
93
      account_ = account;
328
93
      return *this;
329
93
    }
330
117
    StreamOptions& setBufferLimit(uint64_t limit) {
331
117
      buffer_limit_ = limit;
332
117
      return *this;
333
117
    }
334

            
335
    // this should be done with setBufferedBodyForRetry=true ?
336
    // The retry policy can be set as either a proto or Router::RetryPolicy but
337
    // not both. If both formats of the options are set, the more recent call
338
    // will overwrite the older one.
339
36
    StreamOptions& setRetryPolicy(const envoy::config::route::v3::RetryPolicy& p) {
340
36
      retry_policy = p;
341
36
      parsed_retry_policy = nullptr;
342
36
      return *this;
343
36
    }
344

            
345
    // The retry policy can be set as either a proto or Router::RetryPolicy but
346
    // not both. If both formats of the options are set, the more recent call
347
    // will overwrite the older one.
348
48
    StreamOptions& setRetryPolicy(Router::RetryPolicyConstSharedPtr p) {
349
48
      parsed_retry_policy = std::move(p);
350
48
      retry_policy = absl::nullopt;
351
48
      return *this;
352
48
    }
353
93
    StreamOptions& setFilterConfig(const Router::FilterConfigSharedPtr& config) {
354
93
      filter_config_ = config;
355
93
      return *this;
356
93
    }
357

            
358
181
    StreamOptions& setIsShadow(bool s) {
359
181
      is_shadow = s;
360
181
      return *this;
361
181
    }
362

            
363
125
    StreamOptions& setDiscardResponseBody(bool discard) {
364
125
      discard_response_body = discard;
365
125
      return *this;
366
125
    }
367

            
368
96
    StreamOptions& setIsShadowSuffixDisabled(bool d) {
369
96
      is_shadow_suffixed_disabled = d;
370
96
      return *this;
371
96
    }
372

            
373
1422
    StreamOptions& setParentSpan(Tracing::Span& parent_span) {
374
1422
      parent_span_ = &parent_span;
375
1422
      return *this;
376
1422
    }
377
285
    StreamOptions& setChildSpanName(const std::string& child_span_name) {
378
285
      child_span_name_ = child_span_name;
379
285
      return *this;
380
285
    }
381
1304
    StreamOptions& setSampled(absl::optional<bool> sampled) {
382
1304
      sampled_ = sampled;
383
1304
      return *this;
384
1304
    }
385
913
    StreamOptions& setSidestreamWatermarkCallbacks(SidestreamWatermarkCallbacks* callbacks) {
386
913
      sidestream_watermark_callbacks = callbacks;
387
913
      return *this;
388
913
    }
389
4
    StreamOptions& setOnDeleteCallbacksForTestOnly(std::function<void()> callback) {
390
4
      on_delete_callback_for_test_only = callback;
391
4
      return *this;
392
4
    }
393
1134
    StreamOptions& setRemoteCloseTimeout(std::chrono::milliseconds timeout) {
394
1134
      remote_close_timeout = timeout;
395
1134
      return *this;
396
1134
    }
397
    StreamOptions&
398
2
    setUpstreamOverrideHost(const Upstream::LoadBalancerContext::OverrideHost& host) {
399
2
      upstream_override_host_ = host;
400
2
      return *this;
401
2
    }
402

            
403
    // For gmock test
404
18
    bool operator==(const StreamOptions& src) const {
405
18
      return timeout == src.timeout && buffer_body_for_retry == src.buffer_body_for_retry &&
406
18
             send_xff == src.send_xff && send_internal == src.send_internal &&
407
18
             parent_span_ == src.parent_span_ && child_span_name_ == src.child_span_name_ &&
408
18
             sampled_ == src.sampled_;
409
18
    }
410

            
411
    // The timeout supplies the stream timeout, measured since when the frame with
412
    // end_stream flag is sent until when the first frame is received.
413
    absl::optional<std::chrono::milliseconds> timeout;
414

            
415
    // The buffer_body_for_retry specifies whether the streamed body will be buffered so that
416
    // it can be retried. In general, this should be set to false for a true stream. However,
417
    // streaming is also used in certain cases such as gRPC unary calls, where retry can
418
    // still be useful.
419
    bool buffer_body_for_retry{false};
420

            
421
    // If true, x-forwarded-for header will be added.
422
    bool send_xff{true};
423

            
424
    // If true, x-envoy-internal header will be added.
425
    bool send_internal{true};
426

            
427
    // Provides the hash policy for hashing load balancing strategies.
428
    Protobuf::RepeatedPtrField<envoy::config::route::v3::RouteAction::HashPolicy> hash_policy;
429

            
430
    // Provides parent context. Currently, this holds stream info from the caller.
431
    ParentContext parent_context;
432

            
433
    envoy::config::core::v3::Metadata metadata;
434
    Envoy::StreamInfo::FilterStateSharedPtr filter_state;
435

            
436
    // Buffer memory account for tracking bytes.
437
    Buffer::BufferMemoryAccountSharedPtr account_{nullptr};
438

            
439
    absl::optional<uint64_t> buffer_limit_;
440

            
441
    absl::optional<envoy::config::route::v3::RetryPolicy> retry_policy;
442
    Router::RetryPolicyConstSharedPtr parsed_retry_policy;
443

            
444
    Router::FilterConfigSharedPtr filter_config_;
445

            
446
    bool is_shadow{false};
447

            
448
    bool is_shadow_suffixed_disabled{false};
449
    bool discard_response_body{false};
450

            
451
    // The parent span that child spans are created under to trace egress requests/responses.
452
    // If not set, requests will not be traced.
453
    Tracing::Span* parent_span_{nullptr};
454
    // The name to give to the child span that represents the async http request.
455
    // If left empty and parent_span_ is set, then the default name will have the cluster name.
456
    // Only used if parent_span_ is set.
457
    std::string child_span_name_{""};
458
    // Sampling decision for the tracing span. The span is sampled by default.
459
    absl::optional<bool> sampled_{true};
460
    // The pointer to sidestream watermark callbacks. Optional, nullptr by default.
461
    Http::SidestreamWatermarkCallbacks* sidestream_watermark_callbacks = nullptr;
462

            
463
    // The amount of tiem to wait for server to half-close its stream after client
464
    // has half-closed its stream.
465
    // Defaults to 1 second.
466
    std::chrono::milliseconds remote_close_timeout{1000};
467

            
468
    // This callback is invoked when AsyncStream object is deleted.
469
    // Test only use to validate deferred deletion.
470
    std::function<void()> on_delete_callback_for_test_only;
471

            
472
    // Optional upstream override host for bypassing load balancer selection
473
    absl::optional<Upstream::LoadBalancerContext::OverrideHost> upstream_override_host_;
474
  };
475

            
476
  /**
477
   * A structure to hold the options for AsyncRequest object.
478
   */
479
  using RequestOptions = StreamOptions;
480

            
481
  /**
482
   * Send an HTTP request asynchronously
483
   * @param request the request to send.
484
   * @param callbacks the callbacks to be notified of request status.
485
   * @param options the data struct to control the request sending.
486
   * @return a request handle or nullptr if no request could be created. NOTE: In this case
487
   *         onFailure() has already been called inline. The client owns the request and the
488
   *         handle should just be used to cancel.
489
   */
490

            
491
  virtual Request* send(RequestMessagePtr&& request, Callbacks& callbacks,
492
                        const RequestOptions& options) PURE;
493

            
494
  /**
495
   * Starts a new OngoingRequest asynchronously with the given headers.
496
   *
497
   * @param request_headers headers to send.
498
   * @param callbacks the callbacks to be notified of request status.
499
   * @param options the data struct to control the request sending.
500
   * @return a request handle or nullptr if no request could be created. See note attached to
501
   * `send`. Calling startRequest will not trigger end stream. For header-only requests, `send`
502
   * should be called instead.
503
   */
504
  virtual OngoingRequest* startRequest(RequestHeaderMapPtr&& request_headers, Callbacks& callbacks,
505
                                       const RequestOptions& options) PURE;
506

            
507
  /**
508
   * Start an HTTP stream asynchronously, without an associated HTTP request.
509
   * @param callbacks the callbacks to be notified of stream status.
510
   * @param options the data struct to control the stream.
511
   * @return a stream handle or nullptr if no stream could be started. NOTE: In this case
512
   *         onResetStream() has already been called inline. The client owns the stream and
513
   *         the handle can be used to send more messages or close the stream.
514
   */
515
  virtual Stream* start(StreamCallbacks& callbacks, const StreamOptions& options) PURE;
516

            
517
  /**
518
   * @return Event::Dispatcher& the dispatcher backing this client.
519
   */
520
  virtual Event::Dispatcher& dispatcher() PURE;
521
};
522

            
523
// Owning (unique) pointer to an AsyncClient implementation.
using AsyncClientPtr = std::unique_ptr<AsyncClient>;
524

            
525
} // namespace Http
526
} // namespace Envoy