1
#include <fmt/base.h>
2
#include <fmt/format.h>
3
#include <gtest/gtest-param-test.h>
4
#include <gtest/gtest.h>
5

            
6
#include <cstddef>
7
#include <string>
8

            
9
#include "envoy/event/dispatcher.h"
10

            
11
#include "source/common/buffer/buffer_impl.h"
12

            
13
#include "test/integration/fake_upstream.h"
14
#include "test/integration/integration_stream_decoder.h"
15
#include "test/test_common/environment.h"
16
#include "test/test_common/utility.h"
17

            
18
#include "absl/strings/string_view.h"
19
#include "tests/bpf_metadata.h" // host_map_config, original_dst_address
20
#include "tests/cilium_http_integration.h"
21

            
22
using namespace std::literals;
23

            
24
namespace Envoy {
25

            
26
// params: is_ingress ("true", "false")
27
const std::string cilium_tcp_proxy_config_fmt = R"EOF(
28
admin:
29
  address:
30
    socket_address:
31
      address: 127.0.0.1
32
      port_value: 0
33
static_resources:
34
  clusters:
35
  - name: cluster1
36
    type: ORIGINAL_DST
37
    lb_policy: CLUSTER_PROVIDED
38
    connect_timeout:
39
      seconds: 1
40
  - name: xds-grpc-cilium
41
    connect_timeout:
42
      seconds: 5
43
    type: STATIC
44
    lb_policy: ROUND_ROBIN
45
    http2_protocol_options:
46
    load_assignment:
47
      cluster_name: xds-grpc-cilium
48
      endpoints:
49
      - lb_endpoints:
50
        - endpoint:
51
            address:
52
              pipe:
53
                path: /var/run/cilium/xds.sock
54
  listeners:
55
    name: http
56
    address:
57
      socket_address:
58
        address: 127.0.0.1
59
        port_value: 0
60
    listener_filters:
61
      name: test_bpf_metadata
62
      typed_config:
63
        "@type": type.googleapis.com/cilium.TestBpfMetadata
64
        is_ingress: {0}
65
    filter_chains:
66
      filters:
67
      - name: cilium.network.websocket.server
68
        typed_config:
69
          "@type": type.googleapis.com/cilium.WebSocketServer
70
          access_log_path: "{{ test_udsdir }}/access_log.sock"
71
          origin: "jarno.cilium.rocks"
72
      - name: cilium.network
73
        typed_config:
74
          "@type": type.googleapis.com/cilium.NetworkFilter
75
      - name: envoy.tcp_proxy
76
        typed_config:
77
          "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
78
          stat_prefix: tcp_stats
79
          cluster: cluster1
80
)EOF";
81

            
82
class CiliumWebSocketIntegrationTest : public CiliumHttpIntegrationTest {
83
public:
84
  CiliumWebSocketIntegrationTest()
85
2
      : CiliumHttpIntegrationTest(fmt::format(
86
2
            fmt::runtime(TestEnvironment::substitute(cilium_tcp_proxy_config_fmt, GetParam())),
87
2
            "false")) {
88
2
    host_map_config = R"EOF(version_info: "0"
89
2
resources:
90
2
- "@type": type.googleapis.com/cilium.NetworkPolicyHosts
91
2
  policy: 173
92
2
  host_addresses: [ "192.168.0.1", "f00d::1" ]
93
2
- "@type": type.googleapis.com/cilium.NetworkPolicyHosts
94
2
  policy: 1
95
2
  host_addresses: [ "127.0.0.0/8", "::/104" ]
96
2
)EOF";
97
2
  }
98

            
99
2
  std::string testPolicyFmt() override {
100
2
    return TestEnvironment::substitute(R"EOF(version_info: "0"
101
2
resources:
102
2
- "@type": type.googleapis.com/cilium.NetworkPolicy
103
2
  endpoint_ips:
104
2
  - '{{ ntop_ip_loopback_address }}'
105
2
  policy: 3
106
2
  ingress_per_port_policies:
107
2
  - port: {0}
108
2
    rules:
109
2
    - remote_policies: [ 1 ]
110
2
  egress_per_port_policies:
111
2
  - port: {0}
112
2
    rules:
113
2
    - remote_policies: [ 1 ]
114
2
)EOF",
115
2
                                       GetParam());
116
2
  }
117

            
118
1
  void denied(Http::TestRequestHeaderMapImpl&& headers) {
119
1
    codec_client_ = makeHttpConnection(lookupPort("http"));
120
1
    auto response = codec_client_->makeHeaderOnlyRequest(headers);
121
1
    ASSERT_TRUE(response->waitForEndStream());
122

            
123
1
    EXPECT_TRUE(response->complete());
124
1
    EXPECT_EQ("403", response->headers().getStatusValue());
125
1
    cleanupUpstreamAndDownstream();
126
1
  }
127
};
128

            
129
INSTANTIATE_TEST_SUITE_P(IpVersions, CiliumWebSocketIntegrationTest,
130
                         testing::ValuesIn(TestEnvironment::getIpVersionsForTest()));
131

            
132
1
TEST_P(CiliumWebSocketIntegrationTest, DeniedNonWebSocket) {
133
1
  initialize();
134
1
  denied({{":method", "GET"}, {":path", "/"}, {":authority", "host"}});
135
1
}
136

            
137
1
TEST_P(CiliumWebSocketIntegrationTest, AcceptedWebSocket) {
138
1
  initialize();
139
1
  auto request_headers = Http::TestRequestHeaderMapImpl{
140
1
      {":method", "GET"},
141
1
      {":path", "/"},
142
1
      {":authority", "host"},
143
1
      {"Upgrade", "websocket"},
144
1
      {"Connection", "Upgrade"},
145
1
      {"Origin", "jarno.cilium.rocks"},
146
1
      {"Sec-WebSocket-Key", "dGhlIHNhbXBsZSBub25jZQ=="},
147
1
      {"Sec-WebSocket-Version", "13"},
148
1
      {"x-request-id", "000000ff-0000-0000-0000-000000000001"},
149
1
      {"x-envoy-original-dst-host", original_dst_address->asString()}};
150
1
  codec_client_ = makeHttpConnection(lookupPort("http"));
151

            
152
1
  IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(request_headers);
153
1
  FakeRawConnectionPtr fake_upstream_connection;
154
1
  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
155
  // Wait for the response to be read by the codec client.
156
1
  response->waitForHeaders();
157
1
  EXPECT_EQ("101", response->headers().getStatusValue());
158

            
159
1
  auto client_conn = codec_client_->connection();
160

            
161
  // Create websocket framed data & write it on the client connection
162
1
  Buffer::OwnedImpl buf{"\x82\x5"
163
1
                        "hello"};
164
1
  client_conn->write(buf, false);
165
  // Run the dispatcher so that the write event is handled
166
1
  client_conn->dispatcher().run(Event::Dispatcher::RunType::NonBlock);
167

            
168
1
  std::string data;
169
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(5, &data));
170
1
  ASSERT_EQ(data, "hello");
171
1
  ASSERT_TRUE(fake_upstream_connection->write("world"));
172
  // There is no way to clear the fake upstream data, so we must keep track of how much of it
173
  // we already saw.
174
1
  auto seen_data_len = data.length();
175

            
176
1
  response->waitForBodyData(7);
177
1
  absl::string_view resp = response->body();
178
1
  ASSERT_EQ(resp.substr(0, 7), "\x82\x5"
179
1
                               "world");
180
1
  response->clearBody();
181

            
182
  // Send multiple frames back-to-back
183
1
  ASSERT_EQ(buf.length(), 0);
184
1
  buf.add("\x82\x6"
185
1
          "hello2"
186
1
          "\x82\x7"
187
1
          "hello21"
188
1
          "\x82\x3"
189
1
          "foo");
190
1
  client_conn->write(buf, false);
191
  // Run the dispatcher so that the write event is handled
192
1
  client_conn->dispatcher().run(Event::Dispatcher::RunType::NonBlock);
193

            
194
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(seen_data_len + 16, &data));
195
1
  ASSERT_EQ(data.substr(seen_data_len), "hello2hello21foo");
196
1
  seen_data_len = data.length();
197

            
198
1
  ASSERT_TRUE(fake_upstream_connection->write("bar"));
199

            
200
1
  response->waitForBodyData(5);
201
1
  resp = response->body();
202
1
  ASSERT_EQ(resp.substr(0, 5), "\x82\x3"
203
1
                               "bar");
204
1
  response->clearBody();
205

            
206
  // Bigger size formats & multiple responses.
207
  // Officially optimal length formats must be used, but our implementation
208
  // accepts larger formats with less data, which makes testing easier.
209
1
  ASSERT_EQ(buf.length(), 0);
210
1
  absl::string_view frame16{"\x82\x7e\0\x5"
211
1
                            "len16",
212
1
                            9};
213
1
  buf.add(frame16);
214
1
  client_conn->write(buf, false);
215
  // Run the dispatcher so that the write event is handled
216
1
  client_conn->dispatcher().run(Event::Dispatcher::RunType::NonBlock);
217

            
218
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(seen_data_len + 5, &data));
219
1
  ASSERT_EQ(data.substr(seen_data_len), "len16");
220
1
  seen_data_len = data.length();
221

            
222
1
  ASSERT_TRUE(fake_upstream_connection->write("foo"));
223
1
  response->waitForBodyData(5);
224
1
  ASSERT_TRUE(fake_upstream_connection->write("bar"));
225
1
  response->waitForBodyData(10);
226
1
  resp = response->body();
227
1
  ASSERT_EQ(resp.substr(0, 5), "\x82\x3"
228
1
                               "foo");
229
1
  ASSERT_EQ(resp.substr(5, 5), "\x82\x3"
230
1
                               "bar");
231
1
  response->clearBody();
232

            
233
  // 64-bit size format
234
  // Officially optimal length formats must be used, but our implementation
235
  // accepts larger formats with less data, which makes testing easier.
236
1
  ASSERT_EQ(buf.length(), 0);
237
1
  absl::string_view frame64{"\x82\x7f\0\0\0\0\0\0\0\x5"
238
1
                            "len64",
239
1
                            15};
240
1
  buf.add(frame64);
241
1
  client_conn->write(buf, false);
242
  // Run the dispatcher so that the write event is handled
243
1
  client_conn->dispatcher().run(Event::Dispatcher::RunType::NonBlock);
244

            
245
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(seen_data_len + 5, &data));
246
1
  ASSERT_EQ(data.substr(seen_data_len), "len64");
247
1
  seen_data_len = data.length();
248

            
249
1
  ASSERT_TRUE(fake_upstream_connection->write("hello"));
250
1
  response->waitForBodyData(7);
251
1
  resp = response->body();
252
1
  ASSERT_EQ(resp.substr(0, 7), "\x82\x5"
253
1
                               "hello");
254
1
  response->clearBody();
255

            
256
  // Gaps within a frame
257
1
  ASSERT_EQ(buf.length(), 0);
258
1
  buf.add("\x82\x5"
259
1
          "hello"
260
1
          "\x82\xe"
261
1
          "gap ");
262
1
  client_conn->write(buf, false);
263
  // Run the dispatcher so that the write event is handled
264
1
  client_conn->dispatcher().run(Event::Dispatcher::RunType::NonBlock);
265

            
266
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(seen_data_len + 9, &data));
267
1
  ASSERT_EQ(data.substr(seen_data_len), "hellogap ");
268
1
  seen_data_len = data.length();
269

            
270
1
  ASSERT_TRUE(fake_upstream_connection->write("bar42"));
271

            
272
1
  ASSERT_EQ(buf.length(), 0);
273
1
  buf.add("in between"
274
1
          "\x82\x3"
275
1
          "foo");
276
1
  client_conn->write(buf, false);
277
  // Run the dispatcher so that the write event is handled
278
1
  client_conn->dispatcher().run(Event::Dispatcher::RunType::NonBlock);
279

            
280
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(seen_data_len + 13, &data));
281
1
  ASSERT_EQ(data.substr(seen_data_len), "in betweenfoo");
282
1
  seen_data_len = data.length();
283

            
284
1
  response->waitForBodyData(7);
285
1
  resp = response->body();
286
1
  ASSERT_EQ(resp.substr(0, 7), "\x82\x5"
287
1
                               "bar42");
288
1
  response->clearBody();
289

            
290
  // Masked frames
291
1
  ASSERT_EQ(buf.length(), 0);
292
1
  auto msg = "heello there\r\n"s;
293
1
  unsigned char mask[4] = {0x12, 0x34, 0x56, 0x78};
294
1
  auto masked = msg;
295
15
  for (size_t i = 0; i < msg.length(); i++) {
296
14
    masked[i] = msg[i] ^ mask[i % 4];
297
14
  }
298
1
  buf.add("\x82\x8e");
299
1
  buf.add(mask, 4);
300
1
  buf.add(masked.data(), masked.length());
301
1
  client_conn->write(buf, false);
302
  // Run the dispatcher so that the write event is handled
303
1
  client_conn->dispatcher().run(Event::Dispatcher::RunType::NonBlock);
304

            
305
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(seen_data_len + 14, &data));
306
1
  ASSERT_EQ(data.substr(seen_data_len), msg);
307
1
  seen_data_len = data.length();
308

            
309
1
  ASSERT_TRUE(fake_upstream_connection->write(msg));
310

            
311
1
  response->waitForBodyData(16);
312
1
  ASSERT_EQ(response->body().length(), 16);
313
1
  resp = response->body();
314
1
  ASSERT_EQ(resp.substr(0, 16), "\x82\xe"
315
1
                                "heello there\r\n");
316
1
  response->clearBody();
317

            
318
  // 2nd masked frame
319
1
  ASSERT_EQ(buf.length(), 0);
320
1
  auto msg2 = "hello there\r\n"s;
321
1
  unsigned char mask2[4] = {0x90, 0xab, 0xcd, 0xef};
322
1
  auto masked2 = msg2;
323
14
  for (size_t i = 0; i < msg2.length(); i++) {
324
13
    masked2[i] = msg2[i] ^ mask2[i % 4];
325
13
  }
326
  // Write frame header
327
1
  buf.add("\x82\x8d");
328
1
  buf.add(mask2, 4);
329
1
  client_conn->write(buf, false);
330
  // Run the dispatcher so that the write event is handled
331
1
  client_conn->dispatcher().run(Event::Dispatcher::RunType::NonBlock);
332

            
333
  // Write 5 first bytes
334
1
  buf.add(masked2.data(), 5);
335
1
  client_conn->write(buf, false);
336
  // Run the dispatcher so that the write event is handled
337
1
  client_conn->dispatcher().run(Event::Dispatcher::RunType::NonBlock);
338

            
339
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(seen_data_len + 5, &data));
340
1
  ASSERT_EQ(data.substr(seen_data_len), absl::string_view(msg2.data(), 5));
341
1
  seen_data_len = data.length();
342

            
343
  // Write remaining bytes
344
1
  buf.add(masked2.data() + 5, masked2.length() - 5);
345
1
  client_conn->write(buf, false);
346
  // Run the dispatcher so that the write event is handled
347
1
  client_conn->dispatcher().run(Event::Dispatcher::RunType::NonBlock);
348

            
349
1
  ASSERT_TRUE(fake_upstream_connection->waitForData(seen_data_len + 13 - 5, &data));
350
1
  ASSERT_EQ(data.substr(seen_data_len), msg2.data() + 5);
351
  // seen_data_len = data.length(); // not used after, no need to update
352

            
353
1
  ASSERT_TRUE(fake_upstream_connection->write(msg2));
354

            
355
1
  response->waitForBodyData(15);
356
1
  resp = response->body();
357

            
358
1
  ASSERT_EQ(resp.substr(0, 15), "\x82\xd"
359
1
                                "hello there\r\n");
360
1
  response->clearBody();
361

            
362
  // Close
363
1
  ASSERT_TRUE(fake_upstream_connection->close());
364
1
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
365

            
366
  // Wait for websocket close frame
367
1
  response->waitForBodyData(2);
368
1
  absl::string_view close_frame{"\x88\0", 2};
369
1
  ASSERT_EQ(response->body(), close_frame);
370

            
371
1
  cleanupUpstreamAndDownstream();
372
1
}
373

            
374
} // namespace Envoy