1
#include <fmt/base.h>
2
#include <fmt/format.h>
3
#include <gtest/gtest-param-test.h>
4
#include <gtest/gtest.h>
5

            
6
#include <chrono>
7
#include <cstdint>
8
#include <string>
9

            
10
#include "test/integration/fake_upstream.h"
11
#include "test/integration/integration_tcp_client.h"
12
#include "test/test_common/environment.h"
13
#include "test/test_common/utility.h"
14

            
15
#include "tests/cilium_tcp_integration.h"
16

            
17
namespace Envoy {
18

            
19
//
20
// Cilium filters with TCP proxy
21
//
22

            
23
// params: is_ingress ("true", "false")
24
const std::string cilium_tcp_proxy_config_fmt = R"EOF(
25
admin:
26
  address:
27
    socket_address:
28
      address: 127.0.0.1
29
      port_value: 0
30
static_resources:
31
  clusters:
32
  - name: cluster1
33
    connect_timeout: 5s
34
    type: STATIC
35
    load_assignment:
36
      cluster_name: internal-cluster
37
      endpoints:
38
      - lb_endpoints:
39
        - endpoint:
40
            address:
41
              socket_address:
42
                address: 127.0.0.1
43
                port_value: 0
44
  - name: xds-grpc-cilium
45
    connect_timeout:
46
      seconds: 5
47
    type: STATIC
48
    lb_policy: ROUND_ROBIN
49
    http2_protocol_options:
50
    load_assignment:
51
      cluster_name: xds-grpc-cilium
52
      endpoints:
53
      - lb_endpoints:
54
        - endpoint:
55
            address:
56
              pipe:
57
                path: /var/run/cilium/xds.sock
58
  - name: internal-cluster
59
    connect_timeout: 5s
60
    type: STATIC
61
    load_assignment:
62
      cluster_name: internal-cluster
63
      endpoints:
64
      - lb_endpoints:
65
        - endpoint:
66
            address:
67
              socket_address:
68
                address: 127.0.0.1
69
                port_value: 11111
70
  - name: internal-cluster2
71
    connect_timeout: 5s
72
    type: STATIC
73
    load_assignment:
74
      cluster_name: internal-cluster
75
      endpoints:
76
      - lb_endpoints:
77
        - endpoint:
78
            address:
79
              socket_address:
80
                address: 127.0.0.1
81
                port_value: 22222
82
  listeners:
83
  - name: listener_0
84
    address:
85
      socket_address:
86
        address: 127.0.0.1
87
        port_value: 0
88
    filter_chains:
89
      filters:
90
      - name: cilium.network.websocket.client
91
        typed_config:
92
          "@type": type.googleapis.com/cilium.WebSocketClient
93
          access_log_path: "{{ test_udsdir }}/access_log.sock"
94
          origin: "jarno.cilium.rocks"
95
          host: "jarno.cilium.rocks"
96
          ping_interval:
97
            nanos: 1000000
98
          ping_when_idle: true
99
      - name: envoy.tcp_proxy
100
        typed_config:
101
          "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
102
          stat_prefix: tcp_stats
103
          cluster: internal-cluster
104
  - name: internal-listener
105
    address:
106
      socket_address:
107
        address: 127.0.0.1
108
        port_value: 11111
109
    listener_filters:
110
      name: test_bpf_metadata
111
      typed_config:
112
        "@type": type.googleapis.com/cilium.TestBpfMetadata
113
        is_ingress: {0}
114
    filter_chains:
115
    - filters:
116
      - name: cilium.network
117
        typed_config:
118
          "@type": type.googleapis.com/cilium.NetworkFilter
119
      - name: envoy.http_connection_manager
120
        typed_config:
121
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
122
          stat_prefix: config_test
123
          upgrade_configs:
124
          - upgrade_type: websocket
125
          codec_type: auto
126
          use_remote_address: true
127
          skip_xff_append: true
128
          http_filters:
129
          - name: test_l7policy
130
            typed_config:
131
              "@type": type.googleapis.com/cilium.L7Policy
132
              access_log_path: "{{ test_udsdir }}/access_log.sock"
133
          - name: envoy.filters.http.router
134
            typed_config:
135
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
136
          route_config:
137
            name: policy_enabled
138
            virtual_hosts:
139
              name: integration
140
              domains: "*"
141
              routes:
142
              - route:
143
                  cluster: internal-cluster2
144
                  max_grpc_timeout:
145
                    seconds: 0
146
                    nanos: 0
147
                match:
148
                  prefix: "/"
149
  - name: internal-listener2
150
    address:
151
      socket_address:
152
        address: 127.0.0.1
153
        port_value: 22222
154
    filter_chains:
155
    - filters:
156
      - name: cilium.network.websocket.server
157
        typed_config:
158
          "@type": type.googleapis.com/cilium.WebSocketServer
159
          access_log_path: "{{ test_udsdir }}/access_log.sock"
160
          origin: "jarno.cilium.rocks"
161
      - name: envoy.filters.network.tcp_proxy
162
        typed_config:
163
          "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
164
          stat_prefix: tcp_stats
165
          cluster: cluster1
166
)EOF";
167

            
168
class CiliumWebSocketIntegrationTest : public CiliumTcpIntegrationTest {
169
public:
170
  CiliumWebSocketIntegrationTest()
171
4
      : CiliumTcpIntegrationTest(fmt::format(
172
4
            fmt::runtime(TestEnvironment::substitute(cilium_tcp_proxy_config_fmt, GetParam())),
173
4
            "true")) {
174
4
    config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
175
      // replace 0 port in "cluster1" with the fake backend port
176
4
      bootstrap.mutable_static_resources()
177
4
          ->mutable_clusters(0)
178
4
          ->mutable_load_assignment()
179
4
          ->mutable_endpoints(0)
180
4
          ->mutable_lb_endpoints(0)
181
4
          ->mutable_endpoint()
182
4
          ->mutable_address()
183
4
          ->mutable_socket_address()
184
4
          ->set_port_value(fake_upstreams_[0]->localAddress()->ip()->port());
185
4
    });
186
4
  }
187

            
188
4
  std::string testPolicyFmt() override {
189
4
    return TestEnvironment::substitute(R"EOF(version_info: "0"
190
4
resources:
191
4
- "@type": type.googleapis.com/cilium.NetworkPolicy
192
4
  endpoint_ips:
193
4
  - '{{ ntop_ip_loopback_address }}'
194
4
  policy: 3
195
4
  ingress_per_port_policies:
196
4
  - port: {0}
197
4
    rules:
198
4
    - remote_policies: [ 1 ]
199
4
  egress_per_port_policies:
200
4
  - port: {0}
201
4
    rules:
202
4
    - remote_policies: [ 1 ]
203
4
)EOF",
204
4
                                       GetParam());
205
4
  }
206
};
207

            
208
INSTANTIATE_TEST_SUITE_P(IpVersions, CiliumWebSocketIntegrationTest,
209
                         testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
210
                         TestUtility::ipTestParamsToString);
211

            
212
// Test upstream writing before downstream downstream does.
213
1
TEST_P(CiliumWebSocketIntegrationTest, CiliumWebSocketUpstreamWritesFirst) {
214
1
  initialize();
215
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
216
1
  FakeRawConnectionPtr fake_upstream_connection;
217
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
218

            
219
1
  test_server_->waitForCounterGe("websocket.ping_sent_count", 1);
220

            
221
1
  ASSERT_TRUE(fake_upstream_connection->write("hello"));
222
1
  tcp_client->waitForData("hello");
223

            
224
1
  ASSERT_TRUE(tcp_client->write("hello"));
225
1
  std::string received;
226
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(5, &received));
227
1
  ASSERT_EQ(received, "hello");
228

            
229
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
230
1
  tcp_client->waitForHalfClose();
231
1
  ASSERT_TRUE(tcp_client->write("", true));
232
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
233
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
234
1
}
235

            
236
// Test proxying data in both directions, and that all data is flushed properly
237
// when there is an upstream disconnect.
238
1
TEST_P(CiliumWebSocketIntegrationTest, CiliumWebSocketUpstreamDisconnect) {
239
1
  initialize();
240
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
241
1
  ASSERT_TRUE(tcp_client->write("hello"));
242
1
  FakeRawConnectionPtr fake_upstream_connection;
243
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
244

            
245
1
  std::string received;
246
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(5, &received));
247
1
  ASSERT_EQ(received, "hello");
248

            
249
1
  test_server_->waitForCounterGe("websocket.ping_sent_count", 1);
250

            
251
1
  ASSERT_TRUE(fake_upstream_connection->write("world"));
252
1
  ASSERT_TRUE(fake_upstream_connection->close());
253
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
254
1
  tcp_client->waitForHalfClose();
255
1
  tcp_client->close();
256

            
257
1
  EXPECT_EQ("world", tcp_client->data());
258
1
}
259

            
260
// Test proxying data in both directions, and that all data is flushed properly
261
// when the client disconnects.
262
1
TEST_P(CiliumWebSocketIntegrationTest, CiliumWebSocketDownstreamDisconnect) {
263
1
  initialize();
264
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
265
1
  ASSERT_TRUE(tcp_client->write("hello"));
266
1
  FakeRawConnectionPtr fake_upstream_connection;
267
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
268

            
269
1
  std::string received;
270
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(5, &received));
271
1
  ASSERT_EQ(received, "hello");
272
1
  ASSERT_TRUE(fake_upstream_connection->write("world"));
273
1
  tcp_client->waitForData("world");
274

            
275
1
  test_server_->waitForCounterGe("websocket.ping_sent_count", 1);
276

            
277
1
  ASSERT_TRUE(tcp_client->write("hello", true));
278
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(10, &received));
279
1
  ASSERT_EQ(received, "hellohello");
280
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
281
1
  ASSERT_TRUE(fake_upstream_connection->write("", true));
282
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
283
1
  tcp_client->waitForDisconnect();
284
1
}
285

            
286
1
TEST_P(CiliumWebSocketIntegrationTest, CiliumWebSocketLargeWrite) {
287
1
  config_helper_.setBufferLimits(1024, 1024);
288
1
  initialize();
289

            
290
1
  std::string data(1024 * 16, 'a');
291
1
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
292
1
  ASSERT_TRUE(tcp_client->write(data));
293
1
  FakeRawConnectionPtr fake_upstream_connection;
294
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
295

            
296
1
  std::string received;
297
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(data.size(), &received));
298
1
  ASSERT_EQ(received, data);
299
1
  ASSERT_TRUE(fake_upstream_connection->write(data));
300
1
  tcp_client->waitForData(data);
301

            
302
1
  test_server_->waitForCounterGe("websocket.ping_sent_count", 1);
303

            
304
1
  tcp_client->close();
305
1
  ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
306
1
  ASSERT_TRUE(fake_upstream_connection->close());
307
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
308

            
309
1
  uint32_t upstream_pauses =
310
1
      test_server_->counter("cluster.cluster1.upstream_flow_control_paused_reading_total")->value();
311
1
  uint32_t upstream_resumes =
312
1
      test_server_->counter("cluster.cluster1.upstream_flow_control_resumed_reading_total")
313
1
          ->value();
314
1
  EXPECT_EQ(upstream_pauses, upstream_resumes);
315
1
  uint32_t downstream_pauses =
316
1
      test_server_->counter("tcp.tcp_stats.downstream_flow_control_paused_reading_total")->value();
317
1
  uint32_t downstream_resumes =
318
1
      test_server_->counter("tcp.tcp_stats.downstream_flow_control_resumed_reading_total")->value();
319
  // Since we are receiving early data, downstream connection will already be read
320
  // disabled so downstream pause metric is not emitted when upstream buffer hits high
321
  // watermark. When the upstream buffer watermark goes down, downstream will be read
322
  // enabled and downstream resume metric will be emitted.
323
1
  EXPECT_EQ(downstream_pauses, 2);
324
1
  EXPECT_EQ(downstream_resumes, 2);
325
1
}
326

            
327
} // namespace Envoy