1
#include "source/common/http/http1/conn_pool.h"
2

            
3
#include <cstdint>
4
#include <list>
5
#include <memory>
6

            
7
#include "envoy/event/dispatcher.h"
8
#include "envoy/event/schedulable_cb.h"
9
#include "envoy/event/timer.h"
10
#include "envoy/http/codec.h"
11
#include "envoy/http/header_map.h"
12
#include "envoy/server/overload/overload_manager.h"
13
#include "envoy/upstream/upstream.h"
14

            
15
#include "source/common/http/codec_client.h"
16
#include "source/common/http/codes.h"
17
#include "source/common/http/header_utility.h"
18
#include "source/common/http/headers.h"
19
#include "source/common/runtime/runtime_features.h"
20

            
21
#include "absl/strings/match.h"
22

            
23
namespace Envoy {
24
namespace Http {
25
namespace Http1 {
26

            
27
// Builds a wrapper around a freshly created stream on the parent's codec client.
// `response_decoder` is handed to ResponseDecoderWrapper, and the encoder returned by
// newStream() (called with this wrapper as its argument) is handed to RequestEncoderWrapper.
ActiveClient::StreamWrapper::StreamWrapper(ResponseDecoder& response_decoder, ActiveClient& parent)
    : ResponseDecoderWrapper(response_decoder),
      RequestEncoderWrapper(&parent.codec_client_->newStream(*this)), parent_(parent) {
  // Register this wrapper for callbacks on the stream behind the new encoder.
  auto& upstream_stream = RequestEncoderWrapper::inner_encoder_->getStream();
  upstream_stream.addCallbacks(*this);
}
32

            
33
// Handle-based variant of the constructor: ownership of `response_decoder_handle` is moved into
// ResponseDecoderWrapper, and the encoder returned by newStream() (called with this wrapper as
// its argument) is handed to RequestEncoderWrapper.
ActiveClient::StreamWrapper::StreamWrapper(ResponseDecoderHandlePtr response_decoder_handle,
                                           ActiveClient& parent)
    : ResponseDecoderWrapper(std::move(response_decoder_handle)),
      RequestEncoderWrapper(&parent.codec_client_->newStream(*this)), parent_(parent) {
  // Register this wrapper for callbacks on the stream behind the new encoder.
  auto& upstream_stream = RequestEncoderWrapper::inner_encoder_->getStream();
  upstream_stream.addCallbacks(*this);
}
39

            
40
25522
// Notifies the owning pool that this client's stream has gone away.
ActiveClient::StreamWrapper::~StreamWrapper() {
  // The upstream may close the connection immediately after the response completes. Request a
  // delayed attach so that pending requests are attached in the next dispatcher loop, which
  // covers that case. See https://github.com/envoyproxy/envoy/issues/2715
  constexpr bool delay_attaching_request = true;
  parent_.parent_.onStreamClosed(parent_, delay_attaching_request);
}
46

            
47
24674
// Records that the request side of the stream has been fully encoded. onDecodeComplete() reads
// this flag to decide whether the connection can be reused.
void ActiveClient::StreamWrapper::onEncodeComplete() {
  encode_complete_ = true;
}
48

            
49
24767
// Inspects the response headers to decide whether the connection must be closed once the
// response finishes, then forwards the headers to the wrapped decoder.
void ActiveClient::StreamWrapper::decodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) {
  const bool must_close =
      HeaderUtility::shouldCloseConnection(parent_.codec_client_->protocol(), *headers);
  // Remembered for onDecodeComplete(), which closes the connection instead of reusing it.
  close_connection_ = must_close;
  if (must_close) {
    parent_.parent().host()->cluster().trafficStats()->upstream_cx_close_notify_.inc();
  }

  ResponseDecoderWrapper::decodeHeaders(std::move(headers), end_stream);
}
57

            
58
24367
void ActiveClient::StreamWrapper::onDecodeComplete() {
59
24367
  ASSERT(!decode_complete_);
60
24367
  decode_complete_ = encode_complete_;
61
24367
  ENVOY_CONN_LOG(debug, "response complete", *parent_.codec_client_);
62

            
63
24367
  if (!encode_complete_) {
64
51
    ENVOY_CONN_LOG(debug, "response before request complete", *parent_.codec_client_);
65
51
    parent_.codec_client_->close();
66
24332
  } else if (close_connection_ || parent_.codec_client_->remoteClosed()) {
67
5
    ENVOY_CONN_LOG(debug, "saw upstream close connection", *parent_.codec_client_);
68
5
    parent_.codec_client_->close();
69
24311
  } else {
70
24311
    auto* pool = &parent_.parent();
71
24311
    pool->scheduleOnUpstreamReady();
72
24311
    parent_.stream_wrapper_.reset();
73

            
74
24311
    pool->checkForIdleAndCloseIdleConnsIfDraining();
75
24311
  }
76
24367
}
77

            
78
1167
// Stream reset callback. Both arguments are ignored: any reset closes the codec client.
void ActiveClient::StreamWrapper::onResetStream(StreamResetReason, absl::string_view) {
  auto& codec_client = *parent_.codec_client_;
  codec_client.close();
}
81

            
82
// Creates an HTTP/1 client for `parent`. Both concurrent stream limits passed to the base class
// are 1, and the per-connection request cap is read from the host's cluster.
ActiveClient::ActiveClient(HttpConnPoolImplBase& parent,
                           OptRef<Upstream::Host::CreateConnectionData> data)
    : Envoy::Http::ActiveClient(parent, parent.host()->cluster().maxRequestsPerConnection(),
                                1, // effective_concurrent_stream_limit
                                1, // configured_concurrent_stream_limit
                                data) {
  // Count every HTTP/1 upstream connection created through this pool.
  const auto& cluster_info = parent.host()->cluster();
  cluster_info.trafficStats()->upstream_cx_http1_total_.inc();
}
89

            
90
21906
// The stream wrapper must already have been released by the time the client is destroyed.
ActiveClient::~ActiveClient() {
  ASSERT(!stream_wrapper_.get());
}
91

            
92
21906
bool ActiveClient::closingWithIncompleteStream() const {
93
21906
  return (stream_wrapper_ != nullptr) && (!stream_wrapper_->decode_complete_);
94
21906
}
95

            
96
1
// Creates the stream wrapper for this client and returns it as the request encoder.
// A wrapper must not already exist when this is called.
RequestEncoder& ActiveClient::newStreamEncoder(ResponseDecoder& response_decoder) {
  ASSERT(!stream_wrapper_);
  auto new_wrapper = std::make_unique<StreamWrapper>(response_decoder, *this);
  stream_wrapper_ = std::move(new_wrapper);
  return *stream_wrapper_;
}
101

            
102
25521
// Handle-based variant: creates the stream wrapper for this client, moving the decoder handle
// into it, and returns the wrapper as the request encoder.
// A wrapper must not already exist when this is called.
RequestEncoder& ActiveClient::newStreamEncoder(ResponseDecoderHandlePtr response_decoder_handle) {
  ASSERT(!stream_wrapper_);
  auto new_wrapper = std::make_unique<StreamWrapper>(std::move(response_decoder_handle), *this);
  stream_wrapper_ = std::move(new_wrapper);
  return *stream_wrapper_;
}
107

            
108
// Builds an HTTP/1.1 connection pool for `host`.
//
// The pool is a FixedHttpConnPoolImpl configured with two factories:
//  - a client factory that creates an Http1 ActiveClient for the pool (no connection data), and
//  - a codec factory that creates an HTTP/1 CodecClientProd over the connection in `data`.
// The pool is created with Protocol::Http11 as its only protocol.
ConnectionPool::InstancePtr
allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
                 Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
                 const Network::ConnectionSocket::OptionsSharedPtr& options,
                 const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
                 Upstream::ClusterConnectivityState& state,
                 Server::OverloadManager& overload_manager) {
  return std::make_unique<FixedHttpConnPoolImpl>(
      std::move(host), std::move(priority), dispatcher, options, transport_socket_options,
      random_generator, state,
      // Client factory.
      [](HttpConnPoolImplBase* pool) {
        return std::make_unique<ActiveClient>(*pool, absl::nullopt);
      },
      // Codec factory. The connection is moved out of `data`, which therefore no longer owns it.
      [](Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase* pool) {
        // std::make_unique instead of a raw `new`; the explicit CodecClientPtr type keeps the
        // lambda's return type unchanged.
        CodecClientPtr codec = std::make_unique<CodecClientProd>(
            CodecType::HTTP1, std::move(data.connection_), data.host_description_,
            pool->dispatcher(), pool->randomGenerator(), pool->transportSocketOptions());
        return codec;
      },
      std::vector<Protocol>{Protocol::Http11}, overload_manager, absl::nullopt, nullptr);
}
129

            
130
} // namespace Http1
131
} // namespace Http
132
} // namespace Envoy