Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/test/integration/utility.cc
Line
Count
Source (jump to first uncovered line)
1
#include "utility.h"
2
3
#include <chrono>
4
#include <cstdint>
5
#include <memory>
6
#include <string>
7
8
#include "envoy/config/bootstrap/v3/bootstrap.pb.h"
9
#include "envoy/event/dispatcher.h"
10
#include "envoy/extensions/transport_sockets/quic/v3/quic_transport.pb.h"
11
#include "envoy/network/connection.h"
12
13
#include "source/common/api/api_impl.h"
14
#include "source/common/buffer/buffer_impl.h"
15
#include "source/common/common/assert.h"
16
#include "source/common/common/fmt.h"
17
#include "source/common/config/utility.h"
18
#include "source/common/http/header_map_impl.h"
19
#include "source/common/http/headers.h"
20
#include "source/common/network/address_impl.h"
21
#include "source/common/network/utility.h"
22
#include "source/common/quic/quic_stat_names.h"
23
#include "source/common/upstream/upstream_impl.h"
24
#include "source/extensions/http/header_validators/envoy_default/http1_header_validator.h"
25
#include "source/extensions/http/header_validators/envoy_default/http2_header_validator.h"
26
27
#ifdef ENVOY_ENABLE_QUIC
28
#include "source/common/quic/client_connection_factory_impl.h"
29
#include "source/common/quic/quic_client_transport_socket_factory.h"
30
#include "quiche/quic/core/deterministic_connection_id_generator.h"
31
#endif
32
33
#ifdef ENVOY_ENABLE_YAML
34
#include "test/common/upstream/utility.h"
35
#include "test/integration/ssl_utility.h"
36
#endif
37
#include "test/mocks/common.h"
38
#include "test/mocks/server/instance.h"
39
#include "test/mocks/server/transport_socket_factory_context.h"
40
#include "test/mocks/stats/mocks.h"
41
#include "test/mocks/upstream/cluster_info.h"
42
#include "test/test_common/environment.h"
43
#include "test/test_common/network_utility.h"
44
#include "test/test_common/printers.h"
45
#include "test/test_common/utility.h"
46
47
#include "absl/strings/match.h"
48
49
namespace Envoy {
50
51
using ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig;
52
using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ClientHttp1HeaderValidator;
53
using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ClientHttp2HeaderValidator;
54
using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ConfigOverrides;
55
using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ServerHttp1HeaderValidator;
56
using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ServerHttp2HeaderValidator;
57
58
namespace {
59
60
0
// Wraps the bytes currently held in `data` in a DoWriteCallback. The bytes are moved out of
// `data` right away into a buffer shared with the returned callback. When invoked, the callback
// appends whatever is still buffered to its destination, empties the shared buffer, and returns
// false.
RawConnectionDriver::DoWriteCallback writeBufferCallback(Buffer::Instance& data) {
  auto pending = std::make_shared<Buffer::OwnedImpl>();
  pending->move(data);
  return [pending](Buffer::Instance& out) {
    const auto buffered = pending->length();
    if (buffered == 0) {
      // Nothing left to hand over; later invocations are no-ops.
      return false;
    }
    out.add(*pending);
    pending->drain(buffered);
    return false;
  };
}
71
72
} // namespace
73
74
0
// Stores the response headers and, when `end_stream` is set, marks the response complete and
// fires the completion hook. Must not be called once the response is already complete.
void BufferingStreamDecoder::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
  ASSERT(!complete_);
  headers_ = std::move(headers);
  complete_ = end_stream;
  if (!complete_) {
    return;
  }
  onComplete();
}
82
83
0
// Appends the contents of `data` to the accumulated body and, when `end_stream` is set, marks
// the response complete and fires the completion hook. Must not be called once the response is
// already complete.
void BufferingStreamDecoder::decodeData(Buffer::Instance& data, bool end_stream) {
  ASSERT(!complete_);
  body_.append(data.toString());
  complete_ = end_stream;
  if (!complete_) {
    return;
  }
  onComplete();
}
91
92
0
// Response trailers are not supported by this decoder; receiving any aborts via PANIC.
void BufferingStreamDecoder::decodeTrailers(Http::ResponseTrailerMapPtr&&) {
  PANIC("not implemented");
}
95
96
0
void BufferingStreamDecoder::onComplete() {
97
0
  ASSERT(complete_);
98
0
  on_complete_cb_();
99
0
}
100
101
0
// A stream reset is never expected here; record a (non-fatal) test failure if one occurs.
void BufferingStreamDecoder::onResetStream(Http::StreamResetReason, absl::string_view) {
  ADD_FAILURE();
}
104
105
// A callback for a QUIC client connection to unblock the test after handshake succeeds. QUIC
106
// network connection initiates handshake and raises Connected event when it's done. Tests should
107
// proceed with sending requests afterwards.
108
class TestConnectionCallbacks : public Network::ConnectionCallbacks {
109
public:
110
0
  TestConnectionCallbacks(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
111
112
  // Network::ConnectionCallbacks
113
0
  void onEvent(Network::ConnectionEvent event) override {
114
0
    if (event == Network::ConnectionEvent::Connected) {
115
      // Handshake finished, unblock the test to continue. This is needed because we call
116
      // Dispatcher::run() with Block to wait for the handshake to finish before proceeding.
117
      // TODO(danzh) find an alternative approach with behaviors more in parallel with SSL.
118
0
      connected_ = true;
119
0
      dispatcher_.exit();
120
0
    } else if (event == Network::ConnectionEvent::RemoteClose) {
121
      // If the peer closes the connection, no need to wait anymore.
122
0
      dispatcher_.exit();
123
0
    } else {
124
0
      if (!connected_) {
125
        // Before handshake gets established, any connection failure should exit the loop. I.e. a
126
        // QUIC connection may fail of INVALID_VERSION if both this client doesn't support any of
127
        // the versions the server advertised before handshake established. In this case the
128
        // connection is closed locally and this is in a blocking event loop.
129
0
        dispatcher_.exit();
130
0
      }
131
0
    }
132
0
  }
133
134
0
  void onAboveWriteBufferHighWatermark() override {}
135
0
  void onBelowWriteBufferLowWatermark() override {}
136
137
private:
138
  Event::Dispatcher& dispatcher_;
139
  bool connected_{false};
140
};
141
142
// Creates an upstream QUIC transport socket factory for test clients.
//
// A mock transport-socket factory context is wired to return the supplied `api`, the root scope
// of `store`, `context_manager` and `threadlocal`. With YAML support compiled in, the upstream
// TLS context is populated via initializeUpstreamTlsContextConfig() using ALPN, `san_to_match`
// and SNI "lyft.com"; without YAML support the function aborts with RELEASE_ASSERT.
Network::UpstreamTransportSocketFactoryPtr
IntegrationUtil::createQuicUpstreamTransportSocketFactory(Api::Api& api, Stats::Store& store,
                                                          Ssl::ContextManager& context_manager,
                                                          ThreadLocal::Instance& threadlocal,
                                                          const std::string& san_to_match,
                                                          bool connect_to_upstreams) {
  using QuicUpstreamTransport =
      envoy::extensions::transport_sockets::quic::v3::QuicUpstreamTransport;
  using SocketConfigFactory = Server::Configuration::UpstreamTransportSocketConfigFactory;

  NiceMock<Server::Configuration::MockTransportSocketFactoryContext> factory_context;
  ON_CALL(factory_context.server_context_, api()).WillByDefault(testing::ReturnRef(api));
  ON_CALL(factory_context, statsScope()).WillByDefault(testing::ReturnRef(*store.rootScope()));
  ON_CALL(factory_context, sslContextManager())
      .WillByDefault(testing::ReturnRef(context_manager));
  ON_CALL(factory_context.server_context_, threadLocal())
      .WillByDefault(testing::ReturnRef(threadlocal));

  QuicUpstreamTransport quic_config;
  auto* tls_context = quic_config.mutable_upstream_tls_context();
#ifdef ENVOY_ENABLE_YAML
  initializeUpstreamTlsContextConfig(
      Ssl::ClientSslTransportOptions().setAlpn(true).setSan(san_to_match).setSni("lyft.com"),
      *tls_context, connect_to_upstreams);
#else
  UNREFERENCED_PARAMETER(tls_context);
  UNREFERENCED_PARAMETER(san_to_match);
  UNREFERENCED_PARAMETER(connect_to_upstreams);
  RELEASE_ASSERT(0, "unsupported");
#endif // ENVOY_ENABLE_YAML

  // Pack the config into a TransportSocket message so the matching factory can be looked up by
  // the packed config's type.
  envoy::config::core::v3::TransportSocket transport_socket;
  transport_socket.mutable_typed_config()->PackFrom(quic_config);
  auto& socket_factory =
      Config::Utility::getAndCheckFactory<SocketConfigFactory>(transport_socket);
  return socket_factory.createTransportSocketFactory(quic_config, factory_context).value();
}
173
174
// Sends one request on `client` and runs `dispatcher` in blocking mode until the response
// decoder reports completion, at which point the client is closed and the dispatcher exits.
//
// Request headers carry `method`, `url` (as the path), `host`, the "http" scheme and, when
// non-empty, `content_type`. A non-empty `body` is sent as a single data frame that ends the
// stream; otherwise the headers end the stream.
//
// Returns the decoder holding whatever response was buffered.
BufferingStreamDecoderPtr
sendRequestAndWaitForResponse(Event::Dispatcher& dispatcher, const std::string& method,
                              const std::string& url, const std::string& body,
                              const std::string& host, const std::string& content_type,
                              Http::CodecClientProd& client) {
  // The completion hook only runs while dispatcher.run() below is on the stack, so capturing
  // the arguments by reference is safe.
  BufferingStreamDecoderPtr decoder(new BufferingStreamDecoder([&client, &dispatcher]() -> void {
    client.close();
    dispatcher.exit();
  }));
  Http::RequestEncoder& request_encoder = client.newStream(*decoder);
  request_encoder.getStream().addCallbacks(*decoder);

  Http::TestRequestHeaderMapImpl request_headers;
  request_headers.setMethod(method);
  request_headers.setPath(url);
  request_headers.setHost(host);
  request_headers.setReferenceScheme(Http::Headers::get().SchemeValues.Http);
  if (!content_type.empty()) {
    request_headers.setContentType(content_type);
  }

  const bool has_body = !body.empty();
  const auto status = request_encoder.encodeHeaders(request_headers, !has_body);
  ASSERT(status.ok());
  if (has_body) {
    Buffer::OwnedImpl payload(body);
    request_encoder.encodeData(payload, true);
  }

  dispatcher.run(Event::Dispatcher::RunType::Block);
  return decoder;
}
204
205
// Issues a single HTTP request against `addr` using a throwaway Api/dispatcher and returns the
// buffered response.
//
// For codec types up to and including HTTP2 a raw-buffer client connection is used. For any
// other type (HTTP3) a QUIC connection is created, the dispatcher is run once in blocking mode
// so the handshake can finish, and then the request is sent. When QUIC support is not compiled
// in, that path asserts and returns nullptr.
//
// `method`, `url`, `body`, `host` and `content_type` are forwarded unchanged to
// sendRequestAndWaitForResponse().
BufferingStreamDecoderPtr
IntegrationUtil::makeSingleRequest(const Network::Address::InstanceConstSharedPtr& addr,
                                   const std::string& method, const std::string& url,
                                   const std::string& body, Http::CodecType type,
                                   const std::string& host, const std::string& content_type) {
  NiceMock<Stats::MockIsolatedStatsStore> mock_stats_store;
  Quic::QuicStatNames quic_stat_names(mock_stats_store.symbolTable());
  Event::GlobalTimeSystem time_system;
  // A single mock generator serves both the Api and the codec clients below.
  NiceMock<Random::MockRandomGenerator> random_generator;
  envoy::config::bootstrap::v3::Bootstrap bootstrap;
  Api::Impl api(Thread::threadFactoryForTest(), mock_stats_store, time_system,
                Filesystem::fileSystemForTest(), random_generator, bootstrap);
  Event::DispatcherPtr dispatcher(api.allocateDispatcher("test_thread"));
  TestConnectionCallbacks connection_callbacks(*dispatcher);
  Network::TransportSocketOptionsConstSharedPtr options;

  std::shared_ptr<Upstream::MockClusterInfo> cluster =
      std::make_shared<NiceMock<Upstream::MockClusterInfo>>();
  // The host description is built with a fixed placeholder address (127.0.0.1:80); the actual
  // connection target is `addr`.
  Upstream::HostDescriptionConstSharedPtr host_description =
      std::make_shared<Upstream::HostDescriptionImpl>(
          cluster, "",
          *Network::Utility::resolveUrl(
              fmt::format("{}://127.0.0.1:80", (type == Http::CodecType::HTTP3 ? "udp" : "tcp"))),
          nullptr, nullptr, envoy::config::core::v3::Locality::default_instance(),
          envoy::config::endpoint::v3::Endpoint::HealthCheckConfig::default_instance(), 0,
          time_system);

  if (type <= Http::CodecType::HTTP2) {
    Http::CodecClientProd client(type,
                                 dispatcher->createClientConnection(
                                     addr, Network::Address::InstanceConstSharedPtr(),
                                     Network::Test::createRawBufferSocket(), nullptr, nullptr),
                                 host_description, *dispatcher, random_generator, options);
    return sendRequestAndWaitForResponse(*dispatcher, method, url, body, host, content_type,
                                         client);
  }

#ifdef ENVOY_ENABLE_QUIC
  testing::NiceMock<ThreadLocal::MockInstance> threadlocal;
  NiceMock<Server::Configuration::MockServerFactoryContext> server_factory_context;
  Extensions::TransportSockets::Tls::ContextManagerImpl manager(server_factory_context);
  Network::UpstreamTransportSocketFactoryPtr transport_socket_factory =
      createQuicUpstreamTransportSocketFactory(api, mock_stats_store, manager, threadlocal,
                                               "spiffe://lyft.com/backend-team");
  auto& quic_transport_socket_factory =
      dynamic_cast<Quic::QuicClientTransportSocketFactory&>(*transport_socket_factory);
  auto persistent_info = std::make_unique<Quic::PersistentQuicInfoImpl>(*dispatcher, 0);

  Network::Address::InstanceConstSharedPtr local_address;
  if (addr->ip()->version() == Network::Address::IpVersion::v4) {
    local_address = Network::Utility::getLocalAddress(Network::Address::IpVersion::v4);
  } else {
    // Docker only works with loopback v6 address.
    local_address = std::make_shared<Network::Address::Ipv6Instance>("::1");
  }
  quic::DeterministicConnectionIdGenerator generator(quic::kQuicDefaultConnectionIdLength);
  Network::ClientConnectionPtr connection = Quic::createQuicNetworkConnection(
      *persistent_info, quic_transport_socket_factory.getCryptoConfig(),
      quic::QuicServerId(
          quic_transport_socket_factory.clientContextConfig()->serverNameIndication(),
          static_cast<uint16_t>(addr->ip()->port())),
      *dispatcher, addr, local_address, quic_stat_names, {}, *mock_stats_store.rootScope(), nullptr,
      nullptr, generator, quic_transport_socket_factory);
  connection->addConnectionCallbacks(connection_callbacks);
  Http::CodecClientProd client(type, std::move(connection), host_description, *dispatcher,
                               random_generator, options);
  // Quic connection needs to finish handshake.
  dispatcher->run(Event::Dispatcher::RunType::Block);
  return sendRequestAndWaitForResponse(*dispatcher, method, url, body, host, content_type, client);
#else
  ASSERT(false, "running a QUIC integration test without compiling QUIC");
  return nullptr;
#endif
}
279
280
// Convenience overload: resolves the loopback address for `ip_version` at `port` and forwards
// to the address-based makeSingleRequest() with the remaining arguments unchanged.
BufferingStreamDecoderPtr
IntegrationUtil::makeSingleRequest(uint32_t port, const std::string& method, const std::string& url,
                                   const std::string& body, Http::CodecType type,
                                   Network::Address::IpVersion ip_version, const std::string& host,
                                   const std::string& content_type) {
  const std::string loopback_url =
      fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(ip_version), port);
  auto target = *Network::Utility::resolveUrl(loopback_url);
  return makeSingleRequest(target, method, url, body, type, host, content_type);
}
289
290
// Buffer-based overload: the bytes in `request_data` are moved into a write callback (see
// writeBufferCallback) and construction is delegated to the callback-based constructor.
RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& request_data,
                                         ReadCallback response_data_callback,
                                         Network::Address::IpVersion version,
                                         Event::Dispatcher& dispatcher,
                                         Network::TransportSocketPtr transport_socket)
    : RawConnectionDriver(port, writeBufferCallback(request_data), response_data_callback, version,
                          dispatcher, std::move(transport_socket)) {
}
297
298
// Callback-based constructor: opens a client connection to the loopback address for `version`
// at `port` on `dispatcher` and starts connecting.
//
// `write_request_callback` is asked to fill a buffer each time the connection callbacks request
// a write; its return value is passed to the connection as the end-of-stream flag.
// `response_data_callback` is handed to the read filter installed on the connection. When
// `transport_socket` is null a raw-buffer socket is used.
RawConnectionDriver::RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback,
                                         ReadCallback response_data_callback,
                                         Network::Address::IpVersion version,
                                         Event::Dispatcher& dispatcher,
                                         Network::TransportSocketPtr transport_socket)
    : dispatcher_(dispatcher), remaining_bytes_to_send_(0) {
  api_ = Api::createApiForTest(stats_store_);
  Event::GlobalTimeSystem time_system;

  auto write_more = [this, write_request_callback]() {
    Buffer::OwnedImpl chunk;
    const bool close_after = write_request_callback(chunk);
    // Track outstanding bytes so allBytesSent() can report when everything has been flushed.
    remaining_bytes_to_send_ += chunk.length();
    client_->write(chunk, close_after);
  };
  callbacks_ = std::make_unique<ConnectionCallbacks>(write_more, dispatcher);

  if (transport_socket == nullptr) {
    transport_socket = Network::Test::createRawBufferSocket();
  }

  const std::string loopback_url =
      fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port);
  client_ = dispatcher_.createClientConnection(
      *Network::Utility::resolveUrl(loopback_url), Network::Address::InstanceConstSharedPtr(),
      std::move(transport_socket), nullptr, nullptr);
  // ConnectionCallbacks will call write_request_callback from the connect and low-watermark
  // callbacks. Set a small buffer limit so high-watermark is triggered after every write and
  // low-watermark is triggered every time the buffer is drained.
  client_->setBufferLimits(1);
  client_->addConnectionCallbacks(*callbacks_);
  client_->addReadFilter(
      Network::ReadFilterSharedPtr{new ForwardingFilter(*this, response_data_callback)});
  client_->addBytesSentCallback([this](uint64_t sent) {
    remaining_bytes_to_send_ -= sent;
    return true;
  });
  client_->connect();
}
336
337
// This factory is needed to avoid dealing with the ServerFactoryContext, which is
338
// problematic in dedicated threads for fake upstreams or test client.
339
// UHV is needed in fake upstreams or test client for translation
340
// of extended CONNECT to upgrade, which is done by the codecs.
341
class FakeHeaderValidatorFactory : public Http::HeaderValidatorFactory {
342
public:
343
0
  FakeHeaderValidatorFactory(const HeaderValidatorConfig& config) : config_(config) {}
344
345
  Http::ServerHeaderValidatorPtr
346
0
  createServerHeaderValidator(Http::Protocol protocol, Http::HeaderValidatorStats& stats) override {
347
0
    ConfigOverrides config_overrides;
348
0
349
0
    switch (protocol) {
350
0
    case Http::Protocol::Http3:
351
0
    case Http::Protocol::Http2:
352
0
      return std::make_unique<ServerHttp2HeaderValidator>(config_, protocol, stats,
353
0
                                                          config_overrides);
354
0
    case Http::Protocol::Http11:
355
0
    case Http::Protocol::Http10:
356
0
      return std::make_unique<ServerHttp1HeaderValidator>(config_, protocol, stats,
357
0
                                                          config_overrides);
358
0
    }
359
0
    PANIC_DUE_TO_CORRUPT_ENUM;
360
0
  }
361
362
  Http::ClientHeaderValidatorPtr
363
0
  createClientHeaderValidator(Http::Protocol protocol, Http::HeaderValidatorStats& stats) override {
364
0
    ConfigOverrides config_overrides;
365
0
366
0
    switch (protocol) {
367
0
    case Http::Protocol::Http3:
368
0
    case Http::Protocol::Http2:
369
0
      return std::make_unique<ClientHttp2HeaderValidator>(config_, protocol, stats,
370
0
                                                          config_overrides);
371
0
    case Http::Protocol::Http11:
372
0
    case Http::Protocol::Http10:
373
0
      return std::make_unique<ClientHttp1HeaderValidator>(config_, protocol, stats,
374
0
                                                          config_overrides);
375
0
    }
376
0
    PANIC_DUE_TO_CORRUPT_ENUM;
377
0
  }
378
379
private:
380
  const HeaderValidatorConfig config_;
381
};
382
383
// Returns a FakeHeaderValidatorFactory built from `config` when ENVOY_ENABLE_UHV is defined;
// otherwise `config` is unused and nullptr is returned.
Http::HeaderValidatorFactoryPtr
IntegrationUtil::makeHeaderValidationFactory([[maybe_unused]] const HeaderValidatorConfig& config) {
#ifdef ENVOY_ENABLE_UHV
  return std::make_unique<FakeHeaderValidatorFactory>(config);
#else
  return nullptr;
#endif
}
391
392
0
// Defaulted destructor, defined out of line in this translation unit.
RawConnectionDriver::~RawConnectionDriver() = default;
393
394
0
// Polls until the connection callbacks report either "connected" or "closed": each iteration
// sleeps 10ms of real time and then runs the dispatcher in non-blocking mode.
// Returns success if the connection ended up connected, failure otherwise.
testing::AssertionResult RawConnectionDriver::waitForConnection() {
  // TODO(mattklein123): Add a timeout and switch to events and waitFor().
  while (!(callbacks_->connected() || callbacks_->closed())) {
    Event::GlobalTimeSystem().timeSystem().realSleepDoNotUseWithoutScrutiny(
        std::chrono::milliseconds(10));
    dispatcher_.run(Event::Dispatcher::RunType::NonBlock);
  }
  return callbacks_->connected() ? testing::AssertionSuccess() : testing::AssertionFailure();
}
406
407
// Runs the dispatcher with `run_type`, guarded by a timer that exits the dispatcher after
// `timeout`. Returns success if the timer is still armed once the dispatcher returns (it is then
// disarmed), and failure if it is no longer armed.
testing::AssertionResult RawConnectionDriver::run(Event::Dispatcher::RunType run_type,
                                                  std::chrono::milliseconds timeout) {
  Event::TimerPtr watchdog = dispatcher_.createTimer([this]() -> void { dispatcher_.exit(); });
  watchdog->enableTimer(timeout);

  dispatcher_.run(run_type);

  if (!watchdog->enabled()) {
    return testing::AssertionFailure();
  }
  watchdog->disableTimer();
  return testing::AssertionSuccess();
}
420
421
0
// Closes the client connection using the FlushWrite close type.
void RawConnectionDriver::close() {
  client_->close(Network::ConnectionCloseType::FlushWrite);
}
422
423
0
// True when the count of bytes handed to the connection but not yet reported as sent is zero.
bool RawConnectionDriver::allBytesSent() const {
  return remaining_bytes_to_send_ == 0;
}
424
425
// Remembers the dispatcher that onData() exits once an awaited condition is met.
WaitForPayloadReader::WaitForPayloadReader(Event::Dispatcher& dispatcher)
    : dispatcher_(dispatcher) {
}
427
428
130
// Accumulates incoming bytes (draining `data`) and exits the dispatcher when an awaited
// condition is met:
//  - the accumulated data starts with the non-empty awaited string, or
//  - exact matching is disabled and the awaited string occurs anywhere in the accumulated data,
//    or
//  - the peer ended the stream.
// Independently, the dispatcher is also exited when a length wait is active and the accumulated
// size has reached the awaited length. Always stops filter iteration.
Network::FilterStatus WaitForPayloadReader::onData(Buffer::Instance& data, bool end_stream) {
  data_.append(data.toString());
  data.drain(data.length());
  read_end_stream_ = end_stream;

  const bool prefix_hit =
      !data_to_wait_for_.empty() && absl::StartsWith(data_, data_to_wait_for_);
  const bool substring_hit =
      exact_match_ == false && data_.find(data_to_wait_for_) != std::string::npos;
  if (prefix_hit || substring_hit || end_stream) {
    data_to_wait_for_.clear();
    dispatcher_.exit();
  }

  const bool length_reached = wait_for_length_ && data_.size() >= length_to_wait_for_;
  if (length_reached) {
    wait_for_length_ = false;
    dispatcher_.exit();
  }

  return Network::FilterStatus::StopIteration;
}
445
446
} // namespace Envoy