Line data Source code
1 : #include "source/common/http/http1/conn_pool.h" 2 : 3 : #include <cstdint> 4 : #include <list> 5 : #include <memory> 6 : 7 : #include "envoy/event/dispatcher.h" 8 : #include "envoy/event/schedulable_cb.h" 9 : #include "envoy/event/timer.h" 10 : #include "envoy/http/codec.h" 11 : #include "envoy/http/header_map.h" 12 : #include "envoy/upstream/upstream.h" 13 : 14 : #include "source/common/http/codec_client.h" 15 : #include "source/common/http/codes.h" 16 : #include "source/common/http/header_utility.h" 17 : #include "source/common/http/headers.h" 18 : #include "source/common/runtime/runtime_features.h" 19 : 20 : #include "absl/strings/match.h" 21 : 22 : namespace Envoy { 23 : namespace Http { 24 : namespace Http1 { 25 : 26 : ActiveClient::StreamWrapper::StreamWrapper(ResponseDecoder& response_decoder, ActiveClient& parent) 27 : : RequestEncoderWrapper(&parent.codec_client_->newStream(*this)), 28 54 : ResponseDecoderWrapper(response_decoder), parent_(parent) { 29 54 : RequestEncoderWrapper::inner_encoder_->getStream().addCallbacks(*this); 30 54 : } 31 : 32 54 : ActiveClient::StreamWrapper::~StreamWrapper() { 33 : // Upstream connection might be closed right after response is complete. Setting delay=true 34 : // here to attach pending requests in next dispatcher loop to handle that case. 35 : // https://github.com/envoyproxy/envoy/issues/2715 36 54 : parent_.parent_.onStreamClosed(parent_, true); 37 54 : } 38 : 39 52 : void ActiveClient::StreamWrapper::onEncodeComplete() { encode_complete_ = true; } 40 : 41 33 : void ActiveClient::StreamWrapper::decodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) { 42 33 : close_connection_ = 43 33 : HeaderUtility::shouldCloseConnection(parent_.codec_client_->protocol(), *headers); 44 33 : if (close_connection_) { 45 0 : parent_.parent().host()->cluster().trafficStats()->upstream_cx_close_notify_.inc(); 46 0 : } 47 33 : ResponseDecoderWrapper::decodeHeaders(std::move(headers), end_stream); 48 33 : } 49 : 50 33 : void ActiveClient::StreamWrapper::onDecodeComplete() { 51 33 : ASSERT(!decode_complete_); 52 33 : decode_complete_ = encode_complete_; 53 33 : ENVOY_CONN_LOG(debug, "response complete", *parent_.codec_client_); 54 : 55 33 : if (!parent_.stream_wrapper_->encode_complete_) { 56 0 : ENVOY_CONN_LOG(debug, "response before request complete", *parent_.codec_client_); 57 0 : parent_.codec_client_->close(); 58 33 : } else if (parent_.stream_wrapper_->close_connection_ || parent_.codec_client_->remoteClosed()) { 59 0 : ENVOY_CONN_LOG(debug, "saw upstream close connection", *parent_.codec_client_); 60 0 : parent_.codec_client_->close(); 61 33 : } else { 62 33 : auto* pool = &parent_.parent(); 63 33 : pool->scheduleOnUpstreamReady(); 64 33 : parent_.stream_wrapper_.reset(); 65 : 66 33 : pool->checkForIdleAndCloseIdleConnsIfDraining(); 67 33 : } 68 33 : } 69 : 70 21 : void ActiveClient::StreamWrapper::onResetStream(StreamResetReason, absl::string_view) { 71 21 : parent_.codec_client_->close(); 72 21 : } 73 : 74 : ActiveClient::ActiveClient(HttpConnPoolImplBase& parent, 75 : OptRef<Upstream::Host::CreateConnectionData> data) 76 : : Envoy::Http::ActiveClient(parent, parent.host()->cluster().maxRequestsPerConnection(), 77 : /* effective_concurrent_stream_limit */ 1, 78 68 : /* configured_concurrent_stream_limit */ 1, data) { 79 68 : parent.host()->cluster().trafficStats()->upstream_cx_http1_total_.inc(); 80 68 : } 81 : 82 68 : ActiveClient::~ActiveClient() { ASSERT(!stream_wrapper_.get()); } 83 : 84 68 : bool ActiveClient::closingWithIncompleteStream() const { 85 68 : return (stream_wrapper_ != nullptr) && (!stream_wrapper_->decode_complete_); 86 68 : } 87 : 88 54 : RequestEncoder& ActiveClient::newStreamEncoder(ResponseDecoder& response_decoder) { 89 54 : ASSERT(!stream_wrapper_); 90 54 : stream_wrapper_ = std::make_unique<StreamWrapper>(response_decoder, *this); 91 54 : return *stream_wrapper_; 92 54 : } 93 : 94 : ConnectionPool::InstancePtr 95 : allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator, 96 : Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority, 97 : const Network::ConnectionSocket::OptionsSharedPtr& options, 98 : const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options, 99 68 : Upstream::ClusterConnectivityState& state) { 100 68 : return std::make_unique<FixedHttpConnPoolImpl>( 101 68 : std::move(host), std::move(priority), dispatcher, options, transport_socket_options, 102 68 : random_generator, state, 103 68 : [](HttpConnPoolImplBase* pool) { 104 68 : return std::make_unique<ActiveClient>(*pool, absl::nullopt); 105 68 : }, 106 68 : [](Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase* pool) { 107 68 : CodecClientPtr codec{new CodecClientProd( 108 68 : CodecType::HTTP1, std::move(data.connection_), data.host_description_, 109 68 : pool->dispatcher(), pool->randomGenerator(), pool->transportSocketOptions())}; 110 68 : return codec; 111 68 : }, 112 68 : std::vector<Protocol>{Protocol::Http11}, absl::nullopt, nullptr); 113 68 : } 114 : 115 : } // namespace Http1 116 : } // namespace Http 117 : } // namespace Envoy