1
#include <fmt/base.h>
2
#include <fmt/format.h>
3
#include <gtest/gtest-param-test.h>
4
#include <gtest/gtest.h>
5

            
6
#include <chrono>
7
#include <cstdint>
8
#include <string>
9

            
10
#include "test/integration/fake_upstream.h"
11
#include "test/integration/integration_tcp_client.h"
12
#include "test/test_common/environment.h"
13
#include "test/test_common/utility.h"
14

            
15
#include "tests/cilium_tcp_integration.h"
16

            
17
namespace Envoy {
18

            
19
//
20
// Cilium filters with TCP proxy
21
//
22

            
23
// params: is_ingress ("true", "false")
24
const std::string cilium_tcp_proxy_config_fmt = R"EOF(
25
admin:
26
  address:
27
    socket_address:
28
      address: 127.0.0.1
29
      port_value: 0
30
static_resources:
31
  clusters:
32
  - name: cluster1
33
    type: ORIGINAL_DST
34
    lb_policy: CLUSTER_PROVIDED
35
    connect_timeout:
36
      seconds: 1
37
  - name: xds-grpc-cilium
38
    connect_timeout:
39
      seconds: 5
40
    type: STATIC
41
    lb_policy: ROUND_ROBIN
42
    http2_protocol_options:
43
    load_assignment:
44
      cluster_name: xds-grpc-cilium
45
      endpoints:
46
      - lb_endpoints:
47
        - endpoint:
48
            address:
49
              pipe:
50
                path: /var/run/cilium/xds.sock
51
  listeners:
52
    name: listener_0
53
    address:
54
      socket_address:
55
        address: 127.0.0.1
56
        port_value: 0
57
    listener_filters:
58
      name: test_bpf_metadata
59
      typed_config:
60
        "@type": type.googleapis.com/cilium.TestBpfMetadata
61
        is_ingress: {0}
62
    filter_chains:
63
      filters:
64
      - name: cilium.network
65
        typed_config:
66
          "@type": type.googleapis.com/cilium.NetworkFilter
67
      - name: cilium.network.websocket.client
68
        typed_config:
69
          "@type": type.googleapis.com/cilium.WebSocketClient
70
          access_log_path: "{{ test_udsdir }}/access_log.sock"
71
          origin: "jarno.cilium.rocks"
72
          host: "jarno.cilium.rocks"
73
          ping_interval:
74
            nanos: 1000000
75
          ping_when_idle: true
76
      - name: cilium.network.websocket.server
77
        typed_config:
78
          "@type": type.googleapis.com/cilium.WebSocketServer
79
          access_log_path: "{{ test_udsdir }}/access_log.sock"
80
          origin: "jarno.cilium.rocks"
81
      - name: envoy.tcp_proxy
82
        typed_config:
83
          "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
84
          stat_prefix: tcp_stats
85
          cluster: cluster1
86
)EOF";
87

            
88
class CiliumWebSocketIntegrationTest : public CiliumTcpIntegrationTest {
89
public:
90
  CiliumWebSocketIntegrationTest()
91
7
      : CiliumTcpIntegrationTest(fmt::format(
92
7
            fmt::runtime(TestEnvironment::substitute(cilium_tcp_proxy_config_fmt, GetParam())),
93
7
            "true")) {}
94

            
95
7
  std::string testPolicyFmt() override {
96
7
    return TestEnvironment::substitute(R"EOF(version_info: "0"
97
7
resources:
98
7
- "@type": type.googleapis.com/cilium.NetworkPolicy
99
7
  endpoint_ips:
100
7
  - '{{ ntop_ip_loopback_address }}'
101
7
  policy: 3
102
7
  ingress_per_port_policies:
103
7
  - port: {0}
104
7
    rules:
105
7
    - remote_policies: [ 1 ]
106
7
  egress_per_port_policies:
107
7
  - port: {0}
108
7
    rules:
109
7
    - remote_policies: [ 1 ]
110
7
)EOF",
111
7
                                       GetParam());
112
7
  }
113
};
114

            
115
INSTANTIATE_TEST_SUITE_P(IpVersions, CiliumWebSocketIntegrationTest,
116
                         testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
117
                         TestUtility::ipTestParamsToString);
118

            
119
// Test upstream writing before downstream downstream does.
120
1
TEST_P(CiliumWebSocketIntegrationTest, CiliumWebSocketUpstreamWritesFirst) {
121
1
  initialize();
122
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
123
1
  FakeRawConnectionPtr fake_upstream_connection;
124
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
125

            
126
1
  test_server_->waitForCounterGe("websocket.ping_sent_count", 1);
127

            
128
1
  ASSERT_TRUE(fake_upstream_connection->write("hello"));
129
1
  tcp_client->waitForData("hello");
130

            
131
1
  ASSERT_TRUE(tcp_client->write("hello"));
132
1
  std::string received;
133
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(5, &received));
134
1
  ASSERT_EQ(received, "hello");
135

            
136
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
137
1
  tcp_client->waitForHalfClose();
138
1
  ASSERT_TRUE(tcp_client->write("", true));
139
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
140
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
141
1
}
142

            
143
// Test proxying data in both directions, and that all data is flushed properly
144
// when there is an upstream disconnect.
145
1
TEST_P(CiliumWebSocketIntegrationTest, CiliumWebSocketUpstreamDisconnect) {
146
1
  initialize();
147
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
148
1
  ASSERT_TRUE(tcp_client->write("hello"));
149
1
  FakeRawConnectionPtr fake_upstream_connection;
150
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
151

            
152
1
  std::string received;
153
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(5, &received));
154
1
  ASSERT_EQ(received, "hello");
155

            
156
1
  test_server_->waitForCounterGe("websocket.ping_sent_count", 1);
157

            
158
1
  ASSERT_TRUE(fake_upstream_connection->write("world"));
159
1
  ASSERT_TRUE(fake_upstream_connection->close());
160
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
161
1
  tcp_client->waitForHalfClose();
162
1
  tcp_client->close();
163

            
164
1
  EXPECT_EQ("world", tcp_client->data());
165
1
}
166

            
167
// Test proxying data in both directions, and that all data is flushed properly
168
// when the client disconnects.
169
1
TEST_P(CiliumWebSocketIntegrationTest, CiliumWebSocketDownstreamDisconnect) {
170
1
  initialize();
171
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
172
1
  ASSERT_TRUE(tcp_client->write("hello"));
173
1
  FakeRawConnectionPtr fake_upstream_connection;
174
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
175

            
176
1
  std::string received;
177
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(5, &received));
178
1
  ASSERT_EQ(received, "hello");
179
1
  ASSERT_TRUE(fake_upstream_connection->write("world"));
180
1
  tcp_client->waitForData("world");
181

            
182
1
  test_server_->waitForCounterGe("websocket.ping_sent_count", 1);
183

            
184
1
  ASSERT_TRUE(tcp_client->write("hello", true));
185
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(10, &received));
186
1
  ASSERT_EQ(received, "hellohello");
187
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
188
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
189
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
190
1
  tcp_client->waitForDisconnect();
191
1
}
192

            
193
1
TEST_P(CiliumWebSocketIntegrationTest, CiliumWebSocketLargeWrite) {
194
1
  config_helper_.setBufferLimits(1024, 1024);
195
1
  initialize();
196

            
197
1
  std::string data(1024 * 16, 'a');
198
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
199
1
  ASSERT_TRUE(tcp_client->write(data));
200
1
  FakeRawConnectionPtr fake_upstream_connection;
201
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
202

            
203
1
  std::string received;
204
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(data.size(), &received));
205
1
  ASSERT_EQ(received, data);
206
1
  ASSERT_TRUE(fake_upstream_connection->write(data));
207
1
  tcp_client->waitForData(data);
208

            
209
1
  test_server_->waitForCounterGe("websocket.ping_sent_count", 1);
210

            
211
1
  tcp_client->close();
212
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
213
1
  ASSERT_TRUE(fake_upstream_connection->close());
214
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
215

            
216
1
  uint32_t upstream_pauses =
217
1
      test_server_->counter("cluster.cluster1.upstream_flow_control_paused_reading_total")->value();
218
1
  uint32_t upstream_resumes =
219
1
      test_server_->counter("cluster.cluster1.upstream_flow_control_resumed_reading_total")
220
1
          ->value();
221
1
  EXPECT_EQ(upstream_pauses, upstream_resumes);
222
1
  uint32_t downstream_pauses =
223
1
      test_server_->counter("tcp.tcp_stats.downstream_flow_control_paused_reading_total")->value();
224
1
  uint32_t downstream_resumes =
225
1
      test_server_->counter("tcp.tcp_stats.downstream_flow_control_resumed_reading_total")->value();
226
  // Since we are receiving early data, downstream connection will already be read
227
  // disabled so downstream pause metric is not emitted when upstream buffer hits high
228
  // watermark. When the upstream buffer watermark goes down, downstream will be read
229
  // enabled and downstream resume metric will be emitted.
230
1
  EXPECT_EQ(downstream_pauses, 0);
231
1
  EXPECT_EQ(downstream_resumes, 1);
232
1
}
233

            
234
// Test that a downstream flush works correctly (all data is flushed)
235
1
TEST_P(CiliumWebSocketIntegrationTest, CiliumWebSocketDownstreamFlush) {
236
  // Use a very large size to make sure it is larger than the kernel socket read
237
  // buffer.
238
1
  const uint32_t size = 50 * 1024 * 1024;
239
1
  config_helper_.setBufferLimits(size / 4, size / 4);
240
1
  enableHalfClose(true);
241
1
  initialize();
242

            
243
1
  std::string data(size, 'a');
244
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
245
1
  FakeRawConnectionPtr fake_upstream_connection;
246
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
247

            
248
1
  test_server_->waitForCounterGe("websocket.ping_sent_count", 1);
249

            
250
1
  tcp_client->readDisable(true);
251
1
  ASSERT_TRUE(tcp_client->write("", true));
252

            
253
  // This ensures that readDisable(true) has been run on it's thread
254
  // before tcp_client starts writing.
255
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
256

            
257
1
  ASSERT_TRUE(fake_upstream_connection->write(data, true));
258

            
259
1
  test_server_->waitForCounterGe("cluster.cluster1.upstream_flow_control_paused_reading_total", 1);
260
1
  EXPECT_EQ(test_server_->counter("cluster.cluster1.upstream_flow_control_resumed_reading_total")
261
1
                ->value(),
262
1
            0);
263
1
  tcp_client->readDisable(false);
264
1
  tcp_client->waitForData(data);
265
1
  tcp_client->waitForHalfClose();
266
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
267

            
268
1
  uint32_t upstream_pauses =
269
1
      test_server_->counter("cluster.cluster1.upstream_flow_control_paused_reading_total")->value();
270
1
  uint32_t upstream_resumes =
271
1
      test_server_->counter("cluster.cluster1.upstream_flow_control_resumed_reading_total")
272
1
          ->value();
273
1
  EXPECT_GE(upstream_pauses, upstream_resumes);
274
1
  EXPECT_GT(upstream_resumes, 0);
275
1
}
276

            
277
// Test that an upstream flush works correctly (all data is flushed)
278
1
TEST_P(CiliumWebSocketIntegrationTest, CiliumWebSocketUpstreamFlush) {
279
  // Use a very large size to make sure it is larger than the kernel socket read
280
  // buffer.
281
1
  const uint32_t size = 50 * 1024 * 1024;
282
1
  config_helper_.setBufferLimits(size, size);
283
1
  enableHalfClose(true);
284
1
  initialize();
285

            
286
1
  std::string data(size, 'a');
287
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
288
1
  FakeRawConnectionPtr fake_upstream_connection;
289
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
290

            
291
1
  test_server_->waitForCounterGe("websocket.ping_sent_count", 1);
292

            
293
1
  ASSERT_TRUE(fake_upstream_connection->readDisable(true));
294
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
295

            
296
  // This ensures that fake_upstream_connection->readDisable has been run on
297
  // it's thread before tcp_client starts writing.
298
1
  tcp_client->waitForHalfClose();
299

            
300
1
  ASSERT_TRUE(tcp_client->write(data, true, true, std::chrono::milliseconds(30000)));
301

            
302
1
  test_server_->waitForGaugeEq("tcp.tcp_stats.upstream_flush_active", 1);
303

            
304
1
  ASSERT_TRUE(fake_upstream_connection->readDisable(false));
305
1
  std::string received;
306
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(data.size(), &received));
307
1
  ASSERT_EQ(received, data);
308
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
309
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
310
1
  tcp_client->waitForHalfClose();
311

            
312
1
  EXPECT_EQ(test_server_->counter("tcp.tcp_stats.upstream_flush_total")->value(), 1);
313
1
  test_server_->waitForGaugeEq("tcp.tcp_stats.upstream_flush_active", 0);
314
1
}
315

            
316
// Test that Envoy doesn't crash or assert when shutting down with an upstream
317
// flush active
318
1
TEST_P(CiliumWebSocketIntegrationTest, CiliumWebSocketUpstreamFlushEnvoyExit) {
319
  // Use a very large size to make sure it is larger than the kernel socket read
320
  // buffer.
321
1
  const uint32_t size = 50 * 1024 * 1024;
322
1
  config_helper_.setBufferLimits(size, size);
323
1
  initialize();
324

            
325
1
  std::string data(size, 'a');
326
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
327
1
  FakeRawConnectionPtr fake_upstream_connection;
328
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
329

            
330
1
  ASSERT_TRUE(fake_upstream_connection->readDisable(true));
331
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
332

            
333
  // This ensures that fake_upstream_connection->readDisable has been run on
334
  // it's thread before tcp_client starts writing.
335
1
  tcp_client->waitForHalfClose();
336

            
337
1
  test_server_->waitForCounterGe("websocket.ping_sent_count", 1);
338

            
339
1
  ASSERT_TRUE(tcp_client->write(data, true));
340

            
341
1
  test_server_->waitForGaugeEq("tcp.tcp_stats.upstream_flush_active", 1);
342
1
  test_server_.reset();
343
1
  ASSERT_TRUE(fake_upstream_connection->close());
344
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
345

            
346
  // Success criteria is that no ASSERTs fire and there are no leaks.
347
1
}
348

            
349
} // namespace Envoy