1
#pragma once
2

            
3
#include <cstdint>
4
#include <list>
5
#include <memory>
6

            
7
#include "envoy/common/random_generator.h"
8
#include "envoy/event/deferred_deletable.h"
9
#include "envoy/event/timer.h"
10
#include "envoy/http/codec.h"
11
#include "envoy/http/header_validator.h"
12
#include "envoy/network/connection.h"
13
#include "envoy/network/filter.h"
14
#include "envoy/upstream/upstream.h"
15

            
16
#include "source/common/common/assert.h"
17
#include "source/common/common/linked_object.h"
18
#include "source/common/common/logger.h"
19
#include "source/common/http/codec_wrappers.h"
20
#include "source/common/network/filter_impl.h"
21

            
22
namespace Envoy {
23
namespace Http {
24

            
25
/**
26
 * Callbacks specific to a codec client.
27
 */
28
class CodecClientCallbacks {
29
public:
30
25018
  virtual ~CodecClientCallbacks() = default;
31

            
32
  // Called in onPreDecodeComplete
33
53212
  virtual void onStreamPreDecodeComplete() {}
34

            
35
  /**
36
   * Called every time an owned stream is destroyed, whether complete or not.
37
   */
38
  virtual void onStreamDestroy() PURE;
39

            
40
  /**
41
   * Called when a stream is reset by the client.
42
   * @param reason supplies the reset reason.
43
   */
44
  virtual void onStreamReset(StreamResetReason reason) PURE;
45
};
46

            
47
/**
48
 * This is an HTTP client that multiple stream management and underlying connection management
49
 * across multiple HTTP codec types.
50
 */
51
class CodecClient : protected Logger::Loggable<Logger::Id::client>,
52
                    public Http::ConnectionCallbacks,
53
                    public Network::ConnectionCallbacks,
54
                    public Event::DeferredDeletable {
55
public:
56
  /**
57
   * Type of HTTP codec to use.
58
   */
59
  // This is a legacy alias.
60
  using Type = Envoy::Http::CodecType;
61

            
62
  /**
63
   * Add a connection callback to the underlying network connection.
64
   */
65
29262
  void addConnectionCallbacks(Network::ConnectionCallbacks& cb) {
66
29262
    connection_->addConnectionCallbacks(cb);
67
29262
  }
68

            
69
  /**
70
   * Return if half-close semantics are enabled on the underlying connection.
71
   */
72
95
  bool isHalfCloseEnabled() { return connection_->isHalfCloseEnabled(); }
73

            
74
  /**
75
   * Initialize all of the installed read filters on the underlying connection.
76
   * This effectively calls onNewConnection() on each of them.
77
   */
78
28606
  void initializeReadFilters() { connection_->initializeReadFilters(); }
79

            
80
  /**
81
   * Close the underlying network connection. This is immediate and will not attempt to flush any
82
   * pending write data.
83
   */
84
  void close(Network::ConnectionCloseType type = Network::ConnectionCloseType::NoFlush,
85
             absl::string_view details = "");
86

            
87
  /**
88
   * Send a codec level go away indication to the peer.
89
   */
90
5
  void goAway() { codec_->goAway(); }
91

            
92
  /**
93
   * @return the underlying connection ID.
94
   */
95
359
  uint64_t id() const { return connection_->id(); }
96

            
97
  /**
98
   * @return the underlying codec protocol.
99
   */
100
170418
  Protocol protocol() { return codec_->protocol(); }
101

            
102
  /**
103
   * @return the underlying connection error.
104
   */
105
57565
  absl::string_view connectionFailureReason() { return connection_->transportFailureReason(); }
106

            
107
  /**
108
   * @return size_t the number of outstanding requests that have not completed or been reset.
109
   */
110
218765
  size_t numActiveRequests() { return active_requests_.size(); }
111

            
112
  /**
113
   * Create a new stream. Note: The CodecClient will NOT buffer multiple requests for HTTP1
114
   * connections. Thus, calling newStream() before the previous request has been fully encoded
115
   * is an error. Pipelining is supported however.
116
   * @param response_decoder supplies the decoder to use for response callbacks.
117
   * @return StreamEncoder& the encoder to use for encoding the request.
118
   */
119
  RequestEncoder& newStream(ResponseDecoder& response_decoder);
120

            
121
  /**
122
   * Create a new stream. Note: The CodecClient will NOT buffer multiple requests for HTTP1
123
   * connections. Thus, calling newStream() before the previous request has been fully encoded
124
   * is an error. Pipelining is supported however.
125
   * @param response_decoder_handle supplies the decoder to use for response callbacks if it's still
126
   * alive.
127
   * @return StreamEncoder& the encoder to use for encoding the request.
128
   */
129
  RequestEncoder& newStream(ResponseDecoderHandlePtr response_decoder_handle);
130

            
131
28915
  void setConnectionStats(const Network::Connection::ConnectionStats& stats) {
132
28915
    connection_->setConnectionStats(stats);
133
28915
  }
134

            
135
25080
  void setCodecClientCallbacks(CodecClientCallbacks& callbacks) {
136
25080
    codec_client_callbacks_ = &callbacks;
137
25080
  }
138

            
139
25291
  void setCodecConnectionCallbacks(Http::ConnectionCallbacks& callbacks) {
140
25291
    codec_callbacks_ = &callbacks;
141
25291
  }
142

            
143
24313
  bool remoteClosed() const { return remote_closed_; }
144

            
145
15183
  CodecType type() const { return type_; }
146

            
147
  // Note this is the L4 stream info, not L7.
148
46730
  StreamInfo::StreamInfo& streamInfo() { return connection_->streamInfo(); }
149

            
150
  /**
151
   * Connect to the host.
152
   * Needs to be called after codec_ is instantiated.
153
   */
154
  void connect();
155

            
156
943
  bool connectCalled() const { return connect_called_; }
157

            
158
protected:
159
  /**
160
   * Create a codec client and connect to a remote host/port.
161
   * @param type supplies the codec type.
162
   * @param connection supplies the connection to communicate on.
163
   * @param host supplies the owning host.
164
   */
165
  CodecClient(CodecType type, Network::ClientConnectionPtr&& connection,
166
              Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher);
167

            
168
  // Http::ConnectionCallbacks
169
1744
  void onGoAway(GoAwayErrorCode error_code) override {
170
1744
    if (codec_callbacks_) {
171
1728
      codec_callbacks_->onGoAway(error_code);
172
1728
    }
173
1744
  }
174
13255
  void onSettings(ReceivedSettings& settings) override {
175
13255
    if (codec_callbacks_) {
176
12872
      codec_callbacks_->onSettings(settings);
177
12872
    }
178
13255
  }
179
5803
  void onMaxStreamsChanged(uint32_t num_streams) override {
180
5803
    if (codec_callbacks_) {
181
4686
      codec_callbacks_->onMaxStreamsChanged(num_streams);
182
4686
    }
183
5803
  }
184

            
185
32
  void onIdleTimeout() {
186
32
    host_->cluster().trafficStats()->upstream_cx_idle_timeout_.inc();
187
32
    close(Network::ConnectionCloseType::NoFlush,
188
32
          StreamInfo::LocalCloseReasons::get().IdleTimeoutOnConnection);
189
32
  }
190

            
191
149250
  void disableIdleTimer() {
192
149250
    if (idle_timer_ != nullptr) {
193
75592
      idle_timer_->disableTimer();
194
75592
    }
195
149250
  }
196

            
197
121405
  void enableIdleTimer() {
198
    // Bug fix (default): only enable idle timer when connection is established.
199
    // Old behavior (when flag is disabled): enable idle timer even when connection is not yet
200
    // established.
201
121405
    if (!connected_ && enable_idle_timer_only_when_connected_) {
202
25
      return;
203
25
    }
204
121380
    if (idle_timer_ != nullptr) {
205
68118
      idle_timer_->enableTimer(idle_timeout_.value());
206
68118
    }
207
121380
  }
208

            
209
  const CodecType type_;
210
  // The order of host_, connection_, and codec_ matter as during destruction each can refer to
211
  // the previous, at least in tests.
212
  Upstream::HostDescriptionConstSharedPtr host_;
213
  Network::ClientConnectionPtr connection_;
214
  ClientConnectionPtr codec_;
215
  Event::TimerPtr idle_timer_;
216
  const absl::optional<std::chrono::milliseconds> idle_timeout_;
217
  const bool enable_idle_timer_only_when_connected_;
218

            
219
private:
220
  /**
221
   * Wrapper read filter to drive incoming connection data into the codec. We could potentially
222
   * support other filters in the future.
223
   */
224
  struct CodecReadFilter : public Network::ReadFilterBaseImpl {
225
51234
    CodecReadFilter(CodecClient& parent) : parent_(parent) {}
226

            
227
    // Network::ReadFilter
228
124126
    Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override {
229
124126
      parent_.onData(data);
230
124126
      if (end_stream && parent_.isHalfCloseEnabled()) {
231
        // Note that this results in the connection closed as if it was closed
232
        // locally, it would be more correct to convey the end stream to the
233
        // response decoder, but it would require some refactoring.
234
95
        parent_.close();
235
95
      }
236
124126
      return Network::FilterStatus::StopIteration;
237
124126
    }
238

            
239
    CodecClient& parent_;
240
  };
241

            
242
  struct ActiveRequest;
243

            
244
  /**
245
   * Wrapper for an outstanding request. Designed for handling stream multiplexing.
246
   */
247
  struct ActiveRequest : LinkedObject<ActiveRequest>,
248
                         public Event::DeferredDeletable,
249
                         public StreamCallbacks,
250
                         public ResponseDecoderWrapper,
251
                         public RequestEncoderWrapper {
252
    ActiveRequest(CodecClient& parent, ResponseDecoder& inner)
253
76824
        : ResponseDecoderWrapper(inner), RequestEncoderWrapper(nullptr), parent_(parent),
254
          header_validator_(
255
76824
              parent.host_->cluster().makeHeaderValidator(parent.codec_->protocol())) {
256
76824
      switch (parent.protocol()) {
257
88
      case Protocol::Http10:
258
42727
      case Protocol::Http11:
259
        // HTTP/1.1 codec does not support half-close on the response completion.
260
42727
        wait_encode_complete_ = false;
261
42727
        break;
262
31590
      case Protocol::Http2:
263
34097
      case Protocol::Http3:
264
34097
        wait_encode_complete_ = true;
265
34097
        break;
266
76824
      }
267
76824
    }
268

            
269
    ActiveRequest(CodecClient& parent, ResponseDecoderHandlePtr inner_handle)
270
21205
        : ResponseDecoderWrapper(std::move(inner_handle)), RequestEncoderWrapper(nullptr),
271
21205
          parent_(parent), header_validator_(parent.host_->cluster().makeHeaderValidator(
272
21205
                               parent.codec_->protocol())) {
273
21205
      switch (parent.protocol()) {
274
87
      case Protocol::Http10:
275
87
      case Protocol::Http11:
276
        // HTTP/1.1 codec does not support half-close on the response completion.
277
87
        wait_encode_complete_ = false;
278
87
        break;
279
19669
      case Protocol::Http2:
280
21118
      case Protocol::Http3:
281
21118
        wait_encode_complete_ = true;
282
21118
        break;
283
21205
      }
284
21205
    }
285

            
286
    void decodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) override;
287

            
288
    // StreamCallbacks
289
16359
    void onResetStream(StreamResetReason reason, absl::string_view) override {
290
16359
      parent_.onReset(*this, reason);
291
16359
    }
292
238157
    void onAboveWriteBufferHighWatermark() override {}
293
238131
    void onBelowWriteBufferLowWatermark() override {}
294

            
295
    // StreamDecoderWrapper
296
82515
    void onPreDecodeComplete() override { parent_.responsePreDecodeComplete(*this); }
297
82515
    void onDecodeComplete() override {}
298

            
299
    // RequestEncoderWrapper
300
84942
    void onEncodeComplete() override { parent_.requestEncodeComplete(*this); }
301

            
302
    // RequestEncoder
303
    Status encodeHeaders(const RequestHeaderMap& headers, bool end_stream) override;
304

            
305
98029
    void setEncoder(RequestEncoder& encoder) {
306
98029
      inner_encoder_ = &encoder;
307
98029
      inner_encoder_->getStream().addCallbacks(*this);
308
98029
    }
309

            
310
81670
    void removeEncoderCallbacks() { inner_encoder_->getStream().removeCallbacks(*this); }
311

            
312
    CodecClient& parent_;
313
    Http::ClientHeaderValidatorPtr header_validator_;
314
    bool wait_encode_complete_{true};
315
    bool encode_complete_{false};
316
    bool decode_complete_{false};
317
  };
318

            
319
  using ActiveRequestPtr = std::unique_ptr<ActiveRequest>;
320

            
321
  /**
322
   * Called when a response finishes decoding. This is called *before* forwarding on to the
323
   * wrapped decoder.
324
   */
325
  void responsePreDecodeComplete(ActiveRequest& request);
326
  void requestEncodeComplete(ActiveRequest& request);
327
  void completeRequest(ActiveRequest& request);
328

            
329
  void deleteRequest(ActiveRequest& request);
330
  void onReset(ActiveRequest& request, StreamResetReason reason);
331
  void onData(Buffer::Instance& data);
332

            
333
  // Network::ConnectionCallbacks
334
  void onEvent(Network::ConnectionEvent event) override;
335
  // Pass watermark events from the connection on to the codec which will pass it to the underlying
336
  // streams.
337
104532
  void onAboveWriteBufferHighWatermark() override {
338
104532
    codec_->onUnderlyingConnectionAboveWriteBufferHighWatermark();
339
104532
  }
340
104532
  void onBelowWriteBufferLowWatermark() override {
341
104532
    codec_->onUnderlyingConnectionBelowWriteBufferLowWatermark();
342
104532
  }
343
  RequestEncoder& enlistAndCreateEncoder(ActiveRequestPtr request);
344

            
345
  std::list<ActiveRequestPtr> active_requests_;
346
  Http::ConnectionCallbacks* codec_callbacks_{};
347
  CodecClientCallbacks* codec_client_callbacks_{};
348
  bool connected_{};
349
  bool remote_closed_{};
350
  bool protocol_error_{false};
351
  bool connect_called_{false};
352
};
353

            
354
// Uniquely-owning pointer to a CodecClient.
using CodecClientPtr = std::unique_ptr<CodecClient>;
355

            
356
/**
357
 * Production implementation that installs a real codec.
358
 */
359
class CodecClientProd : public CodecClient {
360
public:
361
  CodecClientProd(CodecType type, Network::ClientConnectionPtr&& connection,
362
                  Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher,
363
                  Random::RandomGenerator& random_generator,
364
                  const Network::TransportSocketOptionsConstSharedPtr& options,
365
                  bool should_connect_on_creation = true);
366
};
367

            
368
} // namespace Http
369
} // namespace Envoy