Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/http/codec_client.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <cstdint>
4
#include <list>
5
#include <memory>
6
7
#include "envoy/common/random_generator.h"
8
#include "envoy/event/deferred_deletable.h"
9
#include "envoy/event/timer.h"
10
#include "envoy/http/codec.h"
11
#include "envoy/http/header_validator.h"
12
#include "envoy/network/connection.h"
13
#include "envoy/network/filter.h"
14
#include "envoy/upstream/upstream.h"
15
16
#include "source/common/common/assert.h"
17
#include "source/common/common/linked_object.h"
18
#include "source/common/common/logger.h"
19
#include "source/common/http/codec_wrappers.h"
20
#include "source/common/network/filter_impl.h"
21
#include "source/common/runtime/runtime_features.h"
22
23
namespace Envoy {
24
namespace Http {
25
26
/**
27
 * Callbacks specific to a codec client.
28
 */
29
class CodecClientCallbacks {
30
public:
31
2.37k
  virtual ~CodecClientCallbacks() = default;
32
33
  // Called in onPreDecodeComplete
34
2.26k
  virtual void onStreamPreDecodeComplete() {}
35
36
  /**
37
   * Called every time an owned stream is destroyed, whether complete or not.
38
   */
39
  virtual void onStreamDestroy() PURE;
40
41
  /**
42
   * Called when a stream is reset by the client.
43
   * @param reason supplies the reset reason.
44
   */
45
  virtual void onStreamReset(StreamResetReason reason) PURE;
46
};
47
48
/**
49
 * This is an HTTP client that handles multiple stream management and underlying connection management
50
 * across multiple HTTP codec types.
51
 */
52
class CodecClient : protected Logger::Loggable<Logger::Id::client>,
53
                    public Http::ConnectionCallbacks,
54
                    public Network::ConnectionCallbacks,
55
                    public Event::DeferredDeletable {
56
public:
57
  /**
58
   * Type of HTTP codec to use.
59
   */
60
  // This is a legacy alias.
61
  using Type = Envoy::Http::CodecType;
62
63
  /**
64
   * Add a connection callback to the underlying network connection.
65
   */
66
21.9k
  void addConnectionCallbacks(Network::ConnectionCallbacks& cb) {
67
21.9k
    connection_->addConnectionCallbacks(cb);
68
21.9k
  }
69
70
  /**
71
   * Return if half-close semantics are enabled on the underlying connection.
72
   */
73
0
  bool isHalfCloseEnabled() { return connection_->isHalfCloseEnabled(); }
74
75
  /**
76
   * Initialize all of the installed read filters on the underlying connection.
77
   * This effectively calls onNewConnection() on each of them.
78
   */
79
1.31k
  void initializeReadFilters() { connection_->initializeReadFilters(); }
80
81
  /**
82
   * Close the underlying network connection. By default this is immediate and will not attempt to flush any
83
   * pending write data.
84
   */
85
  void close(Network::ConnectionCloseType type = Network::ConnectionCloseType::NoFlush);
86
87
  /**
88
   * Send a codec level go away indication to the peer.
89
   */
90
0
  void goAway() { codec_->goAway(); }
91
92
  /**
93
   * @return the underlying connection ID.
94
   */
95
0
  uint64_t id() const { return connection_->id(); }
96
97
  /**
98
   * @return the underlying codec protocol.
99
   */
100
31.1k
  Protocol protocol() { return codec_->protocol(); }
101
102
  /**
103
   * @return the underlying connection error.
104
   */
105
2.63k
  absl::string_view connectionFailureReason() { return connection_->transportFailureReason(); }
106
107
  /**
108
   * @return size_t the number of outstanding requests that have not completed or been reset.
109
   */
110
45.3k
  size_t numActiveRequests() { return active_requests_.size(); }
111
112
  /**
113
   * Create a new stream. Note: The CodecClient will NOT buffer multiple requests for HTTP1
114
   * connections. Thus, calling newStream() before the previous request has been fully encoded
115
   * is an error. Pipelining is supported however.
116
   * @param response_decoder supplies the decoder to use for response callbacks.
117
   * @return RequestEncoder& the encoder to use for encoding the request.
118
   */
119
  RequestEncoder& newStream(ResponseDecoder& response_decoder);
120
121
1.32k
  void setConnectionStats(const Network::Connection::ConnectionStats& stats) {
122
1.32k
    connection_->setConnectionStats(stats);
123
1.32k
  }
124
125
2.37k
  void setCodecClientCallbacks(CodecClientCallbacks& callbacks) {
126
2.37k
    codec_client_callbacks_ = &callbacks;
127
2.37k
  }
128
129
23.0k
  void setCodecConnectionCallbacks(Http::ConnectionCallbacks& callbacks) {
130
23.0k
    codec_callbacks_ = &callbacks;
131
23.0k
  }
132
133
1
  bool remoteClosed() const { return remote_closed_; }
134
135
1.10k
  CodecType type() const { return type_; }
136
137
  // Note this is the L4 stream info, not L7.
138
2.32k
  StreamInfo::StreamInfo& streamInfo() { return connection_->streamInfo(); }
139
140
  /**
141
   * Connect to the host.
142
   * Needs to be called after codec_ is instantiated.
143
   */
144
  void connect();
145
146
protected:
147
  /**
148
   * Create a codec client and connect to a remote host/port.
149
   * @param type supplies the codec type.
150
   * @param connection supplies the connection to communicate on.
151
   * @param host supplies the owning host.
152
   */
153
  CodecClient(CodecType type, Network::ClientConnectionPtr&& connection,
154
              Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher);
155
156
  // Http::ConnectionCallbacks
157
13.1k
  void onGoAway(GoAwayErrorCode error_code) override {
158
13.1k
    if (codec_callbacks_) {
159
13.1k
      codec_callbacks_->onGoAway(error_code);
160
13.1k
    }
161
13.1k
  }
162
2.43k
  void onSettings(ReceivedSettings& settings) override {
163
2.43k
    if (codec_callbacks_) {
164
2.43k
      codec_callbacks_->onSettings(settings);
165
2.43k
    }
166
2.43k
  }
167
0
  void onMaxStreamsChanged(uint32_t num_streams) override {
168
0
    if (codec_callbacks_) {
169
0
      codec_callbacks_->onMaxStreamsChanged(num_streams);
170
0
    }
171
0
  }
172
173
0
  void onIdleTimeout() {
174
0
    host_->cluster().trafficStats()->upstream_cx_idle_timeout_.inc();
175
0
    close();
176
0
  }
177
178
54.1k
  void disableIdleTimer() {
179
54.1k
    if (idle_timer_ != nullptr) {
180
3.64k
      idle_timer_->disableTimer();
181
3.64k
    }
182
54.1k
  }
183
184
29.7k
  void enableIdleTimer() {
185
29.7k
    if (idle_timer_ != nullptr) {
186
3.55k
      idle_timer_->enableTimer(idle_timeout_.value());
187
3.55k
    }
188
29.7k
  }
189
190
  const CodecType type_;
191
  // The order of host_, connection_, and codec_ matters, as during destruction each can refer to
192
  // the previous, at least in tests.
193
  Upstream::HostDescriptionConstSharedPtr host_;
194
  Network::ClientConnectionPtr connection_;
195
  ClientConnectionPtr codec_;
196
  Event::TimerPtr idle_timer_;
197
  const absl::optional<std::chrono::milliseconds> idle_timeout_;
198
199
private:
200
  /**
201
   * Wrapper read filter to drive incoming connection data into the codec. We could potentially
202
   * support other filters in the future.
203
   */
204
  struct CodecReadFilter : public Network::ReadFilterBaseImpl {
205
25.7k
    CodecReadFilter(CodecClient& parent) : parent_(parent) {}
206
207
    // Network::ReadFilter
208
11.3k
    Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override {
209
11.3k
      parent_.onData(data);
210
11.3k
      if (end_stream && parent_.isHalfCloseEnabled()) {
211
        // Note that this results in the connection being closed as if it was closed
212
        // locally, it would be more correct to convey the end stream to the
213
        // response decoder, but it would require some refactoring.
214
0
        parent_.close();
215
0
      }
216
11.3k
      return Network::FilterStatus::StopIteration;
217
11.3k
    }
218
219
    CodecClient& parent_;
220
  };
221
222
  struct ActiveRequest;
223
224
  /**
225
   * Wrapper for an outstanding request. Designed for handling stream multiplexing.
226
   */
227
  struct ActiveRequest : LinkedObject<ActiveRequest>,
228
                         public Event::DeferredDeletable,
229
                         public StreamCallbacks,
230
                         public ResponseDecoderWrapper,
231
                         public RequestEncoderWrapper {
232
    ActiveRequest(CodecClient& parent, ResponseDecoder& inner)
233
        : ResponseDecoderWrapper(inner), RequestEncoderWrapper(nullptr), parent_(parent),
234
          header_validator_(
235
28.4k
              parent.host_->cluster().makeHeaderValidator(parent.codec_->protocol())) {
236
28.4k
      switch (parent.protocol()) {
237
18.4k
      case Protocol::Http10:
238
25.0k
      case Protocol::Http11:
239
        // HTTP/1.1 codec does not support half-close on the response completion.
240
25.0k
        wait_encode_complete_ = false;
241
25.0k
        break;
242
3.38k
      case Protocol::Http2:
243
3.38k
      case Protocol::Http3:
244
3.38k
        wait_encode_complete_ = true;
245
3.38k
        break;
246
28.4k
      }
247
28.4k
    }
Unexecuted instantiation: Envoy::Http::CodecClient::ActiveRequest::ActiveRequest(Envoy::Http::CodecClient&, Envoy::Http::ResponseDecoder&)
Envoy::Http::CodecClient::ActiveRequest::ActiveRequest(Envoy::Http::CodecClient&, Envoy::Http::ResponseDecoder&)
Line
Count
Source
235
28.4k
              parent.host_->cluster().makeHeaderValidator(parent.codec_->protocol())) {
236
28.4k
      switch (parent.protocol()) {
237
18.4k
      case Protocol::Http10:
238
25.0k
      case Protocol::Http11:
239
        // HTTP/1.1 codec does not support half-close on the response completion.
240
25.0k
        wait_encode_complete_ = false;
241
25.0k
        break;
242
3.38k
      case Protocol::Http2:
243
3.38k
      case Protocol::Http3:
244
3.38k
        wait_encode_complete_ = true;
245
3.38k
        break;
246
28.4k
      }
247
28.4k
    }
248
249
    void decodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) override;
250
251
    // StreamCallbacks
252
20.4k
    void onResetStream(StreamResetReason reason, absl::string_view) override {
253
20.4k
      parent_.onReset(*this, reason);
254
20.4k
    }
255
0
    void onAboveWriteBufferHighWatermark() override {}
256
0
    void onBelowWriteBufferLowWatermark() override {}
257
258
    // ResponseDecoderWrapper
259
8.14k
    void onPreDecodeComplete() override { parent_.responsePreDecodeComplete(*this); }
260
8.14k
    void onDecodeComplete() override {}
261
262
    // RequestEncoderWrapper
263
28.3k
    void onEncodeComplete() override { parent_.requestEncodeComplete(*this); }
264
265
    // RequestEncoder
266
    Status encodeHeaders(const RequestHeaderMap& headers, bool end_stream) override;
267
268
28.4k
    void setEncoder(RequestEncoder& encoder) {
269
28.4k
      inner_encoder_ = &encoder;
270
28.4k
      inner_encoder_->getStream().addCallbacks(*this);
271
28.4k
    }
272
273
8.05k
    void removeEncoderCallbacks() { inner_encoder_->getStream().removeCallbacks(*this); }
274
275
    CodecClient& parent_;
276
    Http::ClientHeaderValidatorPtr header_validator_;
277
    bool wait_encode_complete_{true};
278
    bool encode_complete_{false};
279
    bool decode_complete_{false};
280
  };
281
282
  using ActiveRequestPtr = std::unique_ptr<ActiveRequest>;
283
284
  /**
285
   * Called when a response finishes decoding. This is called *before* forwarding on to the
286
   * wrapped decoder.
287
   */
288
  void responsePreDecodeComplete(ActiveRequest& request);
289
  void requestEncodeComplete(ActiveRequest& request);
290
  void completeRequest(ActiveRequest& request);
291
292
  void deleteRequest(ActiveRequest& request);
293
  void onReset(ActiveRequest& request, StreamResetReason reason);
294
  void onData(Buffer::Instance& data);
295
296
  // Network::ConnectionCallbacks
297
  void onEvent(Network::ConnectionEvent event) override;
298
  // Pass watermark events from the connection on to the codec which will pass it to the underlying
299
  // streams.
300
0
  void onAboveWriteBufferHighWatermark() override {
301
0
    codec_->onUnderlyingConnectionAboveWriteBufferHighWatermark();
302
0
  }
303
0
  void onBelowWriteBufferLowWatermark() override {
304
0
    codec_->onUnderlyingConnectionBelowWriteBufferLowWatermark();
305
0
  }
306
307
  std::list<ActiveRequestPtr> active_requests_;
308
  Http::ConnectionCallbacks* codec_callbacks_{};
309
  CodecClientCallbacks* codec_client_callbacks_{};
310
  bool connected_{};
311
  bool remote_closed_{};
312
  bool protocol_error_{false};
313
  bool connect_called_{false};
314
};
315
316
using CodecClientPtr = std::unique_ptr<CodecClient>;
317
318
/**
319
 * Production implementation that installs a real codec without automatically connecting.
320
 * TODO(danzh) deprecate this class and make CodecClientProd have the option to defer connect
321
 * once "envoy.reloadable_features.postpone_h3_client_connect_to_next_loop" is deprecated.
322
 */
323
class NoConnectCodecClientProd : public CodecClient {
324
public:
325
  NoConnectCodecClientProd(CodecType type, Network::ClientConnectionPtr&& connection,
326
                           Upstream::HostDescriptionConstSharedPtr host,
327
                           Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
328
                           const Network::TransportSocketOptionsConstSharedPtr& options);
329
};
330
331
/**
332
 * Production implementation that installs a real codec.
333
 */
334
class CodecClientProd : public NoConnectCodecClientProd {
335
public:
336
  CodecClientProd(CodecType type, Network::ClientConnectionPtr&& connection,
337
                  Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher,
338
                  Random::RandomGenerator& random_generator,
339
                  const Network::TransportSocketOptionsConstSharedPtr& options);
340
};
341
342
} // namespace Http
343
} // namespace Envoy