1
#include "source/common/http/http1/conn_pool.h"
2

            
3
#include <cstdint>
4
#include <list>
5
#include <memory>
6

            
7
#include "envoy/event/dispatcher.h"
8
#include "envoy/event/schedulable_cb.h"
9
#include "envoy/event/timer.h"
10
#include "envoy/http/codec.h"
11
#include "envoy/http/header_map.h"
12
#include "envoy/server/overload/overload_manager.h"
13
#include "envoy/upstream/upstream.h"
14

            
15
#include "source/common/http/codec_client.h"
16
#include "source/common/http/codes.h"
17
#include "source/common/http/header_utility.h"
18
#include "source/common/http/headers.h"
19
#include "source/common/runtime/runtime_features.h"
20

            
21
#include "absl/strings/match.h"
22

            
23
namespace Envoy {
24
namespace Http {
25
namespace Http1 {
26

            
27
ActiveClient::StreamWrapper::StreamWrapper(ResponseDecoder& response_decoder, ActiveClient& parent)
28
1
    : ResponseDecoderWrapper(response_decoder),
29
1
      RequestEncoderWrapper(&parent.codec_client_->newStream(*this)), parent_(parent) {
30
1
  RequestEncoderWrapper::inner_encoder_->getStream().addCallbacks(*this);
31
1
}
32

            
33
ActiveClient::StreamWrapper::StreamWrapper(ResponseDecoderHandlePtr response_decoder_handle,
34
                                           ActiveClient& parent)
35
25603
    : ResponseDecoderWrapper(std::move(response_decoder_handle)),
36
25603
      RequestEncoderWrapper(&parent.codec_client_->newStream(*this)), parent_(parent) {
37
25603
  RequestEncoderWrapper::inner_encoder_->getStream().addCallbacks(*this);
38
25603
}
39

            
40
25604
ActiveClient::StreamWrapper::~StreamWrapper() {
41
  // Upstream connection might be closed right after response is complete. Setting delay=true
42
  // here to attach pending requests in next dispatcher loop to handle that case.
43
  // https://github.com/envoyproxy/envoy/issues/2715
44
25604
  parent_.parent_.onStreamClosed(parent_, true);
45
25604
}
46

            
47
24760
void ActiveClient::StreamWrapper::onEncodeComplete() { encode_complete_ = true; }
48

            
49
24854
void ActiveClient::StreamWrapper::decodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) {
50
24854
  close_connection_ =
51
24854
      HeaderUtility::shouldCloseConnection(parent_.codec_client_->protocol(), *headers);
52
24854
  if (close_connection_) {
53
3
    parent_.parent().host()->cluster().trafficStats()->upstream_cx_close_notify_.inc();
54
3
  }
55
24854
  ResponseDecoderWrapper::decodeHeaders(std::move(headers), end_stream);
56
24854
}
57

            
58
24454
void ActiveClient::StreamWrapper::onDecodeComplete() {
59
24454
  ASSERT(!decode_complete_);
60
24454
  decode_complete_ = encode_complete_;
61
24454
  ENVOY_CONN_LOG(debug, "response complete", *parent_.codec_client_);
62

            
63
24454
  if (!encode_complete_) {
64
51
    ENVOY_CONN_LOG(debug, "response before request complete", *parent_.codec_client_);
65
51
    parent_.codec_client_->close();
66
24419
  } else if (close_connection_ || parent_.codec_client_->remoteClosed()) {
67
5
    ENVOY_CONN_LOG(debug, "saw upstream close connection", *parent_.codec_client_);
68
5
    parent_.codec_client_->close();
69
24398
  } else {
70
24398
    auto* pool = &parent_.parent();
71
24398
    pool->scheduleOnUpstreamReady();
72
24398
    parent_.stream_wrapper_.reset();
73

            
74
24398
    pool->checkForIdleAndCloseIdleConnsIfDraining();
75
24398
  }
76
24454
}
77

            
78
1162
void ActiveClient::StreamWrapper::onResetStream(StreamResetReason, absl::string_view) {
79
1162
  parent_.codec_client_->close();
80
1162
}
81

            
82
ActiveClient::ActiveClient(HttpConnPoolImplBase& parent,
83
                           OptRef<Upstream::Host::CreateConnectionData> data)
84
21891
    : Envoy::Http::ActiveClient(parent, parent.host()->cluster().maxRequestsPerConnection(),
85
21891
                                /* effective_concurrent_stream_limit */ 1,
86
21891
                                /* configured_concurrent_stream_limit */ 1, data) {
87
21891
  parent.host()->cluster().trafficStats()->upstream_cx_http1_total_.inc();
88
21891
}
89

            
90
21891
ActiveClient::~ActiveClient() { ASSERT(!stream_wrapper_.get()); }
91

            
92
21891
bool ActiveClient::closingWithIncompleteStream() const {
93
21891
  return (stream_wrapper_ != nullptr) && (!stream_wrapper_->decode_complete_);
94
21891
}
95

            
96
1
RequestEncoder& ActiveClient::newStreamEncoder(ResponseDecoder& response_decoder) {
97
1
  ASSERT(!stream_wrapper_);
98
1
  stream_wrapper_ = std::make_unique<StreamWrapper>(response_decoder, *this);
99
1
  return *stream_wrapper_;
100
1
}
101

            
102
25603
RequestEncoder& ActiveClient::newStreamEncoder(ResponseDecoderHandlePtr response_decoder_handle) {
103
25603
  ASSERT(!stream_wrapper_);
104
25603
  stream_wrapper_ = std::make_unique<StreamWrapper>(std::move(response_decoder_handle), *this);
105
25603
  return *stream_wrapper_;
106
25603
}
107

            
108
ConnectionPool::InstancePtr
109
allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
110
                 Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
111
                 const Network::ConnectionSocket::OptionsSharedPtr& options,
112
                 const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
113
                 Upstream::ClusterConnectivityState& state,
114
4817
                 Server::OverloadManager& overload_manager) {
115
4817
  return std::make_unique<FixedHttpConnPoolImpl>(
116
4817
      std::move(host), std::move(priority), dispatcher, options, transport_socket_options,
117
4817
      random_generator, state,
118
21832
      [](HttpConnPoolImplBase* pool) {
119
21828
        return std::make_unique<ActiveClient>(*pool, absl::nullopt);
120
21828
      },
121
21832
      [](Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase* pool) {
122
21828
        CodecClientPtr codec{new CodecClientProd(
123
21828
            CodecType::HTTP1, std::move(data.connection_), data.host_description_,
124
21828
            pool->dispatcher(), pool->randomGenerator(), pool->transportSocketOptions())};
125
21828
        return codec;
126
21828
      },
127
4817
      std::vector<Protocol>{Protocol::Http11}, overload_manager, absl::nullopt, nullptr);
128
4817
}
129

            
130
} // namespace Http1
131
} // namespace Http
132
} // namespace Envoy