Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/network/utility.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/network/utility.h"
2
3
#include <cstdint>
4
#include <list>
5
#include <memory>
6
#include <sstream>
7
#include <string>
8
#include <vector>
9
10
#include "envoy/buffer/buffer.h"
11
#include "envoy/common/exception.h"
12
#include "envoy/common/platform.h"
13
#include "envoy/config/core/v3/address.pb.h"
14
#include "envoy/network/address.h"
15
#include "envoy/network/connection.h"
16
17
#include "source/common/api/os_sys_calls_impl.h"
18
#include "source/common/buffer/buffer_impl.h"
19
#include "source/common/common/assert.h"
20
#include "source/common/common/cleanup.h"
21
#include "source/common/common/fmt.h"
22
#include "source/common/common/utility.h"
23
#include "source/common/network/address_impl.h"
24
#include "source/common/network/io_socket_error_impl.h"
25
#include "source/common/network/socket_option_impl.h"
26
#include "source/common/protobuf/protobuf.h"
27
#include "source/common/protobuf/utility.h"
28
#include "source/common/runtime/runtime_features.h"
29
30
#include "absl/container/fixed_array.h"
31
#include "absl/strings/match.h"
32
#include "absl/strings/string_view.h"
33
34
namespace Envoy {
35
namespace Network {
36
37
935k
Address::InstanceConstSharedPtr instanceOrNull(StatusOr<Address::InstanceConstSharedPtr> address) {
38
935k
  if (address.ok()) {
39
935k
    return *address;
40
935k
  }
41
0
  return nullptr;
42
935k
}
43
44
0
std::string Utility::urlFromDatagramAddress(const Address::Instance& addr) {
45
0
  if (addr.ip() != nullptr) {
46
0
    return absl::StrCat(UDP_SCHEME, addr.asStringView());
47
0
  } else {
48
0
    return absl::StrCat(UNIX_SCHEME, addr.asStringView());
49
0
  }
50
0
}
51
52
628k
Address::InstanceConstSharedPtr Utility::resolveUrl(const std::string& url) {
53
628k
  if (urlIsTcpScheme(url)) {
54
625k
    return parseInternetAddressAndPort(url.substr(TCP_SCHEME.size()));
55
625k
  } else if (urlIsUdpScheme(url)) {
56
237
    return parseInternetAddressAndPort(url.substr(UDP_SCHEME.size()));
57
1.93k
  } else if (urlIsUnixScheme(url)) {
58
333
    return std::make_shared<Address::PipeInstance>(url.substr(UNIX_SCHEME.size()));
59
1.60k
  } else {
60
1.60k
    throwEnvoyExceptionOrPanic(absl::StrCat("unknown protocol scheme: ", url));
61
1.60k
  }
62
628k
}
63
64
0
StatusOr<Socket::Type> Utility::socketTypeFromUrl(const std::string& url) {
65
0
  if (urlIsTcpScheme(url)) {
66
0
    return Socket::Type::Stream;
67
0
  } else if (urlIsUdpScheme(url)) {
68
0
    return Socket::Type::Datagram;
69
0
  } else if (urlIsUnixScheme(url)) {
70
0
    return Socket::Type::Stream;
71
0
  } else {
72
0
    return absl::InvalidArgumentError(absl::StrCat("unknown protocol scheme: ", url));
73
0
  }
74
0
}
75
76
628k
bool Utility::urlIsTcpScheme(absl::string_view url) { return absl::StartsWith(url, TCP_SCHEME); }
77
78
2.17k
bool Utility::urlIsUdpScheme(absl::string_view url) { return absl::StartsWith(url, UDP_SCHEME); }
79
80
1.93k
bool Utility::urlIsUnixScheme(absl::string_view url) { return absl::StartsWith(url, UNIX_SCHEME); }
81
82
namespace {
83
84
Api::IoCallUint64Result receiveMessage(uint64_t max_rx_datagram_size, Buffer::InstancePtr& buffer,
85
                                       IoHandle::RecvMsgOutput& output, IoHandle& handle,
86
0
                                       const Address::Instance& local_address) {
87
88
0
  auto reservation = buffer->reserveSingleSlice(max_rx_datagram_size);
89
0
  Buffer::RawSlice slice = reservation.slice();
90
0
  Api::IoCallUint64Result result = handle.recvmsg(&slice, 1, local_address.ip()->port(), output);
91
92
0
  if (result.ok()) {
93
0
    reservation.commit(std::min(max_rx_datagram_size, result.return_value_));
94
0
  }
95
96
0
  return result;
97
0
}
98
99
940k
StatusOr<sockaddr_in> parseV4Address(const std::string& ip_address, uint16_t port) {
100
940k
  sockaddr_in sa4;
101
940k
  memset(&sa4, 0, sizeof(sa4));
102
940k
  if (inet_pton(AF_INET, ip_address.c_str(), &sa4.sin_addr) != 1) {
103
132k
    return absl::FailedPreconditionError("failed parsing ipv4");
104
132k
  }
105
808k
  sa4.sin_family = AF_INET;
106
808k
  sa4.sin_port = htons(port);
107
808k
  return sa4;
108
940k
}
109
110
132k
StatusOr<sockaddr_in6> parseV6Address(const std::string& ip_address, uint16_t port) {
111
  // Parse IPv6 with optional scope using getaddrinfo().
112
132k
  struct addrinfo hints;
113
132k
  memset(&hints, 0, sizeof(hints));
114
132k
  struct addrinfo* res = nullptr;
115
  // Suppresses any potentially lengthy network host address lookups and inhibit the invocation of
116
  // a name resolution service.
117
132k
  hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
118
132k
  hints.ai_family = AF_INET6;
119
  // Given that we don't specify a service but we use getaddrinfo() to only parse the node
120
  // address, specifying the socket type allows to hint the getaddrinfo() to return only an
121
  // element with the below socket type. The behavior though remains platform dependent and anyway
122
  // we consume only the first element (if the call succeeds).
123
132k
  hints.ai_socktype = SOCK_DGRAM;
124
132k
  hints.ai_protocol = IPPROTO_UDP;
125
132k
  const Api::SysCallIntResult rc = Api::OsSysCallsSingleton::get().getaddrinfo(
126
132k
      ip_address.c_str(), /*service=*/nullptr, &hints, &res);
127
132k
  if (rc.return_value_ != 0) {
128
5.18k
    return absl::FailedPreconditionError(fmt::format("getaddrinfo error: {}", rc.return_value_));
129
5.18k
  }
130
127k
  sockaddr_in6 sa6 = *reinterpret_cast<sockaddr_in6*>(res->ai_addr);
131
127k
  freeaddrinfo(res);
132
127k
  sa6.sin6_port = htons(port);
133
127k
  return sa6;
134
132k
}
135
136
} // namespace
137
138
Address::InstanceConstSharedPtr Utility::parseInternetAddressNoThrow(const std::string& ip_address,
139
314k
                                                                     uint16_t port, bool v6only) {
140
314k
  StatusOr<sockaddr_in> sa4 = parseV4Address(ip_address, port);
141
314k
  if (sa4.ok()) {
142
182k
    return instanceOrNull(
143
182k
        Address::InstanceFactory::createInstancePtr<Address::Ipv4Instance>(&sa4.value()));
144
182k
  }
145
146
132k
  StatusOr<sockaddr_in6> sa6 = parseV6Address(ip_address, port);
147
132k
  if (sa6.ok()) {
148
126k
    return instanceOrNull(
149
126k
        Address::InstanceFactory::createInstancePtr<Address::Ipv6Instance>(*sa6, v6only));
150
126k
  }
151
5.07k
  return nullptr;
152
132k
}
153
154
Address::InstanceConstSharedPtr Utility::parseInternetAddress(const std::string& ip_address,
155
308k
                                                              uint16_t port, bool v6only) {
156
308k
  const Address::InstanceConstSharedPtr address =
157
308k
      parseInternetAddressNoThrow(ip_address, port, v6only);
158
308k
  if (address == nullptr) {
159
70
    throwWithMalformedIp(ip_address);
160
70
  }
161
308k
  return address;
162
308k
}
163
164
Address::InstanceConstSharedPtr
165
626k
Utility::parseInternetAddressAndPortNoThrow(const std::string& ip_address, bool v6only) {
166
626k
  if (ip_address.empty()) {
167
7
    return nullptr;
168
7
  }
169
626k
  if (ip_address[0] == '[') {
170
    // Appears to be an IPv6 address. Find the "]:" that separates the address from the port.
171
643
    const auto pos = ip_address.rfind("]:");
172
643
    if (pos == std::string::npos) {
173
44
      return nullptr;
174
44
    }
175
599
    const auto ip_str = ip_address.substr(1, pos - 1);
176
599
    const auto port_str = ip_address.substr(pos + 2);
177
599
    uint64_t port64 = 0;
178
599
    if (port_str.empty() || !absl::SimpleAtoi(port_str, &port64) || port64 > 65535) {
179
330
      return nullptr;
180
330
    }
181
269
    StatusOr<sockaddr_in6> sa6 = parseV6Address(ip_str, port64);
182
269
    if (sa6.ok()) {
183
164
      return instanceOrNull(
184
164
          Address::InstanceFactory::createInstancePtr<Address::Ipv6Instance>(*sa6, v6only));
185
164
    }
186
105
    return nullptr;
187
269
  }
188
  // Treat it as an IPv4 address followed by a port.
189
625k
  const auto pos = ip_address.rfind(':');
190
625k
  if (pos == std::string::npos) {
191
20
    return nullptr;
192
20
  }
193
625k
  const auto ip_str = ip_address.substr(0, pos);
194
625k
  const auto port_str = ip_address.substr(pos + 1);
195
625k
  uint64_t port64 = 0;
196
625k
  if (port_str.empty() || !absl::SimpleAtoi(port_str, &port64) || port64 > 65535) {
197
256
    return nullptr;
198
256
  }
199
625k
  StatusOr<sockaddr_in> sa4 = parseV4Address(ip_str, port64);
200
625k
  if (sa4.ok()) {
201
625k
    return instanceOrNull(
202
625k
        Address::InstanceFactory::createInstancePtr<Address::Ipv4Instance>(&sa4.value()));
203
625k
  }
204
96
  return nullptr;
205
625k
}
206
207
Address::InstanceConstSharedPtr Utility::parseInternetAddressAndPort(const std::string& ip_address,
208
626k
                                                                     bool v6only) {
209
210
626k
  const Address::InstanceConstSharedPtr address =
211
626k
      parseInternetAddressAndPortNoThrow(ip_address, v6only);
212
626k
  if (address == nullptr) {
213
858
    throwWithMalformedIp(ip_address);
214
858
  }
215
626k
  return address;
216
626k
}
217
218
0
Address::InstanceConstSharedPtr Utility::copyInternetAddressAndPort(const Address::Ip& ip) {
219
0
  if (ip.version() == Address::IpVersion::v4) {
220
0
    return std::make_shared<Address::Ipv4Instance>(ip.addressAsString(), ip.port());
221
0
  }
222
0
  return std::make_shared<Address::Ipv6Instance>(ip.addressAsString(), ip.port());
223
0
}
224
225
928
void Utility::throwWithMalformedIp(absl::string_view ip_address) {
226
928
  throwEnvoyExceptionOrPanic(absl::StrCat("malformed IP address: ", ip_address));
227
928
}
228
229
// TODO(hennna): Currently getLocalAddress does not support choosing between
230
// multiple interfaces and addresses not returned by getifaddrs. In addition,
231
// the default is to return a loopback address of type version. This function may
232
// need to be updated in the future. Discussion can be found at Github issue #939.
233
2.64k
Address::InstanceConstSharedPtr Utility::getLocalAddress(const Address::IpVersion version) {
234
2.64k
  Address::InstanceConstSharedPtr ret;
235
2.64k
  if (Api::OsSysCallsSingleton::get().supportsGetifaddrs()) {
236
2.64k
    Api::InterfaceAddressVector interface_addresses{};
237
238
2.64k
    const Api::SysCallIntResult rc =
239
2.64k
        Api::OsSysCallsSingleton::get().getifaddrs(interface_addresses);
240
2.64k
    if (rc.return_value_ != 0) {
241
0
      ENVOY_LOG_MISC(debug, fmt::format("getifaddrs error: {}", rc.errno_));
242
2.64k
    } else {
243
      // man getifaddrs(3)
244
5.29k
      for (const auto& interface_address : interface_addresses) {
245
5.29k
        if (!isLoopbackAddress(*interface_address.interface_addr_) &&
246
5.29k
            interface_address.interface_addr_->ip()->version() == version) {
247
2.64k
          ret = interface_address.interface_addr_;
248
2.64k
          if (ret->ip()->version() == Address::IpVersion::v6) {
249
0
            ret = ret->ip()->ipv6()->addressWithoutScopeId();
250
0
          }
251
2.64k
          break;
252
2.64k
        }
253
5.29k
      }
254
2.64k
    }
255
2.64k
  }
256
257
  // If the local address is not found above, then return the loopback address by default.
258
2.64k
  if (ret == nullptr) {
259
0
    if (version == Address::IpVersion::v4) {
260
0
      ret = std::make_shared<Address::Ipv4Instance>("127.0.0.1");
261
0
    } else if (version == Address::IpVersion::v6) {
262
0
      ret = std::make_shared<Address::Ipv6Instance>("::1");
263
0
    }
264
0
  }
265
2.64k
  return ret;
266
2.64k
}
267
268
0
bool Utility::isSameIpOrLoopback(const ConnectionInfoProvider& connection_info_provider) {
269
  // These are local:
270
  // - Pipes
271
  // - Sockets to a loopback address
272
  // - Sockets where the local and remote address (ignoring port) are the same
273
0
  const auto& remote_address = connection_info_provider.remoteAddress();
274
0
  if (remote_address->type() == Address::Type::Pipe || isLoopbackAddress(*remote_address)) {
275
0
    return true;
276
0
  }
277
0
  const auto local_ip = connection_info_provider.localAddress()->ip();
278
0
  const auto remote_ip = remote_address->ip();
279
0
  if (remote_ip != nullptr && local_ip != nullptr &&
280
0
      remote_ip->addressAsString() == local_ip->addressAsString()) {
281
0
    return true;
282
0
  }
283
0
  return false;
284
0
}
285
286
60.6k
bool Utility::isInternalAddress(const Address::Instance& address) {
287
60.6k
  if (address.type() != Address::Type::Ip) {
288
10
    return false;
289
10
  }
290
291
60.5k
  if (address.ip()->version() == Address::IpVersion::v4) {
292
    // Handle the RFC1918 space for IPV4. Also count loopback as internal.
293
60.5k
    const uint32_t address4 = address.ip()->ipv4()->address();
294
60.5k
    const uint8_t* address4_bytes = reinterpret_cast<const uint8_t*>(&address4);
295
60.5k
    if ((address4_bytes[0] == 10) || (address4_bytes[0] == 192 && address4_bytes[1] == 168) ||
296
60.5k
        (address4_bytes[0] == 172 && address4_bytes[1] >= 16 && address4_bytes[1] <= 31) ||
297
60.5k
        address4 == htonl(INADDR_LOOPBACK)) {
298
14
      return true;
299
60.5k
    } else {
300
60.5k
      return false;
301
60.5k
    }
302
60.5k
  }
303
304
  // Local IPv6 address prefix defined in RFC4193. Local addresses have prefix FC00::/7.
305
  // Currently, the FD00::/8 prefix is locally assigned and FC00::/8 may be defined in the
306
  // future.
307
0
  static_assert(sizeof(absl::uint128) == sizeof(in6addr_loopback),
308
0
                "sizeof(absl::uint128) != sizeof(in6addr_loopback)");
309
0
  const absl::uint128 address6 = address.ip()->ipv6()->address();
310
0
  const uint8_t* address6_bytes = reinterpret_cast<const uint8_t*>(&address6);
311
0
  if (address6_bytes[0] == 0xfd ||
312
0
      memcmp(&address6, &in6addr_loopback, sizeof(in6addr_loopback)) == 0) {
313
0
    return true;
314
0
  }
315
316
0
  return false;
317
0
}
318
319
68.7k
bool Utility::isLoopbackAddress(const Address::Instance& address) {
320
68.7k
  if (address.type() != Address::Type::Ip) {
321
24
    return false;
322
24
  }
323
324
68.6k
  if (address.ip()->version() == Address::IpVersion::v4) {
325
    // Compare to the canonical v4 loopback address: 127.0.0.1.
326
68.6k
    return address.ip()->ipv4()->address() == htonl(INADDR_LOOPBACK);
327
68.6k
  } else if (address.ip()->version() == Address::IpVersion::v6) {
328
0
    static_assert(sizeof(absl::uint128) == sizeof(in6addr_loopback),
329
0
                  "sizeof(absl::uint128) != sizeof(in6addr_loopback)");
330
0
    absl::uint128 addr = address.ip()->ipv6()->address();
331
0
    return 0 == memcmp(&addr, &in6addr_loopback, sizeof(in6addr_loopback));
332
0
  }
333
0
  IS_ENVOY_BUG("unexpected address type");
334
0
  return false;
335
0
}
336
337
7.60k
Address::InstanceConstSharedPtr Utility::getCanonicalIpv4LoopbackAddress() {
338
7.60k
  CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
339
7.60k
                         new Address::Ipv4Instance("127.0.0.1", 0, nullptr));
340
7.60k
}
341
342
2
Address::InstanceConstSharedPtr Utility::getIpv6LoopbackAddress() {
343
2
  CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
344
2
                         new Address::Ipv6Instance("::1", 0, nullptr));
345
2
}
346
347
0
Address::InstanceConstSharedPtr Utility::getIpv4AnyAddress() {
348
0
  CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
349
0
                         new Address::Ipv4Instance(static_cast<uint32_t>(0)));
350
0
}
351
352
0
Address::InstanceConstSharedPtr Utility::getIpv6AnyAddress() {
353
0
  CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
354
0
                         new Address::Ipv6Instance(static_cast<uint32_t>(0)));
355
0
}
356
357
108k
const std::string& Utility::getIpv4CidrCatchAllAddress() {
358
108k
  CONSTRUCT_ON_FIRST_USE(std::string, "0.0.0.0/0");
359
108k
}
360
361
108k
const std::string& Utility::getIpv6CidrCatchAllAddress() {
362
108k
  CONSTRUCT_ON_FIRST_USE(std::string, "::/0");
363
108k
}
364
365
Address::InstanceConstSharedPtr Utility::getAddressWithPort(const Address::Instance& address,
366
1.33k
                                                            uint32_t port) {
367
1.33k
  switch (address.ip()->version()) {
368
1.32k
  case Address::IpVersion::v4:
369
1.32k
    return std::make_shared<Address::Ipv4Instance>(address.ip()->addressAsString(), port);
370
18
  case Address::IpVersion::v6:
371
18
    return std::make_shared<Address::Ipv6Instance>(address.ip()->addressAsString(), port);
372
1.33k
  }
373
0
  PANIC("not handled");
374
0
}
375
376
713
Address::InstanceConstSharedPtr Utility::getOriginalDst(Socket& sock) {
377
713
#ifdef SOL_IP
378
379
713
  if (sock.addressType() != Address::Type::Ip) {
380
0
    return nullptr;
381
0
  }
382
383
713
  auto ipVersion = sock.ipVersion();
384
713
  if (!ipVersion.has_value()) {
385
0
    return nullptr;
386
0
  }
387
388
713
  SocketOptionName opt_dst;
389
713
  SocketOptionName opt_tp;
390
713
  if (*ipVersion == Address::IpVersion::v4) {
391
622
    opt_dst = ENVOY_SOCKET_SO_ORIGINAL_DST;
392
622
    opt_tp = ENVOY_SOCKET_IP_TRANSPARENT;
393
622
  } else {
394
91
    opt_dst = ENVOY_SOCKET_IP6T_SO_ORIGINAL_DST;
395
91
    opt_tp = ENVOY_SOCKET_IPV6_TRANSPARENT;
396
91
  }
397
398
713
  sockaddr_storage orig_addr;
399
713
  memset(&orig_addr, 0, sizeof(orig_addr));
400
713
  socklen_t addr_len = sizeof(sockaddr_storage);
401
713
  int status =
402
713
      sock.getSocketOption(opt_dst.level(), opt_dst.option(), &orig_addr, &addr_len).return_value_;
403
404
713
  if (status != 0) {
405
0
    if (Api::OsSysCallsSingleton::get().supportsIpTransparent(*ipVersion)) {
406
0
      socklen_t flag_len = sizeof(int);
407
0
      int is_tp;
408
0
      status =
409
0
          sock.getSocketOption(opt_tp.level(), opt_tp.option(), &is_tp, &flag_len).return_value_;
410
0
      if (status == 0 && is_tp) {
411
0
        return sock.ioHandle().localAddress();
412
0
      }
413
0
    }
414
0
    return nullptr;
415
0
  }
416
417
713
  return Address::addressFromSockAddrOrDie(orig_addr, 0, -1, true /* default for v6 constructor */);
418
419
#else
420
  // TODO(zuercher): determine if connection redirection is possible under macOS (c.f. pfctl and
421
  // divert), and whether it's possible to find the learn destination address.
422
  UNREFERENCED_PARAMETER(sock);
423
  return nullptr;
424
#endif
425
713
}
426
427
0
void Utility::parsePortRangeList(absl::string_view string, std::list<PortRange>& list) {
428
0
  const auto ranges = StringUtil::splitToken(string, ",");
429
0
  for (const auto& s : ranges) {
430
0
    const std::string s_string{s};
431
0
    std::stringstream ss(s_string);
432
0
    uint32_t min = 0;
433
0
    uint32_t max = 0;
434
435
0
    if (absl::StrContains(s, '-')) {
436
0
      char dash = 0;
437
0
      ss >> min;
438
0
      ss >> dash;
439
0
      ss >> max;
440
0
    } else {
441
0
      ss >> min;
442
0
      max = min;
443
0
    }
444
445
0
    if (s.empty() || (min > 65535) || (max > 65535) || ss.fail() || !ss.eof()) {
446
0
      throwEnvoyExceptionOrPanic(fmt::format("invalid port number or range '{}'", s_string));
447
0
    }
448
449
0
    list.emplace_back(PortRange(min, max));
450
0
  }
451
0
}
452
453
0
bool Utility::portInRangeList(const Address::Instance& address, const std::list<PortRange>& list) {
454
0
  if (address.type() != Address::Type::Ip) {
455
0
    return false;
456
0
  }
457
458
0
  for (const PortRange& p : list) {
459
0
    if (p.contains(address.ip()->port())) {
460
0
      return true;
461
0
    }
462
0
  }
463
0
  return false;
464
0
}
465
466
134k
absl::uint128 Utility::Ip6ntohl(const absl::uint128& address) {
467
134k
#ifdef ABSL_IS_LITTLE_ENDIAN
468
134k
  return flipOrder(address);
469
#else
470
  return address;
471
#endif
472
134k
}
473
474
10.5k
absl::uint128 Utility::Ip6htonl(const absl::uint128& address) {
475
10.5k
#ifdef ABSL_IS_LITTLE_ENDIAN
476
10.5k
  return flipOrder(address);
477
#else
478
  return address;
479
#endif
480
10.5k
}
481
482
144k
absl::uint128 Utility::flipOrder(const absl::uint128& input) {
483
144k
  absl::uint128 result{0};
484
144k
  absl::uint128 data = input;
485
2.46M
  for (int i = 0; i < 16; i++) {
486
2.31M
    result <<= 8;
487
2.31M
    result |= (data & 0x000000000000000000000000000000FF);
488
2.31M
    data >>= 8;
489
2.31M
  }
490
144k
  return result;
491
144k
}
492
493
Address::InstanceConstSharedPtr
494
0
Utility::protobufAddressToAddress(const envoy::config::core::v3::Address& proto_address) {
495
0
  switch (proto_address.address_case()) {
496
0
  case envoy::config::core::v3::Address::AddressCase::kSocketAddress:
497
0
    return Utility::parseInternetAddress(proto_address.socket_address().address(),
498
0
                                         proto_address.socket_address().port_value(),
499
0
                                         !proto_address.socket_address().ipv4_compat());
500
0
  case envoy::config::core::v3::Address::AddressCase::kPipe:
501
0
    return std::make_shared<Address::PipeInstance>(proto_address.pipe().path(),
502
0
                                                   proto_address.pipe().mode());
503
0
  case envoy::config::core::v3::Address::AddressCase::kEnvoyInternalAddress:
504
0
    return std::make_shared<Address::EnvoyInternalInstance>(
505
0
        proto_address.envoy_internal_address().server_listener_name(),
506
0
        proto_address.envoy_internal_address().endpoint_id());
507
0
  case envoy::config::core::v3::Address::AddressCase::ADDRESS_NOT_SET:
508
0
    PANIC_DUE_TO_PROTO_UNSET;
509
0
  }
510
0
  PANIC_DUE_TO_CORRUPT_ENUM;
511
0
}
512
513
void Utility::addressToProtobufAddress(const Address::Instance& address,
514
32
                                       envoy::config::core::v3::Address& proto_address) {
515
32
  if (address.type() == Address::Type::Pipe) {
516
6
    proto_address.mutable_pipe()->set_path(address.asString());
517
26
  } else if (address.type() == Address::Type::Ip) {
518
26
    auto* socket_address = proto_address.mutable_socket_address();
519
26
    socket_address->set_address(address.ip()->addressAsString());
520
26
    socket_address->set_port_value(address.ip()->port());
521
26
  } else {
522
0
    ASSERT(address.type() == Address::Type::EnvoyInternal);
523
0
    auto* internal_address = proto_address.mutable_envoy_internal_address();
524
0
    internal_address->set_server_listener_name(address.envoyInternalAddress()->addressId());
525
0
    internal_address->set_endpoint_id(address.envoyInternalAddress()->endpointId());
526
0
  }
527
32
}
528
529
Socket::Type
530
6.08k
Utility::protobufAddressSocketType(const envoy::config::core::v3::Address& proto_address) {
531
6.08k
  switch (proto_address.address_case()) {
532
3.41k
  case envoy::config::core::v3::Address::AddressCase::kSocketAddress: {
533
3.41k
    const auto protocol = proto_address.socket_address().protocol();
534
3.41k
    switch (protocol) {
535
0
      PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
536
3.32k
    case envoy::config::core::v3::SocketAddress::TCP:
537
3.32k
      return Socket::Type::Stream;
538
92
    case envoy::config::core::v3::SocketAddress::UDP:
539
92
      return Socket::Type::Datagram;
540
3.41k
    }
541
3.41k
  }
542
0
    PANIC_DUE_TO_CORRUPT_ENUM;
543
2.66k
  case envoy::config::core::v3::Address::AddressCase::kPipe:
544
2.66k
    return Socket::Type::Stream;
545
7
  case envoy::config::core::v3::Address::AddressCase::kEnvoyInternalAddress:
546
    // Currently internal address supports stream operation only.
547
7
    return Socket::Type::Stream;
548
0
  case envoy::config::core::v3::Address::AddressCase::ADDRESS_NOT_SET:
549
0
    PANIC_DUE_TO_PROTO_UNSET;
550
6.08k
  }
551
0
  PANIC_DUE_TO_CORRUPT_ENUM;
552
0
}
553
554
Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, const Buffer::Instance& buffer,
555
                                               const Address::Ip* local_ip,
556
0
                                               const Address::Instance& peer_address) {
557
0
  Buffer::RawSliceVector slices = buffer.getRawSlices();
558
0
  return writeToSocket(handle, slices.data(), slices.size(), local_ip, peer_address);
559
0
}
560
561
Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, Buffer::RawSlice* slices,
562
                                               uint64_t num_slices, const Address::Ip* local_ip,
563
0
                                               const Address::Instance& peer_address) {
564
0
  Api::IoCallUint64Result send_result(
565
0
      /*rc=*/0, /*err=*/Api::IoError::none());
566
0
  do {
567
0
    send_result = handle.sendmsg(slices, num_slices, 0, local_ip, peer_address);
568
0
  } while (!send_result.ok() &&
569
           // Send again if interrupted.
570
0
           send_result.err_->getErrorCode() == Api::IoError::IoErrorCode::Interrupt);
571
572
0
  if (send_result.ok()) {
573
0
    ENVOY_LOG_MISC(trace, "sendmsg bytes {}", send_result.return_value_);
574
0
  } else {
575
0
    ENVOY_LOG_MISC(debug, "sendmsg failed with error code {}: {}",
576
0
                   static_cast<int>(send_result.err_->getErrorCode()),
577
0
                   send_result.err_->getErrorDetails());
578
0
  }
579
0
  return send_result;
580
0
}
581
582
void passPayloadToProcessor(uint64_t bytes_read, Buffer::InstancePtr buffer,
583
                            Address::InstanceConstSharedPtr peer_addess,
584
                            Address::InstanceConstSharedPtr local_address,
585
0
                            UdpPacketProcessor& udp_packet_processor, MonotonicTime receive_time) {
586
0
  RELEASE_ASSERT(
587
0
      peer_addess != nullptr,
588
0
      fmt::format("Unable to get remote address on the socket bount to local address: {} ",
589
0
                  local_address->asString()));
590
591
  // Unix domain sockets are not supported
592
0
  RELEASE_ASSERT(peer_addess->type() == Address::Type::Ip,
593
0
                 fmt::format("Unsupported remote address: {} local address: {}, receive size: "
594
0
                             "{}",
595
0
                             peer_addess->asString(), local_address->asString(), bytes_read));
596
0
  udp_packet_processor.processPacket(std::move(local_address), std::move(peer_addess),
597
0
                                     std::move(buffer), receive_time);
598
0
}
599
600
Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle,
601
                                                const Address::Instance& local_address,
602
                                                UdpPacketProcessor& udp_packet_processor,
603
                                                MonotonicTime receive_time, bool use_gro,
604
0
                                                uint32_t* packets_dropped) {
605
606
0
  if (use_gro) {
607
0
    Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
608
0
    IoHandle::RecvMsgOutput output(1, packets_dropped);
609
610
    // TODO(yugant): Avoid allocating 24k for each read by getting memory from UdpPacketProcessor
611
0
    const uint64_t max_rx_datagram_size_with_gro =
612
0
        NUM_DATAGRAMS_PER_RECEIVE * udp_packet_processor.maxDatagramSize();
613
0
    ENVOY_LOG_MISC(trace, "starting gro recvmsg with max={}", max_rx_datagram_size_with_gro);
614
615
0
    Api::IoCallUint64Result result =
616
0
        receiveMessage(max_rx_datagram_size_with_gro, buffer, output, handle, local_address);
617
618
0
    if (!result.ok() || output.msg_[0].truncated_and_dropped_) {
619
0
      return result;
620
0
    }
621
622
0
    const uint64_t gso_size = output.msg_[0].gso_size_;
623
0
    ENVOY_LOG_MISC(trace, "gro recvmsg bytes {} with gso_size as {}", result.return_value_,
624
0
                   gso_size);
625
626
    // Skip gso segmentation and proceed as a single payload.
627
0
    if (gso_size == 0u) {
628
0
      passPayloadToProcessor(
629
0
          result.return_value_, std::move(buffer), std::move(output.msg_[0].peer_address_),
630
0
          std::move(output.msg_[0].local_address_), udp_packet_processor, receive_time);
631
0
      return result;
632
0
    }
633
634
    // Segment the buffer read by the recvmsg syscall into gso_sized sub buffers.
635
    // TODO(mattklein123): The following code should be optimized to avoid buffer copies, either by
636
    // switching to slices or by using a CoW buffer type.
637
0
    while (buffer->length() > 0) {
638
0
      const uint64_t bytes_to_copy = std::min(buffer->length(), gso_size);
639
0
      Buffer::InstancePtr sub_buffer = std::make_unique<Buffer::OwnedImpl>();
640
0
      sub_buffer->move(*buffer, bytes_to_copy);
641
0
      passPayloadToProcessor(bytes_to_copy, std::move(sub_buffer), output.msg_[0].peer_address_,
642
0
                             output.msg_[0].local_address_, udp_packet_processor, receive_time);
643
0
    }
644
645
0
    return result;
646
0
  }
647
648
0
  if (handle.supportsMmsg()) {
649
0
    const auto max_rx_datagram_size = udp_packet_processor.maxDatagramSize();
650
651
    // Buffer::ReservationSingleSlice is always passed by value, and can only be constructed
652
    // by Buffer::Instance::reserve(), so this is needed to keep a fixed array
653
    // in which all elements are legally constructed.
654
0
    struct BufferAndReservation {
655
0
      BufferAndReservation(uint64_t max_rx_datagram_size)
656
0
          : buffer_(std::make_unique<Buffer::OwnedImpl>()),
657
0
            reservation_(buffer_->reserveSingleSlice(max_rx_datagram_size, true)) {}
658
659
0
      Buffer::InstancePtr buffer_;
660
0
      Buffer::ReservationSingleSlice reservation_;
661
0
    };
662
0
    constexpr uint32_t num_slices_per_packet = 1u;
663
0
    absl::InlinedVector<BufferAndReservation, NUM_DATAGRAMS_PER_RECEIVE> buffers;
664
0
    RawSliceArrays slices(NUM_DATAGRAMS_PER_RECEIVE,
665
0
                          absl::FixedArray<Buffer::RawSlice>(num_slices_per_packet));
666
0
    for (uint32_t i = 0; i < NUM_DATAGRAMS_PER_RECEIVE; i++) {
667
0
      buffers.push_back(max_rx_datagram_size);
668
0
      slices[i][0] = buffers[i].reservation_.slice();
669
0
    }
670
671
0
    IoHandle::RecvMsgOutput output(NUM_DATAGRAMS_PER_RECEIVE, packets_dropped);
672
0
    ENVOY_LOG_MISC(trace, "starting recvmmsg with packets={} max={}", NUM_DATAGRAMS_PER_RECEIVE,
673
0
                   max_rx_datagram_size);
674
0
    Api::IoCallUint64Result result = handle.recvmmsg(slices, local_address.ip()->port(), output);
675
0
    if (!result.ok()) {
676
0
      return result;
677
0
    }
678
679
0
    uint64_t packets_read = result.return_value_;
680
0
    ENVOY_LOG_MISC(trace, "recvmmsg read {} packets", packets_read);
681
0
    for (uint64_t i = 0; i < packets_read; ++i) {
682
0
      if (output.msg_[i].truncated_and_dropped_) {
683
0
        continue;
684
0
      }
685
686
0
      Buffer::RawSlice* slice = slices[i].data();
687
0
      const uint64_t msg_len = output.msg_[i].msg_len_;
688
0
      ASSERT(msg_len <= slice->len_);
689
0
      ENVOY_LOG_MISC(debug, "Receive a packet with {} bytes from {}", msg_len,
690
0
                     output.msg_[i].peer_address_->asString());
691
692
0
      buffers[i].reservation_.commit(std::min(max_rx_datagram_size, msg_len));
693
694
0
      passPayloadToProcessor(msg_len, std::move(buffers[i].buffer_), output.msg_[i].peer_address_,
695
0
                             output.msg_[i].local_address_, udp_packet_processor, receive_time);
696
0
    }
697
0
    return result;
698
0
  }
699
700
0
  Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
701
0
  IoHandle::RecvMsgOutput output(1, packets_dropped);
702
703
0
  ENVOY_LOG_MISC(trace, "starting recvmsg with max={}", udp_packet_processor.maxDatagramSize());
704
0
  Api::IoCallUint64Result result =
705
0
      receiveMessage(udp_packet_processor.maxDatagramSize(), buffer, output, handle, local_address);
706
707
0
  if (!result.ok() || output.msg_[0].truncated_and_dropped_) {
708
0
    return result;
709
0
  }
710
711
0
  ENVOY_LOG_MISC(trace, "recvmsg bytes {}", result.return_value_);
712
713
0
  passPayloadToProcessor(
714
0
      result.return_value_, std::move(buffer), std::move(output.msg_[0].peer_address_),
715
0
      std::move(output.msg_[0].local_address_), udp_packet_processor, receive_time);
716
0
  return result;
717
0
}
718
719
Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
720
                                               const Address::Instance& local_address,
721
                                               UdpPacketProcessor& udp_packet_processor,
722
                                               TimeSource& time_source, bool prefer_gro,
723
0
                                               uint32_t& packets_dropped) {
724
  // Read at least one time, and attempt to read numPacketsExpectedPerEventLoop() packets unless
725
  // this goes over MAX_NUM_PACKETS_PER_EVENT_LOOP.
726
0
  size_t num_packets_to_read = std::min<size_t>(
727
0
      MAX_NUM_PACKETS_PER_EVENT_LOOP, udp_packet_processor.numPacketsExpectedPerEventLoop());
728
0
  const bool use_gro = prefer_gro && handle.supportsUdpGro();
729
0
  size_t num_reads =
730
0
      use_gro ? (num_packets_to_read / NUM_DATAGRAMS_PER_RECEIVE)
731
0
              : (handle.supportsMmsg() ? (num_packets_to_read / NUM_DATAGRAMS_PER_RECEIVE)
732
0
                                       : num_packets_to_read);
733
  // Make sure to read at least once.
734
0
  num_reads = std::max<size_t>(1, num_reads);
735
0
  do {
736
0
    const uint32_t old_packets_dropped = packets_dropped;
737
0
    const MonotonicTime receive_time = time_source.monotonicTime();
738
0
    Api::IoCallUint64Result result = Utility::readFromSocket(
739
0
        handle, local_address, udp_packet_processor, receive_time, use_gro, &packets_dropped);
740
741
0
    if (!result.ok()) {
742
      // No more to read or encountered a system error.
743
0
      return std::move(result.err_);
744
0
    }
745
746
0
    if (packets_dropped != old_packets_dropped) {
747
      // The kernel tracks SO_RXQ_OVFL as a uint32 which can overflow to a smaller
748
      // value. So as long as this count differs from previously recorded value,
749
      // more packets are dropped by kernel.
750
0
      const uint32_t delta =
751
0
          (packets_dropped > old_packets_dropped)
752
0
              ? (packets_dropped - old_packets_dropped)
753
0
              : (packets_dropped + (std::numeric_limits<uint32_t>::max() - old_packets_dropped) +
754
0
                 1);
755
0
      ENVOY_LOG_EVERY_POW_2_MISC(
756
0
          warn,
757
0
          "Kernel dropped {} datagram(s). Consider increasing receive buffer size and/or "
758
0
          "max datagram size.",
759
0
          delta);
760
0
      udp_packet_processor.onDatagramsDropped(delta);
761
0
    }
762
0
    --num_reads;
763
0
    if (num_reads == 0) {
764
0
      return std::move(result.err_);
765
0
    }
766
0
  } while (true);
767
0
}
768
769
ResolvedUdpSocketConfig::ResolvedUdpSocketConfig(
770
    const envoy::config::core::v3::UdpSocketConfig& config, bool prefer_gro_default)
771
    : max_rx_datagram_size_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_rx_datagram_size,
772
                                                            DEFAULT_UDP_MAX_DATAGRAM_SIZE)),
773
0
      prefer_gro_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, prefer_gro, prefer_gro_default)) {
774
0
  if (prefer_gro_ && !Api::OsSysCallsSingleton::get().supportsUdpGro()) {
775
0
    ENVOY_LOG_MISC(
776
0
        warn, "GRO requested but not supported by the OS. Check OS config or disable prefer_gro.");
777
0
  }
778
0
}
779
780
} // namespace Network
781
} // namespace Envoy