Line data Source code
1 : #pragma once 2 : 3 : #include <cstdint> 4 : #include <list> 5 : #include <memory> 6 : 7 : #include "envoy/common/random_generator.h" 8 : #include "envoy/event/deferred_deletable.h" 9 : #include "envoy/event/timer.h" 10 : #include "envoy/http/codec.h" 11 : #include "envoy/http/header_validator.h" 12 : #include "envoy/network/connection.h" 13 : #include "envoy/network/filter.h" 14 : #include "envoy/upstream/upstream.h" 15 : 16 : #include "source/common/common/assert.h" 17 : #include "source/common/common/linked_object.h" 18 : #include "source/common/common/logger.h" 19 : #include "source/common/http/codec_wrappers.h" 20 : #include "source/common/network/filter_impl.h" 21 : #include "source/common/runtime/runtime_features.h" 22 : 23 : namespace Envoy { 24 : namespace Http { 25 : 26 : /** 27 : * Callbacks specific to a codec client. 28 : */ 29 : class CodecClientCallbacks { 30 : public: 31 145 : virtual ~CodecClientCallbacks() = default; 32 : 33 : // Called in onPreDecodeComplete 34 105 : virtual void onStreamPreDecodeComplete() {} 35 : 36 : /** 37 : * Called every time an owned stream is destroyed, whether complete or not. 38 : */ 39 : virtual void onStreamDestroy() PURE; 40 : 41 : /** 42 : * Called when a stream is reset by the client. 43 : * @param reason supplies the reset reason. 44 : */ 45 : virtual void onStreamReset(StreamResetReason reason) PURE; 46 : }; 47 : 48 : /** 49 : * This is an HTTP client that multiple stream management and underlying connection management 50 : * across multiple HTTP codec types. 51 : */ 52 : class CodecClient : protected Logger::Loggable<Logger::Id::client>, 53 : public Http::ConnectionCallbacks, 54 : public Network::ConnectionCallbacks, 55 : public Event::DeferredDeletable { 56 : public: 57 : /** 58 : * Type of HTTP codec to use. 59 : */ 60 : // This is a legacy alias. 61 : using Type = Envoy::Http::CodecType; 62 : 63 : /** 64 : * Add a connection callback to the underlying network connection. 65 : */ 66 227 : void addConnectionCallbacks(Network::ConnectionCallbacks& cb) { 67 227 : connection_->addConnectionCallbacks(cb); 68 227 : } 69 : 70 : /** 71 : * Return if half-close semantics are enabled on the underlying connection. 72 : */ 73 0 : bool isHalfCloseEnabled() { return connection_->isHalfCloseEnabled(); } 74 : 75 : /** 76 : * Initialize all of the installed read filters on the underlying connection. 77 : * This effectively calls onNewConnection() on each of them. 78 : */ 79 165 : void initializeReadFilters() { connection_->initializeReadFilters(); } 80 : 81 : /** 82 : * Close the underlying network connection. This is immediate and will not attempt to flush any 83 : * pending write data. 84 : */ 85 : void close(Network::ConnectionCloseType type = Network::ConnectionCloseType::NoFlush); 86 : 87 : /** 88 : * Send a codec level go away indication to the peer. 89 : */ 90 0 : void goAway() { codec_->goAway(); } 91 : 92 : /** 93 : * @return the underlying connection ID. 94 : */ 95 0 : uint64_t id() const { return connection_->id(); } 96 : 97 : /** 98 : * @return the underlying codec protocol. 99 : */ 100 649 : Protocol protocol() { return codec_->protocol(); } 101 : 102 : /** 103 : * @return the underlying connection error. 104 : */ 105 338 : absl::string_view connectionFailureReason() { return connection_->transportFailureReason(); } 106 : 107 : /** 108 : * @return size_t the number of outstanding requests that have not completed or been reset. 109 : */ 110 1365 : size_t numActiveRequests() { return active_requests_.size(); } 111 : 112 : /** 113 : * Create a new stream. Note: The CodecClient will NOT buffer multiple requests for HTTP1 114 : * connections. Thus, calling newStream() before the previous request has been fully encoded 115 : * is an error. Pipelining is supported however. 116 : * @param response_decoder supplies the decoder to use for response callbacks. 117 : * @return StreamEncoder& the encoder to use for encoding the request. 118 : */ 119 : RequestEncoder& newStream(ResponseDecoder& response_decoder); 120 : 121 173 : void setConnectionStats(const Network::Connection::ConnectionStats& stats) { 122 173 : connection_->setConnectionStats(stats); 123 173 : } 124 : 125 145 : void setCodecClientCallbacks(CodecClientCallbacks& callbacks) { 126 145 : codec_client_callbacks_ = &callbacks; 127 145 : } 128 : 129 199 : void setCodecConnectionCallbacks(Http::ConnectionCallbacks& callbacks) { 130 199 : codec_callbacks_ = &callbacks; 131 199 : } 132 : 133 33 : bool remoteClosed() const { return remote_closed_; } 134 : 135 40 : CodecType type() const { return type_; } 136 : 137 : // Note this is the L4 stream info, not L7. 138 202 : StreamInfo::StreamInfo& streamInfo() { return connection_->streamInfo(); } 139 : 140 : /** 141 : * Connect to the host. 142 : * Needs to be called after codec_ is instantiated. 143 : */ 144 : void connect(); 145 : 146 : protected: 147 : /** 148 : * Create a codec client and connect to a remote host/port. 149 : * @param type supplies the codec type. 150 : * @param connection supplies the connection to communicate on. 151 : * @param host supplies the owning host. 152 : */ 153 : CodecClient(CodecType type, Network::ClientConnectionPtr&& connection, 154 : Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher); 155 : 156 : // Http::ConnectionCallbacks 157 11 : void onGoAway(GoAwayErrorCode error_code) override { 158 11 : if (codec_callbacks_) { 159 11 : codec_callbacks_->onGoAway(error_code); 160 11 : } 161 11 : } 162 108 : void onSettings(ReceivedSettings& settings) override { 163 108 : if (codec_callbacks_) { 164 108 : codec_callbacks_->onSettings(settings); 165 108 : } 166 108 : } 167 0 : void onMaxStreamsChanged(uint32_t num_streams) override { 168 0 : if (codec_callbacks_) { 169 0 : codec_callbacks_->onMaxStreamsChanged(num_streams); 170 0 : } 171 0 : } 172 : 173 0 : void onIdleTimeout() { 174 0 : host_->cluster().trafficStats()->upstream_cx_idle_timeout_.inc(); 175 0 : close(); 176 0 : } 177 : 178 763 : void disableIdleTimer() { 179 763 : if (idle_timer_ != nullptr) { 180 375 : idle_timer_->disableTimer(); 181 375 : } 182 763 : } 183 : 184 571 : void enableIdleTimer() { 185 571 : if (idle_timer_ != nullptr) { 186 295 : idle_timer_->enableTimer(idle_timeout_.value()); 187 295 : } 188 571 : } 189 : 190 : const CodecType type_; 191 : // The order of host_, connection_, and codec_ matter as during destruction each can refer to 192 : // the previous, at least in tests. 193 : Upstream::HostDescriptionConstSharedPtr host_; 194 : Network::ClientConnectionPtr connection_; 195 : ClientConnectionPtr codec_; 196 : Event::TimerPtr idle_timer_; 197 : const absl::optional<std::chrono::milliseconds> idle_timeout_; 198 : 199 : private: 200 : /** 201 : * Wrapper read filter to drive incoming connection data into the codec. We could potentially 202 : * support other filters in the future. 203 : */ 204 : struct CodecReadFilter : public Network::ReadFilterBaseImpl { 205 365 : CodecReadFilter(CodecClient& parent) : parent_(parent) {} 206 : 207 : // Network::ReadFilter 208 675 : Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override { 209 675 : parent_.onData(data); 210 675 : if (end_stream && parent_.isHalfCloseEnabled()) { 211 : // Note that this results in the connection closed as if it was closed 212 : // locally, it would be more correct to convey the end stream to the 213 : // response decoder, but it would require some refactoring. 214 0 : parent_.close(); 215 0 : } 216 675 : return Network::FilterStatus::StopIteration; 217 675 : } 218 : 219 : CodecClient& parent_; 220 : }; 221 : 222 : struct ActiveRequest; 223 : 224 : /** 225 : * Wrapper for an outstanding request. Designed for handling stream multiplexing. 226 : */ 227 : struct ActiveRequest : LinkedObject<ActiveRequest>, 228 : public Event::DeferredDeletable, 229 : public StreamCallbacks, 230 : public ResponseDecoderWrapper, 231 : public RequestEncoderWrapper { 232 : ActiveRequest(CodecClient& parent, ResponseDecoder& inner) 233 : : ResponseDecoderWrapper(inner), RequestEncoderWrapper(nullptr), parent_(parent), 234 : header_validator_( 235 398 : parent.host_->cluster().makeHeaderValidator(parent.codec_->protocol())) { 236 398 : switch (parent.protocol()) { 237 26 : case Protocol::Http10: 238 210 : case Protocol::Http11: 239 : // HTTP/1.1 codec does not support half-close on the response completion. 240 210 : wait_encode_complete_ = false; 241 210 : break; 242 188 : case Protocol::Http2: 243 188 : case Protocol::Http3: 244 188 : wait_encode_complete_ = true; 245 188 : break; 246 398 : } 247 398 : } 248 : 249 : void decodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) override; 250 : 251 : // StreamCallbacks 252 163 : void onResetStream(StreamResetReason reason, absl::string_view) override { 253 163 : parent_.onReset(*this, reason); 254 163 : } 255 0 : void onAboveWriteBufferHighWatermark() override {} 256 0 : void onBelowWriteBufferLowWatermark() override {} 257 : 258 : // StreamDecoderWrapper 259 260 : void onPreDecodeComplete() override { parent_.responsePreDecodeComplete(*this); } 260 260 : void onDecodeComplete() override {} 261 : 262 : // RequestEncoderWrapper 263 343 : void onEncodeComplete() override { parent_.requestEncodeComplete(*this); } 264 : 265 : // RequestEncoder 266 : Status encodeHeaders(const RequestHeaderMap& headers, bool end_stream) override; 267 : 268 398 : void setEncoder(RequestEncoder& encoder) { 269 398 : inner_encoder_ = &encoder; 270 398 : inner_encoder_->getStream().addCallbacks(*this); 271 398 : } 272 : 273 235 : void removeEncoderCallbacks() { inner_encoder_->getStream().removeCallbacks(*this); } 274 : 275 : CodecClient& parent_; 276 : Http::ClientHeaderValidatorPtr header_validator_; 277 : bool wait_encode_complete_{true}; 278 : bool encode_complete_{false}; 279 : bool decode_complete_{false}; 280 : }; 281 : 282 : using ActiveRequestPtr = std::unique_ptr<ActiveRequest>; 283 : 284 : /** 285 : * Called when a response finishes decoding. This is called *before* forwarding on to the 286 : * wrapped decoder. 287 : */ 288 : void responsePreDecodeComplete(ActiveRequest& request); 289 : void requestEncodeComplete(ActiveRequest& request); 290 : void completeRequest(ActiveRequest& request); 291 : 292 : void deleteRequest(ActiveRequest& request); 293 : void onReset(ActiveRequest& request, StreamResetReason reason); 294 : void onData(Buffer::Instance& data); 295 : 296 : // Network::ConnectionCallbacks 297 : void onEvent(Network::ConnectionEvent event) override; 298 : // Pass watermark events from the connection on to the codec which will pass it to the underlying 299 : // streams. 300 0 : void onAboveWriteBufferHighWatermark() override { 301 0 : codec_->onUnderlyingConnectionAboveWriteBufferHighWatermark(); 302 0 : } 303 0 : void onBelowWriteBufferLowWatermark() override { 304 0 : codec_->onUnderlyingConnectionBelowWriteBufferLowWatermark(); 305 0 : } 306 : 307 : std::list<ActiveRequestPtr> active_requests_; 308 : Http::ConnectionCallbacks* codec_callbacks_{}; 309 : CodecClientCallbacks* codec_client_callbacks_{}; 310 : bool connected_{}; 311 : bool remote_closed_{}; 312 : bool protocol_error_{false}; 313 : bool connect_called_{false}; 314 : }; 315 : 316 : using CodecClientPtr = std::unique_ptr<CodecClient>; 317 : 318 : /** 319 : * Production implementation that installs a real codec without automatically connecting. 320 : * TODO(danzh) deprecate this class and make CodecClientProd to have the option to defer connect 321 : * once "envoy.reloadable_features.postpone_h3_client_connect_to_next_loop" is deprecated. 322 : */ 323 : class NoConnectCodecClientProd : public CodecClient { 324 : public: 325 : NoConnectCodecClientProd(CodecType type, Network::ClientConnectionPtr&& connection, 326 : Upstream::HostDescriptionConstSharedPtr host, 327 : Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator, 328 : const Network::TransportSocketOptionsConstSharedPtr& options); 329 : }; 330 : 331 : /** 332 : * Production implementation that installs a real codec. 333 : */ 334 : class CodecClientProd : public NoConnectCodecClientProd { 335 : public: 336 : CodecClientProd(CodecType type, Network::ClientConnectionPtr&& connection, 337 : Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher, 338 : Random::RandomGenerator& random_generator, 339 : const Network::TransportSocketOptionsConstSharedPtr& options); 340 : }; 341 : 342 : } // namespace Http 343 : } // namespace Envoy