1
#include <fmt/base.h>
2
#include <fmt/format.h>
3
#include <gtest/gtest-param-test.h>
4
#include <gtest/gtest.h>
5

            
6
#include <chrono>
7
#include <cstdint>
8
#include <cstring>
9
#include <string>
10

            
11
#include "test/integration/fake_upstream.h"
12
#include "test/integration/integration_tcp_client.h"
13
#include "test/test_common/environment.h"
14
#include "test/test_common/utility.h"
15

            
16
#include "absl/time/clock.h"
17
#include "absl/time/time.h"
18
#include "tests/cilium_tcp_integration.h"
19

            
20
namespace Envoy {
21

            
22
//
23
// Cilium filters with TCP proxy
24
//
25

            
26
// Bootstrap configuration template for the plain TCP proxy tests.
//
// params: {0} = is_ingress ("true", "false"), filled in with fmt::format.
// The template must contain nothing but the YAML document itself: any stray
// line inside the raw string becomes part of the configuration.
const std::string cilium_tcp_proxy_config_fmt = R"EOF(
admin:
  address:
    socket_address:
      address: 127.0.0.1
      port_value: 0
static_resources:
  clusters:
  - name: cluster1
    type: ORIGINAL_DST
    lb_policy: CLUSTER_PROVIDED
    connect_timeout:
      seconds: 1
  - name: xds-grpc-cilium
    connect_timeout:
      seconds: 5
    type: STATIC
    lb_policy: ROUND_ROBIN
    http2_protocol_options:
    load_assignment:
      cluster_name: xds-grpc-cilium
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              pipe:
                path: /var/run/cilium/xds.sock
  listeners:
    name: listener_0
    address:
      socket_address:
        address: 127.0.0.1
        port_value: 0
    listener_filters:
      name: test_bpf_metadata
      typed_config:
        "@type": type.googleapis.com/cilium.TestBpfMetadata
        is_ingress: {0}
    filter_chains:
      filters:
      - name: cilium.network
        typed_config:
          "@type": type.googleapis.com/cilium.NetworkFilter
          proxylib: "proxylib/libcilium.so"
      - name: envoy.tcp_proxy
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
          stat_prefix: tcp_stats
          cluster: cluster1
)EOF";
77

            
78
class CiliumTcpProxyIntegrationTest : public CiliumTcpIntegrationTest {
79
public:
80
  CiliumTcpProxyIntegrationTest()
81
7
      : CiliumTcpIntegrationTest(fmt::format(
82
7
            fmt::runtime(TestEnvironment::substitute(cilium_tcp_proxy_config_fmt, GetParam())),
83
7
            "true")) {}
84
};
85

            
86
INSTANTIATE_TEST_SUITE_P(IpVersions, CiliumTcpProxyIntegrationTest,
87
                         testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
88
                         TestUtility::ipTestParamsToString);
89

            
90
// Test upstream writing before downstream downstream does.
91
1
TEST_P(CiliumTcpProxyIntegrationTest, CiliumTcpProxyUpstreamWritesFirst) {
92
1
  initialize();
93
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
94
1
  FakeRawConnectionPtr fake_upstream_connection;
95
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
96

            
97
1
  ASSERT_TRUE(fake_upstream_connection->write("hello"));
98
1
  tcp_client->waitForData("hello");
99

            
100
1
  ASSERT_TRUE(tcp_client->write("hello"));
101
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(5));
102

            
103
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
104
1
  tcp_client->waitForHalfClose();
105
1
  ASSERT_TRUE(tcp_client->write("", true));
106
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
107
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
108
1
}
109

            
110
// Test proxying data in both directions, and that all data is flushed properly
111
// when there is an upstream disconnect.
112
1
TEST_P(CiliumTcpProxyIntegrationTest, CiliumTcpProxyUpstreamDisconnect) {
113
1
  initialize();
114
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
115
1
  ASSERT_TRUE(tcp_client->write("hello"));
116
1
  FakeRawConnectionPtr fake_upstream_connection;
117
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
118

            
119
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(5));
120
1
  ASSERT_TRUE(fake_upstream_connection->write("world"));
121
1
  ASSERT_TRUE(fake_upstream_connection->close());
122
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
123
1
  tcp_client->waitForHalfClose();
124
1
  tcp_client->close();
125

            
126
1
  EXPECT_EQ("world", tcp_client->data());
127
1
}
128

            
129
// Test proxying data in both directions, and that all data is flushed properly
130
// when the client disconnects.
131
1
TEST_P(CiliumTcpProxyIntegrationTest, CiliumTcpProxyDownstreamDisconnect) {
132
1
  initialize();
133
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
134
1
  ASSERT_TRUE(tcp_client->write("hello"));
135
1
  FakeRawConnectionPtr fake_upstream_connection;
136
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
137

            
138
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(5));
139
1
  ASSERT_TRUE(fake_upstream_connection->write("world"));
140
1
  tcp_client->waitForData("world");
141
1
  ASSERT_TRUE(tcp_client->write("hello", true));
142
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(10));
143
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
144
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
145
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
146
1
  tcp_client->waitForDisconnect();
147
1
}
148

            
149
1
// Send 16 KiB in each direction with 1 KiB buffer limits configured, then check
// that each flow-control "paused reading" counter equals its "resumed reading"
// counterpart once the connection is done.
TEST_P(CiliumTcpProxyIntegrationTest, CiliumTcpProxyLargeWrite) {
  config_helper_.setBufferLimits(1024, 1024);
  initialize();

  std::string payload(1024 * 16, 'a');
  IntegrationTcpClientPtr client = makeTcpConnection(lookupPort("tcp_proxy"));
  ASSERT_TRUE(client->write(payload));
  FakeRawConnectionPtr upstream;
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(upstream));

  // Echo the payload back from the upstream and tear the connection down.
  ASSERT_TRUE(upstream->waitForData(payload.size()));
  ASSERT_TRUE(upstream->write(payload));
  client->waitForData(payload);
  client->close();
  ASSERT_TRUE(upstream->waitForHalfClose());
  ASSERT_TRUE(upstream->close());
  ASSERT_TRUE(upstream->waitForDisconnect());

  // Upstream side counters.
  uint32_t upstream_paused =
      test_server_->counter("cluster.cluster1.upstream_flow_control_paused_reading_total")->value();
  uint32_t upstream_resumed =
      test_server_->counter("cluster.cluster1.upstream_flow_control_resumed_reading_total")
          ->value();
  EXPECT_EQ(upstream_paused, upstream_resumed);

  // Downstream side counters.
  uint32_t downstream_paused =
      test_server_->counter("tcp.tcp_stats.downstream_flow_control_paused_reading_total")->value();
  uint32_t downstream_resumed =
      test_server_->counter("tcp.tcp_stats.downstream_flow_control_resumed_reading_total")->value();
  EXPECT_EQ(downstream_paused, downstream_resumed);
}
180

            
181
// Test that a downstream flush works correctly (all data is flushed)
182
1
TEST_P(CiliumTcpProxyIntegrationTest, CiliumTcpProxyDownstreamFlush) {
183
  // Use a very large size to make sure it is larger than the kernel socket read
184
  // buffer.
185
1
  const uint32_t size = 50 * 1024 * 1024;
186
1
  config_helper_.setBufferLimits(size / 4, size / 4);
187
1
  initialize();
188

            
189
1
  std::string data(size, 'a');
190
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
191
1
  FakeRawConnectionPtr fake_upstream_connection;
192
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
193

            
194
1
  tcp_client->readDisable(true);
195
1
  ASSERT_TRUE(tcp_client->write("", true));
196

            
197
  // This ensures that readDisable(true) has been run on it's thread
198
  // before tcp_client starts writing.
199
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
200

            
201
1
  ASSERT_TRUE(fake_upstream_connection->write(data, true));
202

            
203
1
  test_server_->waitForCounterGe("cluster.cluster1.upstream_flow_control_paused_reading_total", 1);
204
1
  EXPECT_EQ(test_server_->counter("cluster.cluster1.upstream_flow_control_resumed_reading_total")
205
1
                ->value(),
206
1
            0);
207
1
  tcp_client->readDisable(false);
208
1
  tcp_client->waitForData(data);
209
1
  tcp_client->waitForHalfClose();
210
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
211

            
212
1
  uint32_t upstream_pauses =
213
1
      test_server_->counter("cluster.cluster1.upstream_flow_control_paused_reading_total")->value();
214
1
  uint32_t upstream_resumes =
215
1
      test_server_->counter("cluster.cluster1.upstream_flow_control_resumed_reading_total")
216
1
          ->value();
217
1
  EXPECT_GE(upstream_pauses, upstream_resumes);
218
1
  EXPECT_GT(upstream_resumes, 0);
219
1
}
220

            
221
// Test that an upstream flush works correctly (all data is flushed)
222
1
TEST_P(CiliumTcpProxyIntegrationTest, CiliumTcpProxyUpstreamFlush) {
223
  // Use a very large size to make sure it is larger than the kernel socket read
224
  // buffer.
225
1
  const uint32_t size = 50 * 1024 * 1024;
226
1
  config_helper_.setBufferLimits(size, size);
227
1
  initialize();
228

            
229
1
  std::string data(size, 'a');
230
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
231
1
  FakeRawConnectionPtr fake_upstream_connection;
232
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
233

            
234
1
  ASSERT_TRUE(fake_upstream_connection->readDisable(true));
235
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
236

            
237
  // This ensures that fake_upstream_connection->readDisable has been run on
238
  // it's thread before tcp_client starts writing.
239
1
  tcp_client->waitForHalfClose();
240

            
241
1
  ASSERT_TRUE(tcp_client->write(data, true, true, std::chrono::milliseconds(30000)));
242

            
243
1
  test_server_->waitForGaugeEq("tcp.tcp_stats.upstream_flush_active", 1);
244
1
  ASSERT_TRUE(fake_upstream_connection->readDisable(false));
245
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(data.size()));
246
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
247
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
248
1
  tcp_client->waitForHalfClose();
249

            
250
1
  EXPECT_EQ(test_server_->counter("tcp.tcp_stats.upstream_flush_total")->value(), 1);
251
1
  test_server_->waitForGaugeEq("tcp.tcp_stats.upstream_flush_active", 0);
252
1
}
253

            
254
// Test that Envoy doesn't crash or assert when shutting down with an upstream
255
// flush active
256
1
TEST_P(CiliumTcpProxyIntegrationTest, CiliumTcpProxyUpstreamFlushEnvoyExit) {
257
  // Use a very large size to make sure it is larger than the kernel socket read
258
  // buffer.
259
1
  const uint32_t size = 50 * 1024 * 1024;
260
1
  config_helper_.setBufferLimits(size, size);
261
1
  initialize();
262

            
263
1
  std::string data(size, 'a');
264
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
265
1
  FakeRawConnectionPtr fake_upstream_connection;
266
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
267

            
268
1
  ASSERT_TRUE(fake_upstream_connection->readDisable(true));
269
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
270

            
271
  // This ensures that fake_upstream_connection->readDisable has been run on
272
  // it's thread before tcp_client starts writing.
273
1
  tcp_client->waitForHalfClose();
274

            
275
1
  ASSERT_TRUE(tcp_client->write(data, true));
276

            
277
1
  test_server_->waitForGaugeEq("tcp.tcp_stats.upstream_flush_active", 1);
278
1
  test_server_.reset();
279
1
  ASSERT_TRUE(fake_upstream_connection->close());
280
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
281

            
282
  // Success criteria is that no ASSERTs fire and there are no leaks.
283
1
}
284

            
285
//
286
// Cilium Go test parser "linetester" with TCP proxy
287
//
288

            
289
// Bootstrap configuration template for the Go "linetester" parser tests.
//
// params: {0} = is_ingress ("true", "false"), filled in with fmt::format.
// The template must contain nothing but the YAML document itself: any stray
// line inside the raw string becomes part of the configuration.
const std::string cilium_linetester_config_fmt = R"EOF(
admin:
  address:
    socket_address:
      address: 127.0.0.1
      port_value: 0
static_resources:
  clusters:
  - name: cluster1
    type: ORIGINAL_DST
    lb_policy: CLUSTER_PROVIDED
    connect_timeout:
      seconds: 1
  - name: xds-grpc-cilium
    connect_timeout:
      seconds: 5
    type: STATIC
    lb_policy: ROUND_ROBIN
    http2_protocol_options:
    load_assignment:
      cluster_name: xds-grpc-cilium
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              pipe:
                path: /var/run/cilium/xds.sock
  listeners:
    name: listener_0
    address:
      socket_address:
        address: 127.0.0.1
        port_value: 0
    listener_filters:
      name: test_bpf_metadata
      typed_config:
        "@type": type.googleapis.com/cilium.TestBpfMetadata
        is_ingress: {0}
    filter_chains:
      filters:
      - name: cilium.network
        typed_config:
          "@type": type.googleapis.com/cilium.NetworkFilter
          proxylib: "proxylib/libcilium.so"
      - name: envoy.tcp_proxy
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
          stat_prefix: tcp_stats
          cluster: cluster1
)EOF";
340

            
341
// Network policy template selecting the "test.lineparser" L7 parser for
// ingress and egress on ports 1..{0}.
//
// params: {0} = end_port of the port range. '{{ ntop_ip_loopback_address }}'
// is a TestEnvironment::substitute() placeholder.
// The template must contain nothing but the YAML document itself: any stray
// line inside the raw string becomes part of the policy.
const std::string TCP_POLICY_LINEPARSER_fmt = R"EOF(version_info: "0"
resources:
- "@type": type.googleapis.com/cilium.NetworkPolicy
  endpoint_ips:
  - '{{ ntop_ip_loopback_address }}'
  policy: 3
  ingress_per_port_policies:
  - port: 1
    end_port: {0}
    rules:
    - remote_policies: [ 1 ]
      l7_proto: "test.lineparser"
  egress_per_port_policies:
  - port: 1
    end_port: {0}
    rules:
    - remote_policies: [ 1 ]
      l7_proto: "test.lineparser"
)EOF";
360

            
361
class CiliumGoLinetesterIntegrationTest : public CiliumTcpIntegrationTest {
362
public:
363
  CiliumGoLinetesterIntegrationTest()
364
5
      : CiliumTcpIntegrationTest(fmt::format(
365
5
            fmt::runtime(TestEnvironment::substitute(cilium_linetester_config_fmt, GetParam())),
366
5
            "true")) {}
367

            
368
5
  std::string testPolicyFmt() override {
369
5
    return TestEnvironment::substitute(TCP_POLICY_LINEPARSER_fmt, GetParam());
370
5
  }
371
};
372

            
373
INSTANTIATE_TEST_SUITE_P(IpVersions, CiliumGoLinetesterIntegrationTest,
374
                         testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
375
                         TestUtility::ipTestParamsToString);
376

            
377
3
// Builds a validator that accepts received data only when it does NOT contain
// `data_to_not_match` as a substring.
//
// The lambda stores the raw pointer by value, so the pointed-to C string must
// stay alive for as long as the returned validator may be invoked.
static FakeRawConnection::ValidatorFunction noMatch(const char* data_to_not_match) {
  return [data_to_not_match](const std::string& data) -> bool {
    return data.find(data_to_not_match) == std::string::npos;
  };
}
383

            
384
1
// Line parser: the upstream writes before the client does. Of the two reply
// lines only the "PASS" line is waited for on the client side.
TEST_P(CiliumGoLinetesterIntegrationTest, CiliumGoLineParserUpstreamWritesFirst) {
  initialize();
  IntegrationTcpClientPtr client = makeTcpConnection(lookupPort("tcp_proxy"));
  FakeRawConnectionPtr upstream;
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(upstream));

  ASSERT_TRUE(upstream->write("DROP reply direction\n"));
  ASSERT_TRUE(upstream->write("PASS reply direction\n"));
  client->waitForData("PASS reply direction\n");

  // Request direction: the upstream only needs to see data containing "PASS".
  ASSERT_TRUE(client->write("PASS original direction\n"));
  ASSERT_TRUE(upstream->waitForData(FakeRawConnection::waitForInexactMatch("PASS")));

  // Shut down: upstream half-closes first, then the client does the same.
  ASSERT_TRUE(upstream->write("", true));
  client->waitForHalfClose();
  ASSERT_TRUE(client->write("", true));
  ASSERT_TRUE(upstream->waitForHalfClose());
  ASSERT_TRUE(upstream->waitForDisconnect());
}
404

            
405
1
// Line parser: the upstream delivers its lines in fragments that do not align
// with line boundaries; the client still waits for the complete "PASS" line.
TEST_P(CiliumGoLinetesterIntegrationTest, CiliumGoLineParserPartialLines) {
  initialize();
  IntegrationTcpClientPtr client = makeTcpConnection(lookupPort("tcp_proxy"));
  FakeRawConnectionPtr upstream;
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(upstream));

  // Short sleeps separate the fragments, presumably so that they are
  // processed as separate reads rather than coalesced.
  ASSERT_TRUE(upstream->write("DROP reply "));
  absl::SleepFor(absl::Milliseconds(10));
  ASSERT_TRUE(upstream->write("direction\nPASS"));
  absl::SleepFor(absl::Milliseconds(10));
  ASSERT_TRUE(upstream->write(" reply direction\n"));
  client->waitForData("PASS reply direction\n");

  ASSERT_TRUE(client->write("PASS original direction\n"));
  ASSERT_TRUE(
      upstream->waitForData(FakeRawConnection::waitForInexactMatch("PASS original direction\n")));

  // Shut down: upstream half-closes first, then the client does the same.
  ASSERT_TRUE(upstream->write("", true));
  client->waitForHalfClose();
  ASSERT_TRUE(client->write("", true));
  ASSERT_TRUE(upstream->waitForHalfClose());
  ASSERT_TRUE(upstream->waitForDisconnect());
}
428

            
429
1
// Line parser: an "INJECT reply direction" line written by the client is
// expected to show up in the data the client itself receives, alongside the
// regular reply from the upstream.
TEST_P(CiliumGoLinetesterIntegrationTest, CiliumGoLineParserInject) {
  initialize();
  IntegrationTcpClientPtr client = makeTcpConnection(lookupPort("tcp_proxy"));
  FakeRawConnectionPtr upstream;
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(upstream));

  ASSERT_TRUE(client->write("INJECT reply direction\n"));
  ASSERT_TRUE(client->write("PASS original direction\n"));
  ASSERT_TRUE(upstream->write("PASS reply direction\n"));

  // These can in principle arrive in either order, hence the `false` second
  // argument on both waits.
  client->waitForData("PASS reply direction\n", false);
  client->waitForData("INJECT reply direction\n", false);

  ASSERT_TRUE(
      upstream->waitForData(FakeRawConnection::waitForInexactMatch("PASS original direction\n")));

  // Shut down: upstream half-closes first, then the client does the same.
  ASSERT_TRUE(upstream->write("", true));
  client->waitForHalfClose();
  ASSERT_TRUE(client->write("", true));
  ASSERT_TRUE(upstream->waitForHalfClose());
  ASSERT_TRUE(upstream->waitForDisconnect());
}
452

            
453
1
// Line parser: the client's "INJECT" line is written while the upstream has
// only sent the first part of its reply line; both complete lines are then
// expected at the client.
TEST_P(CiliumGoLinetesterIntegrationTest, CiliumGoLineParserInjectPartial) {
  initialize();
  IntegrationTcpClientPtr client = makeTcpConnection(lookupPort("tcp_proxy"));
  FakeRawConnectionPtr upstream;
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(upstream));

  // First half of the reply line, then the client's two lines.
  ASSERT_TRUE(upstream->write("PASS reply"));
  ASSERT_TRUE(client->write("INJECT reply direction\n"));
  ASSERT_TRUE(client->write("PASS original direction\n"));

  // Remainder of the reply line.
  ASSERT_TRUE(upstream->write(" direction\n"));

  // These can in principle arrive in either order
  client->waitForData("PASS reply direction\n", false);
  client->waitForData("INJECT reply direction\n", false);

  ASSERT_TRUE(
      upstream->waitForData(FakeRawConnection::waitForInexactMatch("PASS original direction\n")));

  // Shut down: upstream half-closes first, then the client does the same.
  ASSERT_TRUE(upstream->write("", true));
  client->waitForHalfClose();
  ASSERT_TRUE(client->write("", true));
  ASSERT_TRUE(upstream->waitForHalfClose());
  ASSERT_TRUE(upstream->waitForDisconnect());
}
478

            
479
1
// Line parser: combines a partial reply line with INJECT, DROP and INSERT
// lines from the client, followed by a second DROP/PASS round from the
// upstream.
TEST_P(CiliumGoLinetesterIntegrationTest, CiliumGoLineParserInjectPartialMultiple) {
  initialize();
  IntegrationTcpClientPtr client = makeTcpConnection(lookupPort("tcp_proxy"));
  FakeRawConnectionPtr upstream;
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(upstream));

  // First half of the reply line, then the client's three lines.
  ASSERT_TRUE(upstream->write("PASS reply"));
  ASSERT_TRUE(client->write("INJECT reply direction\n"));
  ASSERT_TRUE(client->write("DROP original direction\n"));
  ASSERT_TRUE(client->write("INSERT original direction\n"));

  // Remainder of the reply line.
  ASSERT_TRUE(upstream->write(" direction\n"));

  // These can in principle arrive in either order
  absl::SleepFor(absl::Milliseconds(10));
  client->waitForData("PASS reply direction\n", false);
  absl::SleepFor(absl::Milliseconds(10));
  client->waitForData("INJECT reply direction\n", false);

  // The upstream must see the INSERT line, and data that contains no "DROP".
  ASSERT_TRUE(
      upstream->waitForData(FakeRawConnection::waitForInexactMatch("INSERT original direction\n")));
  ASSERT_TRUE(upstream->waitForData(noMatch("DROP")));

  // Second reply round: only the "PASS2" line is waited for at the client.
  ASSERT_TRUE(upstream->write("DROP reply direction\n"));
  ASSERT_TRUE(upstream->write("PASS2 reply direction\n"));
  client->waitForData("PASS2 reply direction\n", false);

  // Shut down: upstream half-closes first, then the client does the same.
  ASSERT_TRUE(upstream->write("", true));
  client->waitForHalfClose();
  ASSERT_TRUE(client->write("", true));
  ASSERT_TRUE(upstream->waitForHalfClose());
  ASSERT_TRUE(upstream->waitForDisconnect());
}
512

            
513
//
514
// Cilium Go test parser "blocktester" with TCP proxy
515
//
516

            
517
// Bootstrap configuration template for the Go "blocktester" parser tests.
// Compared to the other templates in this file it additionally passes
// proxylib_params with an access-log-path to the cilium.network filter.
//
// params: {0} = is_ingress ("true", "false"), filled in with fmt::format.
// "{{ test_udsdir }}" is a TestEnvironment::substitute() placeholder.
// The template must contain nothing but the YAML document itself: any stray
// line inside the raw string becomes part of the configuration.
const std::string cilium_blocktester_config_fmt = R"EOF(
admin:
  address:
    socket_address:
      address: 127.0.0.1
      port_value: 0
static_resources:
  clusters:
  - name: cluster1
    type: ORIGINAL_DST
    lb_policy: CLUSTER_PROVIDED
    connect_timeout:
      seconds: 1
  - name: xds-grpc-cilium
    connect_timeout:
      seconds: 5
    type: STATIC
    lb_policy: ROUND_ROBIN
    http2_protocol_options:
    load_assignment:
      cluster_name: xds-grpc-cilium
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              pipe:
                path: /var/run/cilium/xds.sock
  listeners:
    name: listener_0
    address:
      socket_address:
        address: 127.0.0.1
        port_value: 0
    listener_filters:
      name: test_bpf_metadata
      typed_config:
        "@type": type.googleapis.com/cilium.TestBpfMetadata
        is_ingress: {0}
    filter_chains:
      filters:
      - name: cilium.network
        typed_config:
          "@type": type.googleapis.com/cilium.NetworkFilter
          proxylib: "proxylib/libcilium.so"
          proxylib_params:
            access-log-path: "{{ test_udsdir }}/access_log.sock"
      - name: envoy.tcp_proxy
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
          stat_prefix: tcp_stats
          cluster: cluster1
)EOF";
570

            
571
// Network policy template selecting the "test.blockparser" L7 parser for
// ingress and egress on ports 1..65535.
//
// '{{ ntop_ip_loopback_address }}' is a TestEnvironment::substitute()
// placeholder; the template has no fmt placeholders.
// The template must contain nothing but the YAML document itself: any stray
// line inside the raw string becomes part of the policy.
const std::string TCP_POLICY_BLOCKPARSER_fmt = R"EOF(version_info: "0"
resources:
- "@type": type.googleapis.com/cilium.NetworkPolicy
  endpoint_ips:
  - '{{ ntop_ip_loopback_address }}'
  policy: 3
  ingress_per_port_policies:
  - port: 1
    end_port: 65535
    rules:
    - remote_policies: [ 1 ]
      l7_proto: "test.blockparser"
  egress_per_port_policies:
  - port: 1
    end_port: 65535
    rules:
    - remote_policies: [ 1 ]
      l7_proto: "test.blockparser"
)EOF";
590

            
591
class CiliumGoBlocktesterIntegrationTest : public CiliumTcpIntegrationTest {
592
public:
593
  CiliumGoBlocktesterIntegrationTest()
594
6
      : CiliumTcpIntegrationTest(fmt::format(
595
6
            fmt::runtime(TestEnvironment::substitute(cilium_blocktester_config_fmt, GetParam())),
596
6
            "true")) {}
597

            
598
6
  std::string testPolicyFmt() override {
599
6
    return TestEnvironment::substitute(TCP_POLICY_BLOCKPARSER_fmt, GetParam());
600
6
  }
601
};
602

            
603
INSTANTIATE_TEST_SUITE_P(IpVersions, CiliumGoBlocktesterIntegrationTest,
604
                         testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
605
                         TestUtility::ipTestParamsToString);
606

            
607
1
// Block parser: the upstream writes before the client does. Every block starts
// with "<length>:"; of the two reply blocks only the "PASS" block is waited
// for on the client side.
TEST_P(CiliumGoBlocktesterIntegrationTest, CiliumGoBlockParserUpstreamWritesFirst) {
  initialize();
  IntegrationTcpClientPtr client = makeTcpConnection(lookupPort("tcp_proxy"));
  FakeRawConnectionPtr upstream;
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(upstream));

  ASSERT_TRUE(upstream->write("24:DROP reply direction\n"));
  ASSERT_TRUE(upstream->write("24:PASS reply direction\n"));
  client->waitForData("24:PASS reply direction\n");

  // Request direction: the upstream only needs to see data containing "PASS".
  ASSERT_TRUE(client->write("27:PASS original direction\n"));
  ASSERT_TRUE(upstream->waitForData(FakeRawConnection::waitForInexactMatch("PASS")));

  // Shut down: upstream half-closes first, then the client does the same.
  ASSERT_TRUE(upstream->write("", true));
  client->waitForHalfClose();
  ASSERT_TRUE(client->write("", true));
  ASSERT_TRUE(upstream->waitForHalfClose());
  ASSERT_TRUE(upstream->waitForDisconnect());
}
627

            
628
1
// Block parser: the upstream delivers its blocks in fragments that do not
// align with block boundaries; the client still waits for the complete "PASS"
// block.
TEST_P(CiliumGoBlocktesterIntegrationTest, CiliumGoBlockParserPartialBlocks) {
  initialize();
  IntegrationTcpClientPtr client = makeTcpConnection(lookupPort("tcp_proxy"));
  FakeRawConnectionPtr upstream;
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(upstream));

  ASSERT_TRUE(upstream->write("24:DROP reply "));
  ASSERT_TRUE(upstream->write("direction\n24:PASS"));
  ASSERT_TRUE(upstream->write(" reply direction\n"));
  client->waitForData("24:PASS reply direction\n");

  ASSERT_TRUE(client->write("27:PASS original direction\n"));
  ASSERT_TRUE(upstream->waitForData(
      FakeRawConnection::waitForInexactMatch("27:PASS original direction\n")));

  // Shut down: upstream half-closes first, then the client does the same.
  ASSERT_TRUE(upstream->write("", true));
  client->waitForHalfClose();
  ASSERT_TRUE(client->write("", true));
  ASSERT_TRUE(upstream->waitForHalfClose());
  ASSERT_TRUE(upstream->waitForDisconnect());
}
649

            
650
1
// Block parser: an "INJECT reply direction" block written by the client is
// expected to show up in the data the client itself receives, alongside the
// regular reply from the upstream.
TEST_P(CiliumGoBlocktesterIntegrationTest, CiliumGoBlockParserInject) {
  initialize();
  IntegrationTcpClientPtr client = makeTcpConnection(lookupPort("tcp_proxy"));
  FakeRawConnectionPtr upstream;
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(upstream));

  ASSERT_TRUE(client->write("26:INJECT reply direction\n"));
  ASSERT_TRUE(client->write("27:PASS original direction\n"));
  ASSERT_TRUE(upstream->write("24:PASS reply direction\n"));

  // These can in principle arrive in either order
  absl::SleepFor(absl::Milliseconds(10));
  client->waitForData("24:PASS reply direction\n", false);
  absl::SleepFor(absl::Milliseconds(10));
  client->waitForData("26:INJECT reply direction\n", false);

  ASSERT_TRUE(upstream->waitForData(
      FakeRawConnection::waitForInexactMatch("27:PASS original direction\n")));

  // Shut down: upstream half-closes first, then the client does the same.
  ASSERT_TRUE(upstream->write("", true));
  client->waitForHalfClose();
  ASSERT_TRUE(client->write("", true));
  ASSERT_TRUE(upstream->waitForHalfClose());
  ASSERT_TRUE(upstream->waitForDisconnect());
}
675

            
676
1
// Block parser: the client's "INJECT" block is written while the upstream has
// only sent the first part of its reply block; both complete blocks are then
// expected at the client.
TEST_P(CiliumGoBlocktesterIntegrationTest, CiliumGoBlockParserInjectPartial) {
  initialize();
  IntegrationTcpClientPtr client = makeTcpConnection(lookupPort("tcp_proxy"));
  FakeRawConnectionPtr upstream;
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(upstream));

  // First part of the reply block, then the client's two blocks.
  ASSERT_TRUE(upstream->write("24:PASS reply"));
  ASSERT_TRUE(client->write("26:INJECT reply direction\n"));
  ASSERT_TRUE(client->write("27:PASS original direction\n"));

  // Remainder of the reply block.
  ASSERT_TRUE(upstream->write(" direction\n"));

  // These can in principle arrive in either order
  client->waitForData("24:PASS reply direction\n", false);
  client->waitForData("26:INJECT reply direction\n", false);

  ASSERT_TRUE(upstream->waitForData(
      FakeRawConnection::waitForInexactMatch("27:PASS original direction\n")));

  // Shut down: upstream half-closes first, then the client does the same.
  ASSERT_TRUE(upstream->write("", true));
  client->waitForHalfClose();
  ASSERT_TRUE(client->write("", true));
  ASSERT_TRUE(upstream->waitForHalfClose());
  ASSERT_TRUE(upstream->waitForDisconnect());
}
701

            
702
1
// Block parser: combines a reply block delivered in three fragments with
// INJECT, DROP and INSERT blocks from the client, followed by a second
// DROP/PASS round from the upstream.
TEST_P(CiliumGoBlocktesterIntegrationTest, CiliumGoBlockParserInjectPartialMultiple) {
  initialize();
  IntegrationTcpClientPtr client = makeTcpConnection(lookupPort("tcp_proxy"));
  FakeRawConnectionPtr upstream;
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(upstream));

  // First fragment of the reply block, then the client's three blocks.
  ASSERT_TRUE(upstream->write("24:PASS reply"));
  ASSERT_TRUE(client->write("26:INJECT reply direction\n"));
  ASSERT_TRUE(client->write("27:DROP original direction\n"));
  ASSERT_TRUE(client->write("29:INSERT original direction\n"));

  // Second fragment, after a pause.
  absl::SleepFor(absl::Milliseconds(100));
  ASSERT_TRUE(upstream->write(" dire"));

  // Final fragment, after another pause.
  absl::SleepFor(absl::Milliseconds(100));
  ASSERT_TRUE(upstream->write("ction\n"));

  // These can in principle arrive in either order
  client->waitForData("24:PASS reply direction\n", false);
  client->waitForData("26:INJECT reply direction\n", false);

  // The upstream must see the INSERT block, and data that contains no "DROP".
  ASSERT_TRUE(upstream->waitForData(
      FakeRawConnection::waitForInexactMatch("29:INSERT original direction\n")));
  ASSERT_TRUE(upstream->waitForData(noMatch("DROP")));

  // Second reply round: only the "PASS2" block is waited for at the client.
  ASSERT_TRUE(upstream->write("24:DROP reply direction\n"));
  ASSERT_TRUE(upstream->write("25:PASS2 reply direction\n"));
  client->waitForData("25:PASS2 reply direction\n", false);

  // Shut down: upstream half-closes first, then the client does the same.
  ASSERT_TRUE(upstream->write("", true));
  client->waitForHalfClose();
  ASSERT_TRUE(client->write("", true));
  ASSERT_TRUE(upstream->waitForHalfClose());
  ASSERT_TRUE(upstream->waitForDisconnect());
}
737

            
738
1
// Block parser: sends a single 5000-byte INSERT block from the client after
// INJECT and DROP blocks, and checks that the INSERT data reaches the upstream
// while no "DROP" data does.
TEST_P(CiliumGoBlocktesterIntegrationTest, CiliumGoBlockParserInjectBufferOverflow) {
  initialize();
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
  FakeRawConnectionPtr fake_upstream_connection;
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));

  ASSERT_TRUE(tcp_client->write("26:INJECT reply direction\n"));
  ASSERT_TRUE(tcp_client->write("27:DROP original direction\n"));

  // Build the 5000-byte block: header, 'A' padding, trailing newline.
  //
  // The block is kept in a std::string with an explicit length. Filling a raw
  // char[5000] the same way leaves no room for a NUL terminator, so handing
  // that array to write() as a C string would read past the end of the array
  // when the std::string argument is constructed.
  const std::string header = "5000:INSERT original direction";
  std::string block(5000, 'A');
  block.replace(0, header.size(), header);
  block.back() = '\n';

  ASSERT_TRUE(tcp_client->write(block));
  tcp_client->waitForData("26:INJECT reply direction\n", false);

  // The upstream must see the INSERT block, and data that contains no "DROP".
  ASSERT_TRUE(fake_upstream_connection->waitForData(
      FakeRawConnection::waitForInexactMatch("INSERT original direction")));
  ASSERT_TRUE(fake_upstream_connection->waitForData(noMatch("DROP")));

  // Reply round: only the "PASS2" block is waited for at the client.
  ASSERT_TRUE(fake_upstream_connection->write("24:DROP reply direction\n"));
  ASSERT_TRUE(fake_upstream_connection->write("25:PASS2 reply direction\n"));
  tcp_client->waitForData("25:PASS2 reply direction\n", false);

  // Shut down: upstream half-closes first, then the client does the same.
  ASSERT_TRUE(fake_upstream_connection->write("", true));
  tcp_client->waitForHalfClose();
  ASSERT_TRUE(tcp_client->write("", true));
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
}
769

            
770
} // namespace Envoy