1
#include <fmt/base.h>
2
#include <fmt/format.h>
3
#include <gmock/gmock-cardinalities.h>
4
#include <gmock/gmock-spec-builders.h>
5
#include <gtest/gtest-param-test.h>
6
#include <gtest/gtest.h>
7
#include <unistd.h>
8

            
9
#include <chrono>
10
#include <cstdint>
11
#include <functional>
12
#include <memory>
13
#include <string>
14
#include <utility>
15
#include <vector>
16

            
17
#include "envoy/buffer/buffer.h"
18
#include "envoy/common/exception.h"
19
#include "envoy/event/dispatcher.h"
20
#include "envoy/extensions/transport_sockets/tls/v3/tls.pb.h"
21
#include "envoy/http/codec.h" // IWYU pragma: keep
22
#include "envoy/network/address.h"
23
#include "envoy/network/connection.h"
24
#include "envoy/network/transport_socket.h"
25

            
26
#include "source/common/buffer/buffer_impl.h"
27
#include "source/common/common/assert.h"
28
#include "source/common/stats/isolated_store_impl.h"
29
#include "source/common/tls/server_context_config_impl.h"
30
#include "source/common/tls/server_ssl_socket.h"
31

            
32
#include "test/integration/fake_upstream.h"
33
#include "test/integration/integration_tcp_client.h"
34
#include "test/integration/ssl_utility.h"
35
#include "test/integration/utility.h"
36
#include "test/mocks/buffer/mocks.h"
37
#include "test/mocks/server/admin.h"
38
#include "test/test_common/environment.h"
39
#include "test/test_common/utility.h"
40

            
41
#include "tests/cilium_tcp_integration.h"
42
#include "tests/cilium_tls_integration.h"
43

            
44
using testing::AtLeast;
45

            
46
namespace Envoy {
47
namespace Cilium {
48

            
49
//
50
// Cilium filters with TCP proxy & Upstream TLS
51
//
52

            
53
// params: is_ingress ("true", "false")
54
// Envoy bootstrap YAML template for the upstream-TLS TCP proxy tests. It declares an
// ORIGINAL_DST cluster "tls-cluster" whose transport socket is "cilium.tls_wrapper"
// (cilium.UpstreamTlsWrapperContext), a static "xds-grpc-cilium" cluster, and one listener
// running the test_bpf_metadata listener filter followed by the cilium.network and
// envoy.tcp_proxy network filters. `{0}` is the is_ingress value substituted via fmt::format.
const std::string cilium_tls_tcp_proxy_config_fmt = R"EOF(
55
admin:
56
  address:
57
    socket_address:
58
      address: 127.0.0.1
59
      port_value: 0
60
static_resources:
61
  clusters:
62
  - name: tls-cluster
63
    type: ORIGINAL_DST
64
    lb_policy: CLUSTER_PROVIDED
65
    connect_timeout:
66
      seconds: 1
67
    transport_socket:
68
      name: "cilium.tls_wrapper"
69
      typed_config:
70
        "@type": type.googleapis.com/cilium.UpstreamTlsWrapperContext
71
  - name: xds-grpc-cilium
72
    connect_timeout:
73
      seconds: 5
74
    type: STATIC
75
    lb_policy: ROUND_ROBIN
76
    http2_protocol_options:
77
    load_assignment:
78
      cluster_name: xds-grpc-cilium
79
      endpoints:
80
      - lb_endpoints:
81
        - endpoint:
82
            address:
83
              pipe:
84
                path: /var/run/cilium/xds.sock
85
  listeners:
86
    name: listener_0
87
    address:
88
      socket_address:
89
        address: 127.0.0.1
90
        port_value: 0
91
    listener_filters:
92
    - name: test_bpf_metadata
93
      typed_config:
94
        "@type": type.googleapis.com/cilium.TestBpfMetadata
95
        is_ingress: {0}
96
    filter_chains:
97
    - filters:
98
      - name: cilium.network
99
        typed_config:
100
          "@type": type.googleapis.com/cilium.NetworkFilter
101
          proxylib: "proxylib/libcilium.so"
102
      - name: envoy.tcp_proxy
103
        typed_config:
104
          "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
105
          stat_prefix: tcp_stats
106
          cluster: tls-cluster
107
)EOF";
108

            
109
class CiliumTLSIntegrationTest : public CiliumTcpIntegrationTest {
110
public:
111
11
  CiliumTLSIntegrationTest(const std::string& config) : CiliumTcpIntegrationTest(config) {
112
#if 0
113
    for (Logger::Logger& logger : Logger::Registry::loggers()) {
114
      logger.setLevel(spdlog::level::trace);
115
    }
116
#endif
117
11
  }
118

            
119
11
  void initialize() override {
120
11
    CiliumTcpIntegrationTest::initialize();
121

            
122
11
    payload_reader_ = std::make_shared<WaitForPayloadReader>(*dispatcher_);
123
11
  }
124

            
125
11
  void createUpstreams() override {
126
11
    if (upstream_tls_) {
127
11
      auto config = upstreamConfig();
128
11
      config.upstream_protocol_ = FakeHttpConnection::Type::HTTP1;
129
11
      config.enable_half_close_ = true;
130
11
      fake_upstreams_.emplace_back(
131
11
          new FakeUpstream(createUpstreamSslContext(), 0, version_, config));
132
11
    } else {
133
      CiliumTcpIntegrationTest::createUpstreams(); // maybe BaseIntegrationTest::createUpstreams()
134
    }
135
11
  }
136

            
137
  // TODO(mattklein123): This logic is duplicated in various places. Cleanup in
138
  // a follow up.
139
11
  Network::DownstreamTransportSocketFactoryPtr createUpstreamSslContext() {
140
11
    envoy::extensions::transport_sockets::tls::v3::DownstreamTlsContext tls_context;
141
11
    auto* common_tls_context = tls_context.mutable_common_tls_context();
142
11
    auto* tls_cert = common_tls_context->add_tls_certificates();
143
11
    tls_cert->mutable_certificate_chain()->set_filename(TestEnvironment::runfilesPath(
144
11
        fmt::format("test/config/integration/certs/{}cert.pem", upstream_cert_name_)));
145
11
    tls_cert->mutable_private_key()->set_filename(TestEnvironment::runfilesPath(
146
11
        fmt::format("test/config/integration/certs/{}key.pem", upstream_cert_name_)));
147

            
148
11
    auto cfg_or_error = Extensions::TransportSockets::Tls::ServerContextConfigImpl::create(
149
11
        tls_context, factory_context_, false);
150
    // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
151
11
    THROW_IF_NOT_OK(cfg_or_error.status());
152
11
    auto cfg = std::move(cfg_or_error.value());
153

            
154
11
    static auto* upstream_stats_store = new Stats::IsolatedStoreImpl();
155
11
    auto server_or_error = Extensions::TransportSockets::Tls::ServerSslSocketFactory::create(
156
11
        std::move(cfg), context_manager_, *upstream_stats_store->rootScope(),
157
11
        std::vector<std::string>{});
158
    // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
159
11
    THROW_IF_NOT_OK(server_or_error.status());
160
11
    return std::move(server_or_error.value());
161
11
  }
162

            
163
4
  void setupConnections() {
164
4
    initialize();
165
4
    fake_upstreams_[0]->setReadDisableOnNewConnection(false);
166

            
167
    // Set up the mock buffer factory so the newly created SSL client will have
168
    // a mock write buffer. This allows us to track the bytes actually written
169
    // to the socket.
170

            
171
4
    EXPECT_CALL(*mock_buffer_factory_, createBuffer_(_, _, _))
172
4
        .Times(AtLeast(1))
173
4
        .WillOnce(Invoke([&](std::function<void()> below_low, std::function<void()> above_high,
174
4
                             std::function<void()> above_overflow) -> Buffer::Instance* {
175
4
          client_write_buffer_ =
176
4
              new NiceMock<MockWatermarkBuffer>(below_low, above_high, above_overflow);
177
4
          ON_CALL(*client_write_buffer_, move(_))
178
4
              .WillByDefault(Invoke(client_write_buffer_, &MockWatermarkBuffer::baseMove));
179
4
          ON_CALL(*client_write_buffer_, drain(_))
180
4
              .WillByDefault(Invoke(client_write_buffer_, &MockWatermarkBuffer::trackDrains));
181
4
          return client_write_buffer_;
182
4
        }));
183
    // Set up the SSL client.
184
4
    Network::Address::InstanceConstSharedPtr address =
185
4
        Ssl::getSslAddress(version_, lookupPort("tcp_proxy"));
186
4
    context_ = createClientSslTransportSocketFactory(context_manager_, *api_);
187
4
    ssl_client_ = dispatcher_->createClientConnection(
188
4
        address, Network::Address::InstanceConstSharedPtr(),
189
4
        context_->createTransportSocket(nullptr, nullptr), nullptr, nullptr);
190

            
191
    // Perform the SSL handshake. Loopback is whitelisted in tcp_proxy.json for
192
    // the ssl_auth filter so there will be no pause waiting on auth data.
193
4
    ssl_client_->addConnectionCallbacks(connect_callbacks_);
194
4
    ssl_client_->enableHalfClose(true);
195
4
    ssl_client_->addReadFilter(payload_reader_);
196
4
    ssl_client_->connect();
197
32253
    while (!connect_callbacks_.connected()) {
198
32249
      dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
199
32249
    }
200
4
  }
201

            
202
  // Test proxying data in both directions with envoy doing TCP and TLS
203
  // termination.
204
  void sendAndReceiveTlsData(const std::string& data_to_send_upstream,
205
2
                             const std::string& data_to_send_downstream) {
206
2
    FakeRawConnectionPtr fake_upstream_connection;
207
2
    AssertionResult result = fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection);
208
2
    RELEASE_ASSERT(result, result.message());
209

            
210
    // Ship some data upstream.
211
2
    Buffer::OwnedImpl buffer(data_to_send_upstream);
212
2
    ssl_client_->write(buffer, false);
213
4
    while (client_write_buffer_->bytesDrained() != data_to_send_upstream.size()) {
214
2
      dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
215
2
    }
216

            
217
    // Make sure the data makes it upstream.
218
2
    ASSERT_TRUE(fake_upstream_connection->waitForData(data_to_send_upstream.size()));
219

            
220
    // Now send data downstream and make sure it arrives.
221
2
    ASSERT_TRUE(fake_upstream_connection->write(data_to_send_downstream));
222
2
    payload_reader_->setDataToWaitFor(data_to_send_downstream);
223
2
    ssl_client_->dispatcher().run(Event::Dispatcher::RunType::Block);
224

            
225
    // Clean up.
226
2
    Buffer::OwnedImpl empty_buffer;
227
2
    ssl_client_->write(empty_buffer, true);
228
2
    dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
229
2
    ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
230
2
    ASSERT_TRUE(fake_upstream_connection->write("", true));
231
2
    ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
232
2
    ssl_client_->dispatcher().run(Event::Dispatcher::RunType::Block);
233
2
    EXPECT_TRUE(payload_reader_->readLastByte());
234
2
    EXPECT_TRUE(connect_callbacks_.closed());
235
2
  }
236

            
237
  // Upstream
238
  bool upstream_tls_{true};
239
  std::string upstream_cert_name_{"upstreamlocalhost"};
240

            
241
  // Downstream
242
  std::shared_ptr<WaitForPayloadReader> payload_reader_;
243
  MockWatermarkBuffer* client_write_buffer_;
244
  Network::UpstreamTransportSocketFactoryPtr context_;
245
  Network::ClientConnectionPtr ssl_client_;
246
  ConnectionStatusCallbacks connect_callbacks_;
247
};
248

            
249
// upstream_tls_context trusted_ca from test/config/integration/certs/upstreamcacert.pem
250
// downstream_tls_context certificate_chain from test/config/integration/certs/servercert.pem
251
// downstream_tls_context private_key from test/config/integration/certs/serverkey.pem
252
// Cilium NetworkPolicy resource template returned by testPolicyFmt() in the TLS fixtures.
// The ingress rule carries the upstream trusted CA and the downstream certificate chain and
// private key inline as PEM strings; the egress rule only lists the allowed remote policy.
// `{0}` is the port placeholder and '{{ ntop_ip_loopback_address }}' is an address placeholder;
// presumably these are filled in by the base fixture and TestEnvironment::substitute
// respectively (TODO confirm, neither is visible here).
const std::string TCP_POLICY_UPSTREAM_TLS_fmt = R"EOF(version_info: "0"
253
resources:
254
- "@type": type.googleapis.com/cilium.NetworkPolicy
255
  endpoint_ips:
256
  - '{{ ntop_ip_loopback_address }}'
257
  policy: 3
258
  ingress_per_port_policies:
259
  - port: {0}
260
    rules:
261
    - remote_policies: [ 1 ]
262
      upstream_tls_context:
263
        trusted_ca: "-----BEGIN CERTIFICATE-----\nMIID7zCCAtegAwIBAgIUfXpfjZAzA9sFKKe0k9M1rCGG9rwwDQYJKoZIhvcNAQEL\nBQAwfzELMAkGA1UEBhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWExFjAUBgNVBAcM\nDVNhbiBGcmFuY2lzY28xDTALBgNVBAoMBEx5ZnQxGTAXBgNVBAsMEEx5ZnQgRW5n\naW5lZXJpbmcxGTAXBgNVBAMMEFRlc3QgVXBzdHJlYW0gQ0EwHhcNMjQwNDA4MTA0\nMjUzWhcNMjYwNDA4MTA0MjUzWjB/MQswCQYDVQQGEwJVUzETMBEGA1UECAwKQ2Fs\naWZvcm5pYTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzENMAsGA1UECgwETHlmdDEZ\nMBcGA1UECwwQTHlmdCBFbmdpbmVlcmluZzEZMBcGA1UEAwwQVGVzdCBVcHN0cmVh\nbSBDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANQ5VS5O8LtJdNY7\nL9sqH6vVhr9wyHsb7bvBSmg9JAxTU8vSFG/Uj4zoJDBYtEivU9F7leeqcqVLU9MA\n2vvYt/LS7/j2HOU0AfilbIGRJiho24AMlrkgXQSweVD+Y46hH42xythcZhwYS6JQ\nMpe0jkSk8SDUZTCCFeosbt8yTxOILgNsFUgUJ1pkUFyQQDSW+cYfruXgg/U/BdP2\nbme/E6Wf41KhZIZJTGzbxmgRrmF29ktOSwLyJcKpMCVNFforIBOKnF7ANKirnAS4\nFuBx6Q4peQ6/qwmXcucBD4X+YBoTi6+CZejW9LHcZX4gFjWKFlny4QJKxz2eFS+1\neudq86kCAwEAAaNjMGEwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMCAQYw\nHQYDVR0OBBYEFHrsizu33Ld1ed/tUUow717Z5RCDMB8GA1UdIwQYMBaAFHrsizu3\n3Ld1ed/tUUow717Z5RCDMA0GCSqGSIb3DQEBCwUAA4IBAQCPlwh6v03l09vHYA8k\nFX0YVZDUKOcz8wtoRHwkTjetGRaDF2xEu2NGr/RHFS5EyJ9kuwgc1nOGS8lfqDk9\nCznok/2qsN+ctp571ufhK+EZf5FI9etQJP1f0YleXrP3KR3ztQ5zLGXCv6E0oqXi\n6ct4FZJwq5RdP4LYJUWCfCAf5z8Yr6nLUlXTW2Kwwi3+3isqc97jdRMkL37Y3CyR\nEgAHSbw26XozFmY+K7ptspwb8zPaWKMUDNSGJVnfCqo8ABWJbDcdRa/AZA4KXScP\nH/A2sZtKx8b3mOIu/uX5NQCO+e0Tvm6qqCSGr+Ykcn7HI6Rr43d19He/zn82oHZF\nqhaf\n-----END CERTIFICATE-----\n"
264
      downstream_tls_context:
265
        certificate_chain: "-----BEGIN CERTIFICATE-----\nMIIEhTCCA22gAwIBAgIUQRkh3sY/JN5+tu5NX3Tbyx0Y8l8wDQYJKoZIhvcNAQEL\nBQAwdjELMAkGA1UEBhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWExFjAUBgNVBAcM\nDVNhbiBGcmFuY2lzY28xDTALBgNVBAoMBEx5ZnQxGTAXBgNVBAsMEEx5ZnQgRW5n\naW5lZXJpbmcxEDAOBgNVBAMMB1Rlc3QgQ0EwHhcNMjQwNDA4MTA0MjUzWhcNMjYw\nNDA4MTA0MjUzWjCBpjELMAkGA1UEBhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWEx\nFjAUBgNVBAcMDVNhbiBGcmFuY2lzY28xDTALBgNVBAoMBEx5ZnQxGTAXBgNVBAsM\nEEx5ZnQgRW5naW5lZXJpbmcxGjAYBgNVBAMMEVRlc3QgQmFja2VuZCBUZWFtMSQw\nIgYJKoZIhvcNAQkBFhViYWNrZW5kLXRlYW1AbHlmdC5jb20wggEiMA0GCSqGSIb3\nDQEBAQUAA4IBDwAwggEKAoIBAQCqoyXM9pY5gDpLVap5mr0NtQjqCvh+GXZyP7BP\nP2S+oNtSaLLAe5+yNDJoldZSplLGYwrWWJtjWJedeQ5JnhpbFVKrGXDIBtQ/6B/v\noKEkdh3BOB79IKhbmNQTA9pFV+xypvM+IWFr4p5bvjTRgncdXdlzEf6g5ECNdgdi\nhlpdL3aAY/Ko6cEWAzaxypJAumzsaw4HX1HiBP7rhHHZrLsIPc6MZ/LhKztIgJIO\n3U2VOE4uRbcf1uBEkE6H63PKGBnuHJ5qkmLS6IoF9sl7pvydLj5tp1FB8twQMxwP\nWGRTZkpQ121zH5aBIEL1C/1WHgZ6AROEKvMK408e9fiAEfPLAgMBAAGjgdkwgdYw\nDAYDVR0TAQH/BAIwADALBgNVHQ8EBAMCBeAwHQYDVR0lBBYwFAYIKwYBBQUHAwIG\nCCsGAQUFBwMBMFoGA1UdEQRTMFGGHnNwaWZmZTovL2x5ZnQuY29tL2JhY2tlbmQt\ndGVhbYYXaHR0cDovL2JhY2tlbmQubHlmdC5jb22CCGx5ZnQuY29tggx3d3cubHlm\ndC5jb20wHQYDVR0OBBYEFF+sq41Nw9S3XL1yJQ51PLCa+mwUMB8GA1UdIwQYMBaA\nFBn+c0Qg6qbDyfGRTNk1jzuKuSOAMA0GCSqGSIb3DQEBCwUAA4IBAQB56Z7/YUQ6\nSkazZPO2Eodg/4eKHQPEluzbz13/wkTBMTewgLgXRCVow8025yt0NaQ9AQzrKELc\nWBD+BuoTA8koD52zXmG9kjpIzajIqv2urWIaH1vUzfM26uJgiQKXX3eo24fbGRQi\nW452PvGPYoGAtucrEg15MrGlfqLMPkNIJ3ufIWRh+ycriWb8kHe+TgB6XQQGhHdJ\nD0+MXSOkPoNM7I8hU2PNl29krHTl3npYK0zG4AOF6tbOuu6bta94kV8PQ4YBfojF\no8vYmMboYDfZnnh+92WT4Ra/BSIm/NXilo3mXOu+cuRP6Kl3kpJPT0zZIjI5DBLn\nQmJKb8oDcA7+\n-----END CERTIFICATE-----\n"
266
        private_key: "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEAqqMlzPaWOYA6S1WqeZq9DbUI6gr4fhl2cj+wTz9kvqDbUmiy\nwHufsjQyaJXWUqZSxmMK1libY1iXnXkOSZ4aWxVSqxlwyAbUP+gf76ChJHYdwTge\n/SCoW5jUEwPaRVfscqbzPiFha+KeW7400YJ3HV3ZcxH+oORAjXYHYoZaXS92gGPy\nqOnBFgM2scqSQLps7GsOB19R4gT+64Rx2ay7CD3OjGfy4Ss7SICSDt1NlThOLkW3\nH9bgRJBOh+tzyhgZ7hyeapJi0uiKBfbJe6b8nS4+badRQfLcEDMcD1hkU2ZKUNdt\ncx+WgSBC9Qv9Vh4GegEThCrzCuNPHvX4gBHzywIDAQABAoIBAQCpC+QBADGnWY9m\n3sF6o3+zuqvQIXo4gsVDPjFO8UC/UeC17Z9Y7aAyDV/7GKYxTzEl9SzhWProGvZp\nPWqYKBd4MNGrTBLdN1bC0RYCcaHy20lzCEQ7BUWFKQzAocp1dDt9AkRsQume1e2I\nehEdliCnaThptWQKxNXmzw1V4EBZm3Jf18azA82Op5O8uD8B7pLdM7cfXVfY5HIL\nN9HFY5yLJwM+N3M447StKQhfwohLtCuB8dVnYgVNKkqYfPyRSYT6h4OFJI+i1BRu\nyzEZoSVfa7oKAEStVH3G76M4TzKL5msU7AJrIogWFNYIy1jWEMJsCmhD5dbQhbJD\n9q1SgkIBAoGBAOJFaRl2PL7yf9RPh0E4iAJmqz9LxTk9PCa1Lu/EdtUdnqumPfVD\nfbsLxLUMYA5qQP411t+fFEgt2eBYj57WWhh035WhCpvhFhLgFqfXyFosI5Ku9xfE\nsOoCxzGOdCVfSi5PqL+cB49H6Msc5R0Wm1nr6Xz9vuW+U8AlxwdYSdNLAoGBAMEO\ngLej7FqBnnySXfTINFpXatPq4EaoMKqqI5Yjy0PlzBWbQtFV01zjo9xH9eCNRxrj\n1mz5tV3i1zyYapULL8hQ/qYVryf/sc4QGqEi2PPmk6KlR1729MMH83dWhuGgojyf\nkPy/+f5vqklRqQa90g01mea7O8mU10cUNmem85GBAoGBAMikiBfd8uvXmWaoxuUc\nve5zIDNWeyLQnAAu9doDOuSsCUFoftR37ovoWZu5x4vAyLUjBNDy/Ucr8WGw5loQ\n9X9uU70ZOpETPUGrmCtpeu4K6dhucgmPjtlTcVMOYQuqvdrnJFoUf9ecCl/h1YC/\nxS4ttbPyRk7vQNDILv7iWUSVAoGAK51xKwvXm+LowU/39hM88KQLOHE51fytcgEa\nJRNVGrPR1ZfMEqsHI1cyb9O6Es8YH1UV3mzTsrBK3B+7BI0QcHsL7M29UpYLv3gX\n7AuJZCDVfctFQokcZutm77EWq+a0gGm0QcXFXtwvZn0SaLl9uQpBCMWIDlSYBjDk\n0aoAIQECgYAfqxwy93/9Ib3hCT8Agft0vxYaycjyWtiu5Aw2hc2tKytWq+Lgi5Eb\npPSL18rxamvxMIJlERCmaoqJmDuNvGPtpv8TLlAz19ZEfrEIQs6J6k0G716cvZxE\nf4soMOrYismLOdHXsliIhvf4iHGnSMbIiN7jyjochK+maHTBE0GlOA==\n-----END RSA PRIVATE KEY-----\n"
267
  egress_per_port_policies:
268
  - port: {0}
269
    rules:
270
    - remote_policies: [ 1 ]
271
)EOF";
272

            
273
class CiliumTLSProxyIntegrationTest : public CiliumTLSIntegrationTest {
274
public:
275
  CiliumTLSProxyIntegrationTest()
276
7
      : CiliumTLSIntegrationTest(fmt::format(
277
7
            fmt::runtime(TestEnvironment::substitute(cilium_tls_tcp_proxy_config_fmt, GetParam())),
278
7
            "true")) {}
279

            
280
7
  std::string testPolicyFmt() override {
281
7
    return TestEnvironment::substitute(TCP_POLICY_UPSTREAM_TLS_fmt, GetParam());
282
7
  }
283
};
284

            
285
INSTANTIATE_TEST_SUITE_P(IpVersions, CiliumTLSProxyIntegrationTest,
286
                         testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
287
                         TestUtility::ipTestParamsToString);
288

            
289
// Test upstream writing before downstream does.
290
1
TEST_P(CiliumTLSProxyIntegrationTest, CiliumTLSProxyUpstreamWritesFirst) {
291
1
  initialize();
292
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
293

            
294
1
  FakeRawConnectionPtr fake_upstream_connection;
295
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
296

            
297
1
  ASSERT_TRUE(fake_upstream_connection->write("hello"));
298
1
  tcp_client->waitForData("hello");
299

            
300
1
  ASSERT_TRUE(tcp_client->write("hello"));
301
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(5));
302

            
303
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
304
1
  tcp_client->waitForHalfClose();
305
1
  ASSERT_TRUE(tcp_client->write("", true));
306
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
307
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
308
1
}
309

            
310
// Test proxying data in both directions, and that all data is flushed properly
311
// when there is an upstream disconnect.
312
1
TEST_P(CiliumTLSProxyIntegrationTest, CiliumTLSProxyUpstreamDisconnect) {
313
1
  initialize();
314
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
315
1
  ASSERT_TRUE(tcp_client->write("hello"));
316

            
317
1
  FakeRawConnectionPtr fake_upstream_connection;
318
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
319

            
320
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(5));
321
1
  ASSERT_TRUE(fake_upstream_connection->write("world"));
322
1
  ASSERT_TRUE(fake_upstream_connection->close());
323
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
324
1
  tcp_client->waitForHalfClose();
325
1
  tcp_client->close();
326

            
327
1
  EXPECT_EQ("world", tcp_client->data());
328
1
}
329

            
330
// Test proxying data in both directions, and that all data is flushed properly
331
// when the client disconnects.
332
1
TEST_P(CiliumTLSProxyIntegrationTest, CiliumTcpProxyDownstreamDisconnect) {
333
1
  initialize();
334
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
335
1
  ASSERT_TRUE(tcp_client->write("hello"));
336

            
337
1
  FakeRawConnectionPtr fake_upstream_connection;
338
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
339

            
340
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(5));
341
1
  ASSERT_TRUE(fake_upstream_connection->write("world"));
342
1
  tcp_client->waitForData("world");
343
1
  ASSERT_TRUE(tcp_client->write("hello", true));
344
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(10));
345
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
346
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
347
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
348
1
  tcp_client->waitForDisconnect();
349
1
}
350

            
351
1
// Sends 16 KiB in each direction with both buffer limits set to 1 KiB and
// checks that every flow-control pause was matched by a resume, both upstream
// and downstream.
TEST_P(CiliumTLSProxyIntegrationTest, CiliumTLSProxyLargeWrite) {
  config_helper_.setBufferLimits(1024, 1024);
  initialize();

  std::string data(1024 * 16, 'a');
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
  ASSERT_TRUE(tcp_client->write(data));

  FakeRawConnectionPtr fake_upstream_connection;
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));

  ASSERT_TRUE(fake_upstream_connection->waitForData(data.size()));
  ASSERT_TRUE(fake_upstream_connection->write(data));
  tcp_client->waitForData(data);
  tcp_client->close();
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
  ASSERT_TRUE(fake_upstream_connection->close());
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());

  // Counter values are 64-bit; keep the full width so the comparison cannot be
  // affected by truncation.
  const uint64_t upstream_pauses =
      test_server_->counter("cluster.tls-cluster.upstream_flow_control_paused_reading_total")
          ->value();
  const uint64_t upstream_resumes =
      test_server_->counter("cluster.tls-cluster.upstream_flow_control_resumed_reading_total")
          ->value();
  EXPECT_EQ(upstream_pauses, upstream_resumes);

  const uint64_t downstream_pauses =
      test_server_->counter("tcp.tcp_stats.downstream_flow_control_paused_reading_total")->value();
  const uint64_t downstream_resumes =
      test_server_->counter("tcp.tcp_stats.downstream_flow_control_resumed_reading_total")->value();
  EXPECT_EQ(downstream_pauses, downstream_resumes);
}
384

            
385
// Test that a downstream flush works correctly (all data is flushed)
386
1
TEST_P(CiliumTLSProxyIntegrationTest, CiliumTLSProxyDownstreamFlush) {
387
  // Use a very large size to make sure it is larger than the kernel socket read
388
  // buffer.
389
1
  const uint32_t size = 50 * 1024 * 1024;
390
1
  config_helper_.setBufferLimits(size / 4, size / 4);
391
1
  initialize();
392

            
393
1
  std::string data(size, 'a');
394
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
395

            
396
1
  FakeRawConnectionPtr fake_upstream_connection;
397
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
398

            
399
1
  tcp_client->readDisable(true);
400
1
  ASSERT_TRUE(tcp_client->write("", true));
401

            
402
  // This ensures that readDisable(true) has been run on it's thread
403
  // before tcp_client starts writing.
404
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
405

            
406
1
  ASSERT_TRUE(fake_upstream_connection->write(data, true));
407

            
408
1
  test_server_->waitForCounterGe("cluster.tls-cluster.upstream_flow_control_paused_reading_total",
409
1
                                 1);
410
1
  EXPECT_EQ(test_server_->counter("cluster.tls-cluster.upstream_flow_control_resumed_reading_total")
411
1
                ->value(),
412
1
            0);
413
1
  tcp_client->readDisable(false);
414
1
  tcp_client->waitForData(data);
415
1
  tcp_client->waitForHalfClose();
416
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
417

            
418
1
  uint32_t upstream_pauses =
419
1
      test_server_->counter("cluster.tls-cluster.upstream_flow_control_paused_reading_total")
420
1
          ->value();
421
1
  uint32_t upstream_resumes =
422
1
      test_server_->counter("cluster.tls-cluster.upstream_flow_control_resumed_reading_total")
423
1
          ->value();
424
1
  EXPECT_GE(upstream_pauses, upstream_resumes);
425
1
  EXPECT_GT(upstream_resumes, 0);
426
1
}
427

            
428
// Test that an upstream flush works correctly (all data is flushed)
429
1
TEST_P(CiliumTLSProxyIntegrationTest, CiliumTLSProxyUpstreamFlush) {
430
  // Use a very large size to make sure it is larger than the kernel socket read
431
  // buffer.
432
1
  const uint32_t size = 50 * 1024 * 1024;
433
1
  config_helper_.setBufferLimits(size, size);
434
1
  initialize();
435

            
436
1
  std::string data(size, 'a');
437
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
438

            
439
1
  FakeRawConnectionPtr fake_upstream_connection;
440
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
441

            
442
  // Disabling read does not let the TLS handshake to finish. We should be able
443
  // to wait for ConnectionEvent::Connected, which is raised after the TLS
444
  // handshake has completed, but just wait for a while instead for now.
445
1
  usleep(100000); // NO_CHECK_FORMAT(real_time)
446

            
447
1
  ASSERT_TRUE(fake_upstream_connection->readDisable(true));
448
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
449

            
450
  // This ensures that fake_upstream_connection->readDisable has been run on
451
  // it's thread before tcp_client starts writing.
452
1
  tcp_client->waitForHalfClose();
453

            
454
1
  ASSERT_TRUE(tcp_client->write(data, true, true, std::chrono::milliseconds(30000)));
455

            
456
1
  test_server_->waitForGaugeEq("tcp.tcp_stats.upstream_flush_active", 1);
457
1
  ASSERT_TRUE(fake_upstream_connection->readDisable(false));
458
1
  ASSERT_TRUE(
459
1
      fake_upstream_connection->waitForData(data.size(), nullptr, 3 * TestUtility::DefaultTimeout));
460
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
461
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
462

            
463
1
  tcp_client->waitForHalfClose();
464

            
465
1
  EXPECT_EQ(test_server_->counter("tcp.tcp_stats.upstream_flush_total")->value(), 1);
466
1
  test_server_->waitForGaugeEq("tcp.tcp_stats.upstream_flush_active", 0);
467
1
}
468

            
469
// Test that Envoy doesn't crash or assert when shutting down with an upstream
470
// flush active
471
1
TEST_P(CiliumTLSProxyIntegrationTest, CiliumTLSProxyUpstreamFlushEnvoyExit) {
472
  // Use a very large size to make sure it is larger than the kernel socket read
473
  // buffer.
474
1
  const uint32_t size = 50 * 1024 * 1024;
475
1
  config_helper_.setBufferLimits(size, size);
476
1
  initialize();
477

            
478
1
  std::string data(size, 'a');
479
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
480

            
481
1
  FakeRawConnectionPtr fake_upstream_connection;
482
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
483

            
484
  // Disabling read does not let the TLS handshake to finish. We should be able
485
  // to wait for ConnectionEvent::Connected, which is raised after the TLS
486
  // handshake has completed, but just wait for a while instead for now.
487
1
  usleep(100000); // NO_CHECK_FORMAT(real_time)
488

            
489
1
  ASSERT_TRUE(fake_upstream_connection->readDisable(true));
490
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
491

            
492
  // This ensures that fake_upstream_connection->readDisable has been run on
493
  // it's thread before tcp_client starts writing.
494
1
  tcp_client->waitForHalfClose();
495

            
496
1
  ASSERT_TRUE(tcp_client->write(data, true));
497

            
498
1
  test_server_->waitForGaugeEq("tcp.tcp_stats.upstream_flush_active", 1);
499
1
  test_server_.reset();
500
1
  ASSERT_TRUE(fake_upstream_connection->close());
501
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
502

            
503
  // Success criteria is that no ASSERTs fire and there are no leaks.
504
1
}
505

            
506
//
507
// Cilium filters with TCP proxy, Downstream TLS & Upstream TLS
508
//
509

            
510
// params: is_ingress ("true", "false")
511
// Envoy bootstrap YAML template for the downstream-TLS TCP proxy tests. Same clusters as the
// upstream-only template, but the listener additionally runs the tls_inspector listener filter
// and has two filter chains: a plain one, and one matching transport_protocol "tls" that uses
// the "cilium.tls_wrapper" transport socket (cilium.DownstreamTlsWrapperContext). Both chains
// run the cilium.network and envoy.tcp_proxy filters towards "tls-cluster". `{0}` is the
// is_ingress value substituted via fmt::format.
const std::string cilium_tls_downstream_tcp_proxy_config_fmt = R"EOF(
512
admin:
513
  address:
514
    socket_address:
515
      address: 127.0.0.1
516
      port_value: 0
517
static_resources:
518
  clusters:
519
  - name: tls-cluster
520
    type: ORIGINAL_DST
521
    lb_policy: CLUSTER_PROVIDED
522
    connect_timeout:
523
      seconds: 1
524
    transport_socket:
525
      name: "cilium.tls_wrapper"
526
      typed_config:
527
        "@type": type.googleapis.com/cilium.UpstreamTlsWrapperContext
528
  - name: xds-grpc-cilium
529
    connect_timeout:
530
      seconds: 5
531
    type: STATIC
532
    lb_policy: ROUND_ROBIN
533
    http2_protocol_options:
534
    load_assignment:
535
      cluster_name: xds-grpc-cilium
536
      endpoints:
537
      - lb_endpoints:
538
        - endpoint:
539
            address:
540
              pipe:
541
                path: /var/run/cilium/xds.sock
542
  listeners:
543
    name: listener_0
544
    address:
545
      socket_address:
546
        address: 127.0.0.1
547
        port_value: 0
548
    listener_filters:
549
    - name: test_bpf_metadata
550
      typed_config:
551
        "@type": type.googleapis.com/cilium.TestBpfMetadata
552
        is_ingress: {0}
553
    - name: "envoy.filters.listener.tls_inspector"
554
      typed_config:
555
        "@type": type.googleapis.com/envoy.extensions.filters.listener.tls_inspector.v3.TlsInspector
556
    filter_chains:
557
    - filters:
558
      - name: cilium.network
559
        typed_config:
560
          "@type": type.googleapis.com/cilium.NetworkFilter
561
          proxylib: "proxylib/libcilium.so"
562
      - name: envoy.tcp_proxy
563
        typed_config:
564
          "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
565
          stat_prefix: tcp_stats
566
          cluster: tls-cluster
567
    - filter_chain_match:
568
        transport_protocol: "tls"
569
      transport_socket:
570
        name: "cilium.tls_wrapper"
571
        typed_config:
572
          "@type": type.googleapis.com/cilium.DownstreamTlsWrapperContext
573
      filters:
574
      - name: cilium.network
575
        typed_config:
576
          "@type": type.googleapis.com/cilium.NetworkFilter
577
          proxylib: "proxylib/libcilium.so"
578
      - name: envoy.tcp_proxy
579
        typed_config:
580
          "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
581
          stat_prefix: tcp_stats
582
          cluster: tls-cluster
583
)EOF";
584

            
585
class CiliumDownstreamTLSIntegrationTest : public CiliumTLSIntegrationTest {
586
public:
587
  CiliumDownstreamTLSIntegrationTest()
588
4
      : CiliumTLSIntegrationTest(
589
4
            fmt::format(fmt::runtime(TestEnvironment::substitute(
590
4
                            cilium_tls_downstream_tcp_proxy_config_fmt, GetParam())),
591
4
                        "true")) {}
592

            
593
4
  std::string testPolicyFmt() override {
594
4
    return TestEnvironment::substitute(TCP_POLICY_UPSTREAM_TLS_fmt, GetParam());
595
4
  }
596
};
597

            
598
INSTANTIATE_TEST_SUITE_P(IpVersions, CiliumDownstreamTLSIntegrationTest,
599
                         testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
600
                         TestUtility::ipTestParamsToString);
601

            
602
1
// Round-trips a small payload in each direction over a TLS client connection.
TEST_P(CiliumDownstreamTLSIntegrationTest, SendTlsToTlsListener) {
  const std::string to_upstream{"hello"};
  const std::string to_downstream{"world"};

  setupConnections();
  sendAndReceiveTlsData(to_upstream, to_downstream);
}
606

            
607
1
// Round-trips an 8 KiB payload in each direction over a TLS client connection.
TEST_P(CiliumDownstreamTLSIntegrationTest, LargeBidirectionalTlsWrites) {
  setupConnections();

  const std::string payload(8 * 1024, 'a');
  sendAndReceiveTlsData(payload, payload);
}
612

            
613
// Test that a half-close on the downstream side is proxied correctly.
614
1
TEST_P(CiliumDownstreamTLSIntegrationTest, DownstreamHalfClose) {
615
1
  setupConnections();
616

            
617
1
  FakeRawConnectionPtr fake_upstream_connection;
618
1
  AssertionResult result = fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection);
619
1
  RELEASE_ASSERT(result, result.message());
620

            
621
1
  Buffer::OwnedImpl empty_buffer;
622
1
  ssl_client_->write(empty_buffer, true);
623
1
  dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
624
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
625

            
626
1
  const std::string data("data");
627
1
  ASSERT_TRUE(fake_upstream_connection->write(data, false));
628
1
  payload_reader_->setDataToWaitFor(data);
629
1
  ssl_client_->dispatcher().run(Event::Dispatcher::RunType::Block);
630
1
  EXPECT_FALSE(payload_reader_->readLastByte());
631

            
632
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
633
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
634
1
  ssl_client_->dispatcher().run(Event::Dispatcher::RunType::Block);
635
1
  EXPECT_TRUE(payload_reader_->readLastByte());
636
1
  EXPECT_TRUE(connect_callbacks_.closed());
637
1
}
638

            
639
// Test that a half-close on the upstream side is proxied correctly.
640
1
TEST_P(CiliumDownstreamTLSIntegrationTest, UpstreamHalfClose) {
641
1
  setupConnections();
642

            
643
1
  FakeRawConnectionPtr fake_upstream_connection;
644
1
  AssertionResult result = fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection);
645
1
  RELEASE_ASSERT(result, result.message());
646

            
647
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
648
1
  ssl_client_->dispatcher().run(Event::Dispatcher::RunType::Block);
649
1
  EXPECT_TRUE(payload_reader_->readLastByte());
650
1
  EXPECT_FALSE(connect_callbacks_.closed());
651

            
652
1
  const std::string& val("data");
653
1
  Buffer::OwnedImpl buffer(val);
654
1
  ssl_client_->write(buffer, false);
655
2
  while (client_write_buffer_->bytesDrained() != val.size()) {
656
1
    dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
657
1
  }
658
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(val.size()));
659

            
660
1
  Buffer::OwnedImpl empty_buffer;
661
1
  ssl_client_->write(empty_buffer, true);
662
2
  while (!connect_callbacks_.closed()) {
663
1
    dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
664
1
  }
665
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
666
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
667
1
}
668

            
669
} // namespace Cilium
670
} // namespace Envoy