Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/http/codec_client.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <cstdint>
4
#include <list>
5
#include <memory>
6
7
#include "envoy/common/random_generator.h"
8
#include "envoy/event/deferred_deletable.h"
9
#include "envoy/event/timer.h"
10
#include "envoy/http/codec.h"
11
#include "envoy/http/header_validator.h"
12
#include "envoy/network/connection.h"
13
#include "envoy/network/filter.h"
14
#include "envoy/upstream/upstream.h"
15
16
#include "source/common/common/assert.h"
17
#include "source/common/common/linked_object.h"
18
#include "source/common/common/logger.h"
19
#include "source/common/http/codec_wrappers.h"
20
#include "source/common/network/filter_impl.h"
21
#include "source/common/runtime/runtime_features.h"
22
23
namespace Envoy {
24
namespace Http {
25
26
/**
27
 * Callbacks specific to a codec client.
28
 */
29
class CodecClientCallbacks {
30
public:
31
1.69k
  virtual ~CodecClientCallbacks() = default;
32
33
  // Called in onPreDecodeComplete
34
1.70k
  virtual void onStreamPreDecodeComplete() {}
35
36
  /**
37
   * Called every time an owned stream is destroyed, whether complete or not.
38
   */
39
  virtual void onStreamDestroy() PURE;
40
41
  /**
42
   * Called when a stream is reset by the client.
43
   * @param reason supplies the reset reason.
44
   */
45
  virtual void onStreamReset(StreamResetReason reason) PURE;
46
};
47
48
/**
49
 * This is an HTTP client that handles multiple stream management and underlying connection management
50
 * across multiple HTTP codec types.
51
 */
52
class CodecClient : protected Logger::Loggable<Logger::Id::client>,
53
                    public Http::ConnectionCallbacks,
54
                    public Network::ConnectionCallbacks,
55
                    public Event::DeferredDeletable {
56
public:
57
  /**
58
   * Type of HTTP codec to use.
59
   */
60
  // This is a legacy alias.
61
  using Type = Envoy::Http::CodecType;
62
63
  /**
64
   * Add a connection callback to the underlying network connection.
65
   */
66
21.3k
  void addConnectionCallbacks(Network::ConnectionCallbacks& cb) {
67
21.3k
    connection_->addConnectionCallbacks(cb);
68
21.3k
  }
69
70
  /**
71
   * Return whether half-close semantics are enabled on the underlying connection.
72
   */
73
0
  bool isHalfCloseEnabled() { return connection_->isHalfCloseEnabled(); }
74
75
  /**
76
   * Initialize all of the installed read filters on the underlying connection.
77
   * This effectively calls onNewConnection() on each of them.
78
   */
79
898
  void initializeReadFilters() { connection_->initializeReadFilters(); }
80
81
  /**
82
   * Close the underlying network connection. With the default close type (NoFlush) this is
83
   * immediate and will not attempt to flush any pending write data.
84
   */
85
  void close(Network::ConnectionCloseType type = Network::ConnectionCloseType::NoFlush);
86
87
  /**
88
   * Send a codec level go away indication to the peer.
89
   */
90
0
  void goAway() { codec_->goAway(); }
91
92
  /**
93
   * @return the underlying connection ID.
94
   */
95
0
  uint64_t id() const { return connection_->id(); }
96
97
  /**
98
   * @return the underlying codec protocol.
99
   */
100
27.1k
  Protocol protocol() { return codec_->protocol(); }
101
102
  /**
103
   * @return the underlying connection error.
104
   */
105
1.80k
  absl::string_view connectionFailureReason() { return connection_->transportFailureReason(); }
106
107
  /**
108
   * @return size_t the number of outstanding requests that have not completed or been reset.
109
   */
110
38.3k
  size_t numActiveRequests() { return active_requests_.size(); }
111
112
  /**
113
   * Create a new stream. Note: The CodecClient will NOT buffer multiple requests for HTTP1
114
   * connections. Thus, calling newStream() before the previous request has been fully encoded
115
   * is an error. Pipelining is supported however.
116
   * @param response_decoder supplies the decoder to use for response callbacks.
117
   * @return RequestEncoder& the encoder to use for encoding the request.
118
   */
119
  RequestEncoder& newStream(ResponseDecoder& response_decoder);
120
121
911
  void setConnectionStats(const Network::Connection::ConnectionStats& stats) {
122
911
    connection_->setConnectionStats(stats);
123
911
  }
124
125
1.69k
  void setCodecClientCallbacks(CodecClientCallbacks& callbacks) {
126
1.69k
    codec_client_callbacks_ = &callbacks;
127
1.69k
  }
128
129
22.1k
  void setCodecConnectionCallbacks(Http::ConnectionCallbacks& callbacks) {
130
22.1k
    codec_callbacks_ = &callbacks;
131
22.1k
  }
132
133
1
  bool remoteClosed() const { return remote_closed_; }
134
135
815
  CodecType type() const { return type_; }
136
137
  // Note this is the L4 stream info, not L7.
138
1.76k
  StreamInfo::StreamInfo& streamInfo() { return connection_->streamInfo(); }
139
140
  /**
141
   * Connect to the host.
142
   * Needs to be called after codec_ is instantiated.
143
   */
144
  void connect();
145
146
0
  bool connectCalled() const { return connect_called_; }
147
148
protected:
149
  /**
150
   * Create a codec client and connect to a remote host/port.
151
   * @param type supplies the codec type.
152
   * @param connection supplies the connection to communicate on.
153
   * @param host supplies the owning host.
154
   */
155
  CodecClient(CodecType type, Network::ClientConnectionPtr&& connection,
156
              Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher);
157
158
  // Http::ConnectionCallbacks
159
12.8k
  void onGoAway(GoAwayErrorCode error_code) override {
160
12.8k
    if (codec_callbacks_) {
161
12.8k
      codec_callbacks_->onGoAway(error_code);
162
12.8k
    }
163
12.8k
  }
164
2.02k
  void onSettings(ReceivedSettings& settings) override {
165
2.02k
    if (codec_callbacks_) {
166
2.02k
      codec_callbacks_->onSettings(settings);
167
2.02k
    }
168
2.02k
  }
169
0
  void onMaxStreamsChanged(uint32_t num_streams) override {
170
0
    if (codec_callbacks_) {
171
0
      codec_callbacks_->onMaxStreamsChanged(num_streams);
172
0
    }
173
0
  }
174
175
0
  void onIdleTimeout() {
176
0
    host_->cluster().trafficStats()->upstream_cx_idle_timeout_.inc();
177
0
    close();
178
0
  }
179
180
47.1k
  void disableIdleTimer() {
181
47.1k
    if (idle_timer_ != nullptr) {
182
2.67k
      idle_timer_->disableTimer();
183
2.67k
    }
184
47.1k
  }
185
186
25.8k
  void enableIdleTimer() {
187
25.8k
    if (idle_timer_ != nullptr) {
188
2.52k
      idle_timer_->enableTimer(idle_timeout_.value());
189
2.52k
    }
190
25.8k
  }
191
192
  const CodecType type_;
193
  // The order of host_, connection_, and codec_ matters as during destruction each can refer to
194
  // the previous, at least in tests.
195
  Upstream::HostDescriptionConstSharedPtr host_;
196
  Network::ClientConnectionPtr connection_;
197
  ClientConnectionPtr codec_;
198
  Event::TimerPtr idle_timer_;
199
  const absl::optional<std::chrono::milliseconds> idle_timeout_;
200
201
private:
202
  /**
203
   * Wrapper read filter to drive incoming connection data into the codec. We could potentially
204
   * support other filters in the future.
205
   */
206
  struct CodecReadFilter : public Network::ReadFilterBaseImpl {
207
22.1k
    CodecReadFilter(CodecClient& parent) : parent_(parent) {}
208
209
    // Network::ReadFilter
210
6.79k
    Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override {
211
6.79k
      parent_.onData(data);
212
6.79k
      if (end_stream && parent_.isHalfCloseEnabled()) {
213
        // Note that this results in the connection being closed as if it was closed
214
        // locally, it would be more correct to convey the end stream to the
215
        // response decoder, but it would require some refactoring.
216
0
        parent_.close();
217
0
      }
218
6.79k
      return Network::FilterStatus::StopIteration;
219
6.79k
    }
220
221
    CodecClient& parent_;
222
  };
223
224
  struct ActiveRequest;
225
226
  /**
227
   * Wrapper for an outstanding request. Designed for handling stream multiplexing.
228
   */
229
  struct ActiveRequest : LinkedObject<ActiveRequest>,
230
                         public Event::DeferredDeletable,
231
                         public StreamCallbacks,
232
                         public ResponseDecoderWrapper,
233
                         public RequestEncoderWrapper {
234
    ActiveRequest(CodecClient& parent, ResponseDecoder& inner)
235
        : ResponseDecoderWrapper(inner), RequestEncoderWrapper(nullptr), parent_(parent),
236
          header_validator_(
237
24.9k
              parent.host_->cluster().makeHeaderValidator(parent.codec_->protocol())) {
238
24.9k
      switch (parent.protocol()) {
239
18.6k
      case Protocol::Http10:
240
22.4k
      case Protocol::Http11:
241
        // HTTP/1.1 codec does not support half-close on the response completion.
242
22.4k
        wait_encode_complete_ = false;
243
22.4k
        break;
244
2.55k
      case Protocol::Http2:
245
2.55k
      case Protocol::Http3:
246
2.55k
        wait_encode_complete_ = true;
247
2.55k
        break;
248
24.9k
      }
249
24.9k
    }
Unexecuted instantiation: Envoy::Http::CodecClient::ActiveRequest::ActiveRequest(Envoy::Http::CodecClient&, Envoy::Http::ResponseDecoder&)
Envoy::Http::CodecClient::ActiveRequest::ActiveRequest(Envoy::Http::CodecClient&, Envoy::Http::ResponseDecoder&)
Line
Count
Source
237
24.9k
              parent.host_->cluster().makeHeaderValidator(parent.codec_->protocol())) {
238
24.9k
      switch (parent.protocol()) {
239
18.6k
      case Protocol::Http10:
240
22.4k
      case Protocol::Http11:
241
        // HTTP/1.1 codec does not support half-close on the response completion.
242
22.4k
        wait_encode_complete_ = false;
243
22.4k
        break;
244
2.55k
      case Protocol::Http2:
245
2.55k
      case Protocol::Http3:
246
2.55k
        wait_encode_complete_ = true;
247
2.55k
        break;
248
24.9k
      }
249
24.9k
    }
250
251
    void decodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) override;
252
253
    // StreamCallbacks
254
19.8k
    void onResetStream(StreamResetReason reason, absl::string_view) override {
255
19.8k
      parent_.onReset(*this, reason);
256
19.8k
    }
257
0
    void onAboveWriteBufferHighWatermark() override {}
258
0
    void onBelowWriteBufferLowWatermark() override {}
259
260
    // ResponseDecoderWrapper
261
5.23k
    void onPreDecodeComplete() override { parent_.responsePreDecodeComplete(*this); }
262
5.23k
    void onDecodeComplete() override {}
263
264
    // RequestEncoderWrapper
265
24.8k
    void onEncodeComplete() override { parent_.requestEncodeComplete(*this); }
266
267
    // RequestEncoder
268
    Status encodeHeaders(const RequestHeaderMap& headers, bool end_stream) override;
269
270
24.9k
    void setEncoder(RequestEncoder& encoder) {
271
24.9k
      inner_encoder_ = &encoder;
272
24.9k
      inner_encoder_->getStream().addCallbacks(*this);
273
24.9k
    }
274
275
5.13k
    void removeEncoderCallbacks() { inner_encoder_->getStream().removeCallbacks(*this); }
276
277
    CodecClient& parent_;
278
    Http::ClientHeaderValidatorPtr header_validator_;
279
    bool wait_encode_complete_{true};
280
    bool encode_complete_{false};
281
    bool decode_complete_{false};
282
  };
283
284
  using ActiveRequestPtr = std::unique_ptr<ActiveRequest>;
285
286
  /**
287
   * Called when a response finishes decoding. This is called *before* forwarding on to the
288
   * wrapped decoder.
289
   */
290
  void responsePreDecodeComplete(ActiveRequest& request);
291
  void requestEncodeComplete(ActiveRequest& request);
292
  void completeRequest(ActiveRequest& request);
293
294
  void deleteRequest(ActiveRequest& request);
295
  void onReset(ActiveRequest& request, StreamResetReason reason);
296
  void onData(Buffer::Instance& data);
297
298
  // Network::ConnectionCallbacks
299
  void onEvent(Network::ConnectionEvent event) override;
300
  // Pass watermark events from the connection on to the codec which will pass it to the underlying
301
  // streams.
302
0
  void onAboveWriteBufferHighWatermark() override {
303
0
    codec_->onUnderlyingConnectionAboveWriteBufferHighWatermark();
304
0
  }
305
0
  void onBelowWriteBufferLowWatermark() override {
306
0
    codec_->onUnderlyingConnectionBelowWriteBufferLowWatermark();
307
0
  }
308
309
  std::list<ActiveRequestPtr> active_requests_;
310
  Http::ConnectionCallbacks* codec_callbacks_{};
311
  CodecClientCallbacks* codec_client_callbacks_{};
312
  bool connected_{};
313
  bool remote_closed_{};
314
  bool protocol_error_{false};
315
  bool connect_called_{false};
316
};
317
318
using CodecClientPtr = std::unique_ptr<CodecClient>;
319
320
/**
321
 * Production implementation that installs a real codec.
322
 */
323
class CodecClientProd : public CodecClient {
324
public:
325
  CodecClientProd(CodecType type, Network::ClientConnectionPtr&& connection,
326
                  Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher,
327
                  Random::RandomGenerator& random_generator,
328
                  const Network::TransportSocketOptionsConstSharedPtr& options,
329
                  bool should_connect_on_creation = true);
330
};
331
332
} // namespace Http
333
} // namespace Envoy