Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/network/utility.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/network/utility.h"
2
3
#include <cstddef>
4
#include <cstdint>
5
#include <list>
6
#include <memory>
7
#include <sstream>
8
#include <string>
9
#include <vector>
10
11
#include "envoy/buffer/buffer.h"
12
#include "envoy/common/exception.h"
13
#include "envoy/common/platform.h"
14
#include "envoy/config/core/v3/address.pb.h"
15
#include "envoy/network/address.h"
16
#include "envoy/network/connection.h"
17
18
#include "source/common/api/os_sys_calls_impl.h"
19
#include "source/common/buffer/buffer_impl.h"
20
#include "source/common/common/assert.h"
21
#include "source/common/common/cleanup.h"
22
#include "source/common/common/fmt.h"
23
#include "source/common/common/utility.h"
24
#include "source/common/network/address_impl.h"
25
#include "source/common/network/io_socket_error_impl.h"
26
#include "source/common/network/socket_option_impl.h"
27
#include "source/common/protobuf/protobuf.h"
28
#include "source/common/protobuf/utility.h"
29
#include "source/common/runtime/runtime_features.h"
30
31
#include "absl/container/fixed_array.h"
32
#include "absl/strings/match.h"
33
#include "absl/strings/string_view.h"
34
35
namespace Envoy {
36
namespace Network {
37
namespace {
38
39
928k
// Unwraps a StatusOr-held address: yields the contained pointer when the status is OK and a
// null pointer for any error status (the error itself is discarded).
Address::InstanceConstSharedPtr instanceOrNull(StatusOr<Address::InstanceConstSharedPtr> address) {
  if (!address.ok()) {
    return nullptr;
  }
  return *address;
}
45
46
} // namespace
47
48
0
// Builds a URL string for a datagram address: the UDP scheme is prefixed when the address
// exposes an IP view, otherwise the UNIX scheme is used.
std::string Utility::urlFromDatagramAddress(const Address::Instance& addr) {
  const bool has_ip = addr.ip() != nullptr;
  if (!has_ip) {
    return absl::StrCat(UNIX_SCHEME, addr.asStringView());
  }
  return absl::StrCat(UDP_SCHEME, addr.asStringView());
}
55
56
664k
// Resolves a URL into an address instance.
//  - TCP/UDP scheme: the remainder is parsed as "ip:port"; a parse failure yields
//    InvalidArgument("malformed IP address: <url>").
//  - UNIX scheme: the remainder is handed to Address::PipeInstance::create() and its result
//    (value or error) is returned as is.
//  - Anything else: InvalidArgument("unknown protocol scheme: <url>").
absl::StatusOr<Address::InstanceConstSharedPtr> Utility::resolveUrl(const std::string& url) {
  const bool is_tcp = urlIsTcpScheme(url);
  if (is_tcp || urlIsUdpScheme(url)) {
    const auto scheme_length = is_tcp ? TCP_SCHEME.size() : UDP_SCHEME.size();
    Address::InstanceConstSharedPtr parsed =
        parseInternetAddressAndPortNoThrow(url.substr(scheme_length));
    if (!parsed) {
      return absl::InvalidArgumentError(absl::StrCat("malformed IP address: ", url));
    }
    return parsed;
  }

  if (urlIsUnixScheme(url)) {
    return Address::PipeInstance::create(url.substr(UNIX_SCHEME.size()));
  }

  return absl::InvalidArgumentError(absl::StrCat("unknown protocol scheme: ", url));
}
72
73
0
// Maps a URL's scheme to a socket type: TCP and UNIX schemes are stream sockets, the UDP scheme
// is a datagram socket. Any other scheme produces an InvalidArgument error naming the URL.
StatusOr<Socket::Type> Utility::socketTypeFromUrl(const std::string& url) {
  if (urlIsTcpScheme(url)) {
    return Socket::Type::Stream;
  }
  if (urlIsUdpScheme(url)) {
    return Socket::Type::Datagram;
  }
  if (urlIsUnixScheme(url)) {
    return Socket::Type::Stream;
  }
  return absl::InvalidArgumentError(absl::StrCat("unknown protocol scheme: ", url));
}
84
85
664k
// True when `url` begins with TCP_SCHEME.
bool Utility::urlIsTcpScheme(absl::string_view url) {
  return absl::StartsWith(url, TCP_SCHEME);
}
86
87
3.34k
// True when `url` begins with UDP_SCHEME.
bool Utility::urlIsUdpScheme(absl::string_view url) {
  return absl::StartsWith(url, UDP_SCHEME);
}
88
89
2.82k
// True when `url` begins with UNIX_SCHEME.
bool Utility::urlIsUnixScheme(absl::string_view url) {
  return absl::StartsWith(url, UNIX_SCHEME);
}
90
91
namespace {
92
93
// Performs one recvmsg() on `handle` into a single slice reserved from `buffer`.
//
// Up to `max_rx_datagram_size` bytes are reserved. On success the reservation is committed with
// the number of bytes reported by the call (capped at the reserved size); on failure nothing is
// committed. Per-message metadata is written to `output`. `local_address.ip()` is dereferenced
// without a null check, so the caller must pass an IP address.
Api::IoCallUint64Result receiveMessage(uint64_t max_rx_datagram_size, Buffer::InstancePtr& buffer,
                                       IoHandle::RecvMsgOutput& output, IoHandle& handle,
                                       const Address::Instance& local_address,
                                       const IoHandle::UdpSaveCmsgConfig& save_cmsg_config) {
  auto reservation = buffer->reserveSingleSlice(max_rx_datagram_size);
  Buffer::RawSlice raw_slice = reservation.slice();

  Api::IoCallUint64Result result =
      handle.recvmsg(&raw_slice, 1, local_address.ip()->port(), save_cmsg_config, output);
  if (!result.ok()) {
    return result;
  }

  reservation.commit(std::min(max_rx_datagram_size, result.return_value_));
  return result;
}
109
110
934k
// Parses a dotted-quad IPv4 literal with inet_pton() and pairs it with `port` (stored in network
// byte order). Returns FailedPrecondition("failed parsing ipv4") when the text is not a valid
// IPv4 address.
StatusOr<sockaddr_in> parseV4Address(const std::string& ip_address, uint16_t port) {
  sockaddr_in sa4;
  memset(&sa4, 0, sizeof(sa4));

  const bool parsed = inet_pton(AF_INET, ip_address.c_str(), &sa4.sin_addr) == 1;
  if (parsed) {
    sa4.sin_family = AF_INET;
    sa4.sin_port = htons(port);
    return sa4;
  }
  return absl::FailedPreconditionError("failed parsing ipv4");
}
120
121
115k
// Parses an IPv6 literal (optionally carrying a scope) via getaddrinfo() and pairs it with `port`
// (stored in network byte order). Returns FailedPrecondition carrying the getaddrinfo() return
// code when the lookup fails.
StatusOr<sockaddr_in6> parseV6Address(const std::string& ip_address, uint16_t port) {
  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_INET6;
  // Numeric-only flags keep getaddrinfo() from performing potentially slow name resolution.
  hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
  // No service is given, so pinning the socket type/protocol hints getaddrinfo() towards a single
  // result entry. This is platform dependent, and only the first entry is consumed regardless.
  hints.ai_socktype = SOCK_DGRAM;
  hints.ai_protocol = IPPROTO_UDP;

  struct addrinfo* results = nullptr;
  const Api::SysCallIntResult rc = Api::OsSysCallsSingleton::get().getaddrinfo(
      ip_address.c_str(), /*service=*/nullptr, &hints, &results);
  if (rc.return_value_ != 0) {
    return absl::FailedPreconditionError(fmt::format("getaddrinfo error: {}", rc.return_value_));
  }

  // Copy the first result out before releasing the list that owns it.
  sockaddr_in6 sa6 = *reinterpret_cast<sockaddr_in6*>(results->ai_addr);
  freeaddrinfo(results);
  sa6.sin6_port = htons(port);
  return sa6;
}
146
147
} // namespace
148
149
// Parses `ip_address` as an IPv4 literal first and, failing that, as an IPv6 literal, attaching
// `port`. `v6only` is forwarded to the IPv6 instance. Returns nullptr when the text is neither,
// or when instance creation fails.
Address::InstanceConstSharedPtr Utility::parseInternetAddressNoThrow(const std::string& ip_address,
                                                                     uint16_t port, bool v6only) {
  StatusOr<sockaddr_in> v4 = parseV4Address(ip_address, port);
  if (!v4.ok()) {
    StatusOr<sockaddr_in6> v6 = parseV6Address(ip_address, port);
    if (!v6.ok()) {
      return nullptr;
    }
    return instanceOrNull(
        Address::InstanceFactory::createInstancePtr<Address::Ipv6Instance>(*v6, v6only));
  }
  return instanceOrNull(
      Address::InstanceFactory::createInstancePtr<Address::Ipv4Instance>(&v4.value()));
}
164
165
// Parses "a.b.c.d:port" or "[v6-literal]:port" into an address instance.
//
// A leading '[' selects the IPv6 form, where the last "]:" separates address and port. Otherwise
// the last ':' separates an IPv4 address from the port. The port must be a non-empty decimal
// number no greater than 65535. `v6only` is forwarded to the IPv6 instance. Returns nullptr on
// any malformed input.
Address::InstanceConstSharedPtr
Utility::parseInternetAddressAndPortNoThrow(const std::string& ip_address, bool v6only) {
  // Shared port validation: non-empty, numeric, and within the 16-bit range.
  const auto parse_port = [](const std::string& text, uint64_t& port) {
    return !text.empty() && absl::SimpleAtoi(text, &port) && port <= 65535;
  };

  if (ip_address.empty()) {
    return nullptr;
  }

  if (ip_address[0] != '[') {
    // No bracket: expect an IPv4 address followed by a port.
    const auto colon = ip_address.rfind(':');
    if (colon == std::string::npos) {
      return nullptr;
    }
    const std::string host = ip_address.substr(0, colon);
    uint64_t port = 0;
    if (!parse_port(ip_address.substr(colon + 1), port)) {
      return nullptr;
    }
    StatusOr<sockaddr_in> v4 = parseV4Address(host, port);
    if (!v4.ok()) {
      return nullptr;
    }
    return instanceOrNull(
        Address::InstanceFactory::createInstancePtr<Address::Ipv4Instance>(&v4.value()));
  }

  // Bracketed: looks like IPv6. The "]:" marker ends the address and starts the port.
  const auto bracket = ip_address.rfind("]:");
  if (bracket == std::string::npos) {
    return nullptr;
  }
  const std::string host = ip_address.substr(1, bracket - 1);
  uint64_t port = 0;
  if (!parse_port(ip_address.substr(bracket + 2), port)) {
    return nullptr;
  }
  StatusOr<sockaddr_in6> v6 = parseV6Address(host, port);
  if (!v6.ok()) {
    return nullptr;
  }
  return instanceOrNull(
      Address::InstanceFactory::createInstancePtr<Address::Ipv6Instance>(*v6, v6only));
}
207
208
0
// Creates a new address instance carrying the same textual address and port as `ip`. An IPv4
// input yields an Ipv4Instance; every other version yields an Ipv6Instance.
Address::InstanceConstSharedPtr Utility::copyInternetAddressAndPort(const Address::Ip& ip) {
  if (ip.version() != Address::IpVersion::v4) {
    return std::make_shared<Address::Ipv6Instance>(ip.addressAsString(), ip.port());
  }
  return std::make_shared<Address::Ipv4Instance>(ip.addressAsString(), ip.port());
}
214
215
// TODO(hennna): Currently getLocalAddress does not support choosing between
216
// multiple interfaces and addresses not returned by getifaddrs. In addition,
217
// the default is to return a loopback address of type version. This function may
218
// need to be updated in the future. Discussion can be found at Github issue #939.
219
1.97k
// Returns a local address of the requested IP `version`.
//
// When the platform supports getifaddrs(), the first interface address that is an IP address of
// the requested version and is not the loopback address is returned; for IPv6 the scope id is
// stripped. If getifaddrs() is unsupported, fails, or yields no match, the loopback address of
// the requested version ("127.0.0.1" or "::1") is returned instead.
Address::InstanceConstSharedPtr Utility::getLocalAddress(const Address::IpVersion version) {
  Address::InstanceConstSharedPtr ret;
  if (Api::OsSysCallsSingleton::get().supportsGetifaddrs()) {
    Api::InterfaceAddressVector interface_addresses{};

    const Api::SysCallIntResult rc =
        Api::OsSysCallsSingleton::get().getifaddrs(interface_addresses);
    if (rc.return_value_ != 0) {
      ENVOY_LOG_MISC(debug, "getifaddrs error: {}", rc.errno_);
    } else {
      // man getifaddrs(3)
      for (const auto& interface_address : interface_addresses) {
        const auto& candidate = interface_address.interface_addr_;
        // isLoopbackAddress() reports false for non-IP addresses, so such entries would reach
        // the ip()->version() dereference below; skip anything without an IP view.
        if (candidate == nullptr || candidate->ip() == nullptr) {
          continue;
        }
        if (!isLoopbackAddress(*candidate) && candidate->ip()->version() == version) {
          ret = candidate;
          if (ret->ip()->version() == Address::IpVersion::v6) {
            ret = ret->ip()->ipv6()->addressWithoutScopeId();
          }
          break;
        }
      }
    }
  }

  // If the local address is not found above, then return the loopback address by default.
  if (ret == nullptr) {
    if (version == Address::IpVersion::v4) {
      ret = std::make_shared<Address::Ipv4Instance>("127.0.0.1");
    } else if (version == Address::IpVersion::v6) {
      ret = std::make_shared<Address::Ipv6Instance>("::1");
    }
  }
  return ret;
}
253
254
0
bool Utility::isSameIpOrLoopback(const ConnectionInfoProvider& connection_info_provider) {
255
  // These are local:
256
  // - Pipes
257
  // - Sockets to a loopback address
258
  // - Sockets where the local and remote address (ignoring port) are the same
259
0
  const auto& remote_address = connection_info_provider.remoteAddress();
260
0
  if (remote_address->type() == Address::Type::Pipe || isLoopbackAddress(*remote_address)) {
261
0
    return true;
262
0
  }
263
0
  const auto local_ip = connection_info_provider.localAddress()->ip();
264
0
  const auto remote_ip = remote_address->ip();
265
0
  if (remote_ip != nullptr && local_ip != nullptr &&
266
0
      remote_ip->addressAsString() == local_ip->addressAsString()) {
267
0
    return true;
268
0
  }
269
0
  return false;
270
0
}
271
272
66.9k
// Reports whether `address` is an "internal" IP address.
//
// IPv4: the RFC1918 ranges (10/8, 172.16/12, 192.168/16) plus 127.0.0.1.
// IPv6: addresses whose first byte is 0xfd, plus ::1.
// Non-IP addresses are never internal.
bool Utility::isInternalAddress(const Address::Instance& address) {
  if (address.type() != Address::Type::Ip) {
    return false;
  }

  const auto* ip = address.ip();
  if (ip->version() == Address::IpVersion::v4) {
    // The address value is inspected byte by byte in its stored order, and compared to the
    // loopback constant converted with htonl().
    const uint32_t v4_address = ip->ipv4()->address();
    const uint8_t* octets = reinterpret_cast<const uint8_t*>(&v4_address);
    const bool in_10_8 = octets[0] == 10;
    const bool in_192_168_16 = octets[0] == 192 && octets[1] == 168;
    const bool in_172_16_12 = octets[0] == 172 && octets[1] >= 16 && octets[1] <= 31;
    const bool is_loopback = v4_address == htonl(INADDR_LOOPBACK);
    return in_10_8 || in_192_168_16 || in_172_16_12 || is_loopback;
  }

  // RFC4193 defines the local IPv6 prefix FC00::/7. Only FD00::/8 is locally assigned today;
  // FC00::/8 may be defined in the future, so only the 0xfd first byte is accepted here.
  static_assert(sizeof(absl::uint128) == sizeof(in6addr_loopback),
                "sizeof(absl::uint128) != sizeof(in6addr_loopback)");
  const absl::uint128 v6_address = ip->ipv6()->address();
  const uint8_t* v6_bytes = reinterpret_cast<const uint8_t*>(&v6_address);
  const bool is_v6_loopback =
      memcmp(&v6_address, &in6addr_loopback, sizeof(in6addr_loopback)) == 0;
  return v6_bytes[0] == 0xfd || is_v6_loopback;
}
304
305
71.0k
// Reports whether `address` is the canonical loopback address of its IP family: 127.0.0.1 for
// IPv4 or ::1 for IPv6. Non-IP addresses are never loopback. An IP address with an unrecognised
// version triggers IS_ENVOY_BUG and is treated as not loopback.
bool Utility::isLoopbackAddress(const Address::Instance& address) {
  if (address.type() != Address::Type::Ip) {
    return false;
  }

  const auto* ip = address.ip();
  if (ip->version() == Address::IpVersion::v4) {
    // Only the canonical 127.0.0.1 is recognised.
    return ip->ipv4()->address() == htonl(INADDR_LOOPBACK);
  }
  if (ip->version() != Address::IpVersion::v6) {
    IS_ENVOY_BUG("unexpected address type");
    return false;
  }

  static_assert(sizeof(absl::uint128) == sizeof(in6addr_loopback),
                "sizeof(absl::uint128) != sizeof(in6addr_loopback)");
  absl::uint128 v6_address = ip->ipv6()->address();
  return memcmp(&v6_address, &in6addr_loopback, sizeof(in6addr_loopback)) == 0;
}
322
323
5.54k
// Returns an IPv4 address instance for "127.0.0.1" with port 0. The instance is produced through
// CONSTRUCT_ON_FIRST_USE -- presumably built once and shared by all callers; confirm against the
// macro definition. The meaning of the trailing nullptr constructor argument is not visible here.
Address::InstanceConstSharedPtr Utility::getCanonicalIpv4LoopbackAddress() {
324
5.54k
  CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
325
5.54k
                         new Address::Ipv4Instance("127.0.0.1", 0, nullptr));
326
5.54k
}
327
328
2
// Returns an IPv6 address instance for "::1" with port 0. The instance is produced through
// CONSTRUCT_ON_FIRST_USE -- presumably built once and shared by all callers; confirm against the
// macro definition. The meaning of the trailing nullptr constructor argument is not visible here.
Address::InstanceConstSharedPtr Utility::getIpv6LoopbackAddress() {
329
2
  CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
330
2
                         new Address::Ipv6Instance("::1", 0, nullptr));
331
2
}
332
333
0
// Returns the IPv4 "any" address instance, built via CONSTRUCT_ON_FIRST_USE from a single
// uint32_t zero argument.
// NOTE(review): presumably that argument is the port and the constructor picks the wildcard
// address -- confirm against the Ipv4Instance constructors.
Address::InstanceConstSharedPtr Utility::getIpv4AnyAddress() {
334
0
  CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
335
0
                         new Address::Ipv4Instance(static_cast<uint32_t>(0)));
336
0
}
337
338
0
// Returns the IPv6 "any" address instance, built via CONSTRUCT_ON_FIRST_USE from a single
// uint32_t zero argument.
// NOTE(review): presumably that argument is the port and the constructor picks the wildcard
// address -- confirm against the Ipv6Instance constructors.
Address::InstanceConstSharedPtr Utility::getIpv6AnyAddress() {
339
0
  CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
340
0
                         new Address::Ipv6Instance(static_cast<uint32_t>(0)));
341
0
}
342
343
92.5k
// Returns a reference to the string "0.0.0.0/0", i.e. the CIDR range covering every IPv4 address.
// The string is created through CONSTRUCT_ON_FIRST_USE.
const std::string& Utility::getIpv4CidrCatchAllAddress() {
344
92.5k
  CONSTRUCT_ON_FIRST_USE(std::string, "0.0.0.0/0");
345
92.5k
}
346
347
92.5k
// Returns a reference to the string "::/0", i.e. the CIDR range covering every IPv6 address.
// The string is created through CONSTRUCT_ON_FIRST_USE.
const std::string& Utility::getIpv6CidrCatchAllAddress() {
348
92.5k
  CONSTRUCT_ON_FIRST_USE(std::string, "::/0");
349
92.5k
}
350
351
// Creates a new address instance with the same IP as `address` but using `port`.
// `address.ip()` is dereferenced without a null check, so the caller must pass an IP address.
// An IP version outside v4/v6 hits PANIC.
Address::InstanceConstSharedPtr Utility::getAddressWithPort(const Address::Instance& address,
                                                            uint32_t port) {
  const auto* ip = address.ip();
  switch (ip->version()) {
  case Address::IpVersion::v6:
    return std::make_shared<Address::Ipv6Instance>(ip->addressAsString(), port);
  case Address::IpVersion::v4:
    return std::make_shared<Address::Ipv4Instance>(ip->addressAsString(), port);
  }
  PANIC("not handled");
}
361
362
1.09k
// Retrieves the original destination address of a redirected connection.
//
// Only compiled with real logic where SOL_IP is defined; elsewhere it always returns nullptr.
// The "original destination" socket option matching the socket's IP version is queried first.
// If that query fails and the platform supports IP transparency for this version, the
// "transparent" option is read, and when it is set the socket's local address is returned.
// Returns nullptr for non-IP sockets, sockets without a known IP version, or when neither
// lookup succeeds.
Address::InstanceConstSharedPtr Utility::getOriginalDst(Socket& sock) {
#ifdef SOL_IP

  if (sock.addressType() != Address::Type::Ip) {
    return nullptr;
  }

  auto ip_version = sock.ipVersion();
  if (!ip_version.has_value()) {
    return nullptr;
  }

  // Pick the option pair matching the socket's IP family.
  const bool is_v4 = *ip_version == Address::IpVersion::v4;
  SocketOptionName original_dst_option =
      is_v4 ? ENVOY_SOCKET_SO_ORIGINAL_DST : ENVOY_SOCKET_IP6T_SO_ORIGINAL_DST;
  SocketOptionName transparent_option =
      is_v4 ? ENVOY_SOCKET_IP_TRANSPARENT : ENVOY_SOCKET_IPV6_TRANSPARENT;

  sockaddr_storage orig_addr;
  memset(&orig_addr, 0, sizeof(orig_addr));
  socklen_t addr_len = sizeof(sockaddr_storage);
  const int original_dst_status =
      sock.getSocketOption(original_dst_option.level(), original_dst_option.option(), &orig_addr,
                           &addr_len)
          .return_value_;
  if (original_dst_status == 0) {
    return Address::addressFromSockAddrOrDie(orig_addr, 0, -1,
                                             true /* default for v6 constructor */);
  }

  // The original-destination lookup failed; fall back to the transparent-socket check.
  if (!Api::OsSysCallsSingleton::get().supportsIpTransparent(*ip_version)) {
    return nullptr;
  }
  socklen_t flag_len = sizeof(int);
  int is_tp;
  const int transparent_status =
      sock.getSocketOption(transparent_option.level(), transparent_option.option(), &is_tp,
                           &flag_len)
          .return_value_;
  if (transparent_status == 0 && is_tp) {
    return sock.ioHandle().localAddress();
  }
  return nullptr;

#else
  // TODO(zuercher): determine if connection redirection is possible under macOS (c.f. pfctl and
  // divert), and whether it's possible to learn the original destination address.
  UNREFERENCED_PARAMETER(sock);
  return nullptr;
#endif
}
412
413
110k
// Converts a 128-bit IPv6 address value from network to host byte order: the bytes are reversed
// when ABSL_IS_LITTLE_ENDIAN is defined and returned unchanged otherwise.
absl::uint128 Utility::Ip6ntohl(const absl::uint128& address) {
#ifndef ABSL_IS_LITTLE_ENDIAN
  return address;
#else
  return flipOrder(address);
#endif
}
420
421
6.36k
// Converts a 128-bit IPv6 address value from host to network byte order: the bytes are reversed
// when ABSL_IS_LITTLE_ENDIAN is defined and returned unchanged otherwise.
absl::uint128 Utility::Ip6htonl(const absl::uint128& address) {
#ifndef ABSL_IS_LITTLE_ENDIAN
  return address;
#else
  return flipOrder(address);
#endif
}
428
429
117k
// Reverses the order of the 16 bytes of `input`: the least significant byte becomes the most
// significant one and so on.
absl::uint128 Utility::flipOrder(const absl::uint128& input) {
  absl::uint128 remaining = input;
  absl::uint128 reversed{0};
  int bytes_left = 16;
  while (bytes_left-- > 0) {
    // Shift the accumulated bytes up and append the current lowest byte of the input.
    reversed = (reversed << 8) | (remaining & 0xFF);
    remaining >>= 8;
  }
  return reversed;
}
439
440
// Converts a protobuf Address into an address instance without throwing.
//  - socket_address: parsed as an IP literal plus port; v6only is the inverse of ipv4_compat.
//  - pipe: created from path and mode; a creation error yields nullptr.
//  - envoy_internal_address: built from the listener name and endpoint id.
// An unset or corrupt address case panics.
Address::InstanceConstSharedPtr
Utility::protobufAddressToAddressNoThrow(const envoy::config::core::v3::Address& proto_address) {
  using AddressCase = envoy::config::core::v3::Address::AddressCase;
  switch (proto_address.address_case()) {
  case AddressCase::kSocketAddress: {
    const auto& socket_address = proto_address.socket_address();
    return Utility::parseInternetAddressNoThrow(socket_address.address(),
                                                socket_address.port_value(),
                                                !socket_address.ipv4_compat());
  }
  case AddressCase::kPipe: {
    const auto& pipe = proto_address.pipe();
    auto pipe_or_error = Address::PipeInstance::create(pipe.path(), pipe.mode());
    if (!pipe_or_error.status().ok()) {
      return nullptr;
    }
    return std::move(*pipe_or_error);
  }
  case AddressCase::kEnvoyInternalAddress: {
    const auto& internal_address = proto_address.envoy_internal_address();
    return std::make_shared<Address::EnvoyInternalInstance>(
        internal_address.server_listener_name(), internal_address.endpoint_id());
  }
  case AddressCase::ADDRESS_NOT_SET:
    PANIC_DUE_TO_PROTO_UNSET;
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}
464
465
// Fills `proto_address` from `address`.
//  - Pipe: sets the pipe path to the address string.
//  - Ip: sets the socket address and port.
//  - Otherwise the address is asserted to be an Envoy internal address, and the listener name
//    and endpoint id are copied.
void Utility::addressToProtobufAddress(const Address::Instance& address,
                                       envoy::config::core::v3::Address& proto_address) {
  const Address::Type type = address.type();
  if (type == Address::Type::Pipe) {
    proto_address.mutable_pipe()->set_path(address.asString());
    return;
  }

  if (type == Address::Type::Ip) {
    const auto* ip = address.ip();
    auto* socket_address = proto_address.mutable_socket_address();
    socket_address->set_address(ip->addressAsString());
    socket_address->set_port_value(ip->port());
    return;
  }

  ASSERT(type == Address::Type::EnvoyInternal);
  const auto* envoy_internal = address.envoyInternalAddress();
  auto* internal_address = proto_address.mutable_envoy_internal_address();
  internal_address->set_server_listener_name(envoy_internal->addressId());
  internal_address->set_endpoint_id(envoy_internal->endpointId());
}
480
481
// Determines the socket type implied by a protobuf Address: a socket address maps TCP to Stream
// and UDP to Datagram, while pipe and Envoy internal addresses are always Stream. Unset or
// corrupt enum values panic.
Socket::Type
Utility::protobufAddressSocketType(const envoy::config::core::v3::Address& proto_address) {
  using AddressCase = envoy::config::core::v3::Address::AddressCase;
  using SocketAddress = envoy::config::core::v3::SocketAddress;
  switch (proto_address.address_case()) {
  case AddressCase::kSocketAddress: {
    switch (proto_address.socket_address().protocol()) {
      PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
    case SocketAddress::TCP:
      return Socket::Type::Stream;
    case SocketAddress::UDP:
      return Socket::Type::Datagram;
    }
  }
    // Reached only when the protocol matched none of the cases above.
    PANIC_DUE_TO_CORRUPT_ENUM;
  case AddressCase::kPipe:
    return Socket::Type::Stream;
  case AddressCase::kEnvoyInternalAddress:
    // Internal addresses currently support stream operation only.
    return Socket::Type::Stream;
  case AddressCase::ADDRESS_NOT_SET:
    PANIC_DUE_TO_PROTO_UNSET;
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}
505
506
// Convenience overload: writes the whole contents of `buffer` by forwarding its raw slices to the
// slice-based writeToSocket().
Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, const Buffer::Instance& buffer,
                                               const Address::Ip* local_ip,
                                               const Address::Instance& peer_address) {
  Buffer::RawSliceVector raw_slices = buffer.getRawSlices();
  Buffer::RawSlice* const first_slice = raw_slices.data();
  const uint64_t slice_count = raw_slices.size();
  return writeToSocket(handle, first_slice, slice_count, local_ip, peer_address);
}
512
513
// Writes `num_slices` slices to the socket, retrying for as long as the call is interrupted.
//
// A socket that was already connected is written with writev(), since local and peer addresses
// must not be specified for it. Otherwise sendmsg() is used with `local_ip` and `peer_address`.
// The outcome is logged (trace on success, debug on failure) and returned to the caller.
Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, Buffer::RawSlice* slices,
                                               uint64_t num_slices, const Address::Ip* local_ip,
                                               const Address::Instance& peer_address) {
  const bool is_connected = handle.wasConnected();
  const char* syscall_name = is_connected ? "writev" : "sendmsg";

  Api::IoCallUint64Result send_result(
      /*rc=*/0, /*err=*/Api::IoError::none());
  for (;;) {
    if (is_connected) {
      send_result = handle.writev(slices, num_slices);
    } else {
      // Passing the peer address to sendmsg ensures the connection happens first.
      send_result = handle.sendmsg(slices, num_slices, 0, local_ip, peer_address);
    }

    const bool interrupted =
        !send_result.ok() &&
        send_result.err_->getErrorCode() == Api::IoError::IoErrorCode::Interrupt;
    if (!interrupted) {
      break;
    }
  }

  if (!send_result.ok()) {
    ENVOY_LOG_MISC(debug, "{} failed with error code {}: {}", syscall_name,
                   static_cast<int>(send_result.err_->getErrorCode()),
                   send_result.err_->getErrorDetails());
  } else {
    ENVOY_LOG_MISC(trace, "{} bytes {}", syscall_name, send_result.return_value_);
  }
  return send_result;
}
544
545
namespace {
546
547
void passPayloadToProcessor(uint64_t bytes_read, Buffer::InstancePtr buffer,
548
                            Address::InstanceConstSharedPtr peer_addess,
549
                            Address::InstanceConstSharedPtr local_address,
550
                            UdpPacketProcessor& udp_packet_processor, MonotonicTime receive_time,
551
0
                            uint8_t tos, Buffer::RawSlice saved_cmsg) {
552
0
  ENVOY_BUG(peer_addess != nullptr,
553
0
            fmt::format("Unable to get remote address on the socket bound to local address: {}.",
554
0
                        (local_address == nullptr ? "unknown" : local_address->asString())));
555
556
  // Unix domain sockets are not supported
557
0
  ENVOY_BUG(peer_addess != nullptr && peer_addess->type() == Address::Type::Ip,
558
0
            fmt::format("Unsupported remote address: {} local address: {}, receive size: "
559
0
                        "{}",
560
0
                        peer_addess->asString(),
561
0
                        (local_address == nullptr ? "unknown" : local_address->asString()),
562
0
                        bytes_read));
563
0
  udp_packet_processor.processPacket(std::move(local_address), std::move(peer_addess),
564
0
                                     std::move(buffer), receive_time, tos, saved_cmsg);
565
0
}
566
567
// Reads from a UDP socket with GRO enabled and forwards the resulting packet(s).
//
// One recvmsg() is issued. If the kernel reports a non-zero segment size (gso_size), the received
// data is split into segments of that size (the last one may be shorter) and each is passed to
// the processor as its own packet. With a zero segment size the data is forwarded as one packet.
// `num_packets_read`, when non-null, is reset to 0 and incremented per forwarded packet. Nothing
// is forwarded when the read fails or the message was truncated and dropped.
Api::IoCallUint64Result readFromSocketRecvGro(IoHandle& handle,
                                              const Address::Instance& local_address,
                                              UdpPacketProcessor& udp_packet_processor,
                                              MonotonicTime receive_time, uint32_t* packets_dropped,
                                              uint32_t* num_packets_read) {
  ASSERT(Api::OsSysCallsSingleton::get().supportsUdpGro(),
         "cannot use GRO when the platform doesn't support it.");
  const bool count_packets = num_packets_read != nullptr;
  if (count_packets) {
    *num_packets_read = 0;
  }
  Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
  IoHandle::RecvMsgOutput output(1, packets_dropped);

  // TODO(yugant): Avoid allocating 64k for each read by getting memory from UdpPacketProcessor
  const uint64_t max_rx_datagram_size_with_gro =
      count_packets ? 64 * 1024
                    : NUM_DATAGRAMS_PER_RECEIVE * udp_packet_processor.maxDatagramSize();
  ENVOY_LOG_MISC(trace, "starting gro recvmsg with max={}", max_rx_datagram_size_with_gro);

  Api::IoCallUint64Result result =
      receiveMessage(max_rx_datagram_size_with_gro, buffer, output, handle, local_address,
                     udp_packet_processor.saveCmsgConfig());
  if (!result.ok()) {
    return result;
  }
  auto& message = output.msg_[0];
  if (message.truncated_and_dropped_) {
    return result;
  }

  const uint64_t gso_size = message.gso_size_;
  ENVOY_LOG_MISC(trace, "gro recvmsg bytes {} with gso_size as {}", result.return_value_, gso_size);

  if (gso_size == 0u) {
    // No segmentation information: treat everything that was read as a single payload.
    if (count_packets) {
      *num_packets_read += 1;
    }
    passPayloadToProcessor(result.return_value_, std::move(buffer),
                           std::move(message.peer_address_), std::move(message.local_address_),
                           udp_packet_processor, receive_time, message.tos_, message.saved_cmsg_);
    return result;
  }

  // Carve the data returned by recvmsg into gso_size'd sub buffers, one per packet.
  // TODO(mattklein123): The following code should be optimized to avoid buffer copies, either by
  // switching to slices or by using a CoW buffer type.
  while (buffer->length() > 0) {
    const uint64_t segment_size = std::min(buffer->length(), gso_size);
    Buffer::InstancePtr segment = std::make_unique<Buffer::OwnedImpl>();
    segment->move(*buffer, segment_size);
    if (count_packets) {
      *num_packets_read += 1;
    }
    passPayloadToProcessor(segment_size, std::move(segment), message.peer_address_,
                           message.local_address_, udp_packet_processor, receive_time,
                           message.tos_, message.saved_cmsg_);
  }

  return result;
}
627
628
// Reads up to NUM_DATAGRAMS_PER_RECEIVE datagrams with a single recvmmsg() call and forwards each
// one to `udp_packet_processor`.
//
// Every datagram gets its own buffer with a single-slice reservation of maxDatagramSize() bytes.
// After the call, each message that was not truncated and dropped has its reservation committed
// with the received length and is passed on. `num_packets_read`, when non-null, is reset to 0 and
// incremented per forwarded packet. On a failed read the result is returned and nothing is
// forwarded.
Api::IoCallUint64Result
readFromSocketRecvMmsg(IoHandle& handle, const Address::Instance& local_address,
                       UdpPacketProcessor& udp_packet_processor, MonotonicTime receive_time,
                       uint32_t* packets_dropped, uint32_t* num_packets_read) {
  ASSERT(Api::OsSysCallsSingleton::get().supportsMmsg(),
         "cannot use recvmmsg when the platform doesn't support it.");
  const auto max_rx_datagram_size = udp_packet_processor.maxDatagramSize();
  if (num_packets_read != nullptr) {
    *num_packets_read = 0;
  }

  // Buffer::ReservationSingleSlice is always passed by value, and can only be constructed
  // by Buffer::Instance::reserve(), so this is needed to keep a fixed array
  // in which all elements are legally constructed.
  struct BufferAndReservation {
    BufferAndReservation(uint64_t max_rx_datagram_size)
        : buffer_(std::make_unique<Buffer::OwnedImpl>()),
          reservation_(buffer_->reserveSingleSlice(max_rx_datagram_size, true)) {}

    Buffer::InstancePtr buffer_;
    Buffer::ReservationSingleSlice reservation_;
  };
  constexpr uint32_t num_slices_per_packet = 1u;
  absl::InlinedVector<BufferAndReservation, NUM_DATAGRAMS_PER_RECEIVE> buffers;
  RawSliceArrays slices(NUM_DATAGRAMS_PER_RECEIVE,
                        absl::FixedArray<Buffer::RawSlice>(num_slices_per_packet));
  for (uint32_t i = 0; i < NUM_DATAGRAMS_PER_RECEIVE; i++) {
    buffers.push_back(max_rx_datagram_size);
    slices[i][0] = buffers[i].reservation_.slice();
  }

  IoHandle::RecvMsgOutput output(NUM_DATAGRAMS_PER_RECEIVE, packets_dropped);
  ENVOY_LOG_MISC(trace, "starting recvmmsg with packets={} max={}", NUM_DATAGRAMS_PER_RECEIVE,
                 max_rx_datagram_size);
  Api::IoCallUint64Result result = handle.recvmmsg(slices, local_address.ip()->port(),
                                                   udp_packet_processor.saveCmsgConfig(), output);
  if (!result.ok()) {
    return result;
  }

  uint64_t packets_read = result.return_value_;
  ENVOY_LOG_MISC(trace, "recvmmsg read {} packets", packets_read);
  for (uint64_t i = 0; i < packets_read; ++i) {
    if (output.msg_[i].truncated_and_dropped_) {
      continue;
    }

    Buffer::RawSlice* slice = slices[i].data();
    const uint64_t msg_len = output.msg_[i].msg_len_;
    ASSERT(msg_len <= slice->len_);
    // The peer address may be missing (passPayloadToProcessor reports that case as a bug), so
    // it must not be dereferenced unconditionally just for logging.
    ENVOY_LOG_MISC(debug, "Receive a packet with {} bytes from {}", msg_len,
                   (output.msg_[i].peer_address_ == nullptr
                        ? "unknown"
                        : output.msg_[i].peer_address_->asString()));

    buffers[i].reservation_.commit(std::min(max_rx_datagram_size, msg_len));

    if (num_packets_read != nullptr) {
      *num_packets_read += 1;
    }
    passPayloadToProcessor(msg_len, std::move(buffers[i].buffer_), output.msg_[i].peer_address_,
                           output.msg_[i].local_address_, udp_packet_processor, receive_time,
                           output.msg_[i].tos_, output.msg_[i].saved_cmsg_);
  }
  return result;
}
692
693
// Reads a single datagram through receiveMessage() and, when one was obtained, forwards it to
// `udp_packet_processor` via passPayloadToProcessor().
//
// @param handle the socket handle to read from.
// @param local_address forwarded unchanged to receiveMessage().
// @param udp_packet_processor supplies the maximum datagram size and the cmsg config, and
//        receives the payload.
// @param receive_time timestamp passed along with the payload.
// @param packets_dropped forwarded to the RecvMsgOutput constructor.
// @param num_packets_read optional; when non-null it is set to 0 on entry and to 1 only if a
//        datagram is handed to the processor.
// @return the result of receiveMessage(), whether or not a payload was forwarded.
Api::IoCallUint64Result readFromSocketRecvMsg(IoHandle& handle,
                                              const Address::Instance& local_address,
                                              UdpPacketProcessor& udp_packet_processor,
                                              MonotonicTime receive_time, uint32_t* packets_dropped,
                                              uint32_t* num_packets_read) {
  const bool wants_packet_count = (num_packets_read != nullptr);
  if (wants_packet_count) {
    // Start from a known value so every early return below reports zero packets.
    *num_packets_read = 0;
  }

  Buffer::InstancePtr payload = std::make_unique<Buffer::OwnedImpl>();
  IoHandle::RecvMsgOutput output(1, packets_dropped);

  ENVOY_LOG_MISC(trace, "starting recvmsg with max={}", udp_packet_processor.maxDatagramSize());
  Api::IoCallUint64Result result =
      receiveMessage(udp_packet_processor.maxDatagramSize(), payload, output, handle,
                     local_address, udp_packet_processor.saveCmsgConfig());

  if (!result.ok()) {
    return result;
  }

  // Exactly one output slot was requested above, so the datagram (if any) is described by slot 0.
  auto& received = output.msg_[0];
  if (received.truncated_and_dropped_) {
    // The entry is flagged as truncated and dropped: nothing is handed to the processor.
    return result;
  }

  ENVOY_LOG_MISC(trace, "recvmsg bytes {}", result.return_value_);

  if (wants_packet_count) {
    *num_packets_read = 1;
  }
  passPayloadToProcessor(result.return_value_, std::move(payload),
                         std::move(received.peer_address_), std::move(received.local_address_),
                         udp_packet_processor, receive_time, received.tos_,
                         received.saved_cmsg_);
  return result;
}
724
725
} // namespace
726
727
// Performs one socket read using the strategy selected by `recv_msg_method` and returns that
// reader's result. All other arguments are forwarded unchanged to the chosen reader.
//
// RecvMsgWithGro and RecvMmsg have dedicated readers; every other value goes through the plain
// recvmsg reader.
Api::IoCallUint64Result
Utility::readFromSocket(IoHandle& handle, const Address::Instance& local_address,
                        UdpPacketProcessor& udp_packet_processor, MonotonicTime receive_time,
                        UdpRecvMsgMethod recv_msg_method, uint32_t* packets_dropped,
                        uint32_t* num_packets_read) {
  switch (recv_msg_method) {
  case UdpRecvMsgMethod::RecvMsgWithGro:
    return readFromSocketRecvGro(handle, local_address, udp_packet_processor, receive_time,
                                 packets_dropped, num_packets_read);
  case UdpRecvMsgMethod::RecvMmsg:
    return readFromSocketRecvMmsg(handle, local_address, udp_packet_processor, receive_time,
                                  packets_dropped, num_packets_read);
  case UdpRecvMsgMethod::RecvMsg:
    break;
  }
  // Plain recvmsg is the fallback path.
  return readFromSocketRecvMsg(handle, local_address, udp_packet_processor, receive_time,
                               packets_dropped, num_packets_read);
}
742
743
// Repeatedly reads datagrams from `handle` and feeds them to `udp_packet_processor` until a read
// fails or the per-call read budget is used up.
//
// @param handle the socket handle to read from.
// @param local_address forwarded to each read.
// @param udp_packet_processor receives the payloads and drop notifications, and supplies
//        numPacketsExpectedPerEventLoop().
// @param time_source sampled once per read to timestamp the received packets.
// @param allow_gro use the GRO read path when the handle reports support for it.
// @param allow_mmsg use the recvmmsg read path when GRO is not selected and the handle reports
//        support for it.
// @param packets_dropped in/out drop counter; its address is passed to every read, and any change
//        across a read is reported to the processor.
// @return the error carried by the last read result.
Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
                                               const Address::Instance& local_address,
                                               UdpPacketProcessor& udp_packet_processor,
                                               TimeSource& time_source, bool allow_gro,
                                               bool allow_mmsg, uint32_t& packets_dropped) {
  // GRO takes precedence over recvmmsg; plain recvmsg is the fallback.
  UdpRecvMsgMethod recv_msg_method = UdpRecvMsgMethod::RecvMsg;
  if (allow_gro && handle.supportsUdpGro()) {
    recv_msg_method = UdpRecvMsgMethod::RecvMsgWithGro;
  } else if (allow_mmsg && handle.supportsMmsg()) {
    recv_msg_method = UdpRecvMsgMethod::RecvMmsg;
  }

  // Packet budget for this call: what the processor expects per event loop, capped at
  // MAX_NUM_PACKETS_PER_EVENT_LOOP.
  size_t num_packets_to_read = std::min<size_t>(
      MAX_NUM_PACKETS_PER_EVENT_LOOP, udp_packet_processor.numPacketsExpectedPerEventLoop());
  const bool apply_read_limit_differently = Runtime::runtimeFeatureEnabled(
      "envoy.reloadable_features.udp_socket_apply_aggregated_read_limit");

  // Upper bound on the number of socket reads.
  //  - Aggregated limit enabled: at most num_packets_to_read reads, which bounds the loop even if
  //    reads keep succeeding without consuming the packet budget.
  //  - Otherwise: the packet budget, divided by NUM_DATAGRAMS_PER_RECEIVE for the GRO and
  //    recvmmsg paths.
  size_t num_reads = num_packets_to_read;
  if (!apply_read_limit_differently && recv_msg_method != UdpRecvMsgMethod::RecvMsg) {
    num_reads = num_packets_to_read / NUM_DATAGRAMS_PER_RECEIVE;
  }
  // Always read at least once.
  num_reads = std::max<size_t>(1, num_reads);

  while (true) {
    const uint32_t dropped_before = packets_dropped;
    uint32_t packets_processed = 0;
    // The per-read packet count is only requested when the aggregated limit is active.
    uint32_t* const processed_out = apply_read_limit_differently ? &packets_processed : nullptr;
    const MonotonicTime receive_time = time_source.monotonicTime();
    Api::IoCallUint64Result result =
        Utility::readFromSocket(handle, local_address, udp_packet_processor, receive_time,
                                recv_msg_method, &packets_dropped, processed_out);

    if (!result.ok()) {
      // No more to read or encountered a system error.
      return std::move(result.err_);
    }

    if (packets_dropped != dropped_before) {
      // The kernel tracks SO_RXQ_OVFL as a uint32 which can overflow to a smaller value, so any
      // difference from the previously recorded value means more packets were dropped. Unsigned
      // subtraction is modulo 2^32 and therefore gives the right count across a wrap-around too.
      const uint32_t delta = packets_dropped - dropped_before;
      ENVOY_LOG_EVERY_POW_2_MISC(
          warn,
          "Kernel dropped {} datagram(s). Consider increasing receive buffer size and/or "
          "max datagram size.",
          delta);
      udp_packet_processor.onDatagramsDropped(delta);
    }

    if (apply_read_limit_differently) {
      if (packets_processed >= num_packets_to_read) {
        // Packet budget used up.
        return std::move(result.err_);
      }
      num_packets_to_read -= packets_processed;
    }

    if (--num_reads == 0) {
      // Read budget used up.
      return std::move(result.err_);
    }
  }
}
822
823
// Resolves the effective UDP socket settings from `config`.
//
// @param config the UdpSocketConfig proto to read from.
// @param prefer_gro_default value used for prefer_gro when `config` does not set it.
//
// max_rx_datagram_size falls back to DEFAULT_UDP_MAX_DATAGRAM_SIZE when unset. If GRO ends up
// preferred but the OS syscall layer reports no support for it, a warning is logged; the stored
// preference is left as resolved.
ResolvedUdpSocketConfig::ResolvedUdpSocketConfig(
    const envoy::config::core::v3::UdpSocketConfig& config, bool prefer_gro_default)
    : max_rx_datagram_size_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_rx_datagram_size,
                                                            DEFAULT_UDP_MAX_DATAGRAM_SIZE)),
      prefer_gro_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, prefer_gro, prefer_gro_default)) {
  // OS support is only queried when GRO is actually preferred.
  if (prefer_gro_) {
    if (!Api::OsSysCallsSingleton::get().supportsUdpGro()) {
      ENVOY_LOG_MISC(
          warn,
          "GRO requested but not supported by the OS. Check OS config or disable prefer_gro.");
    }
  }
}
833
834
} // namespace Network
835
} // namespace Envoy