1
#include "source/common/network/utility.h"
2

            
3
#include <cstddef>
4
#include <cstdint>
5
#include <list>
6
#include <memory>
7
#include <sstream>
8
#include <string>
9
#include <vector>
10

            
11
#include "envoy/buffer/buffer.h"
12
#include "envoy/common/exception.h"
13
#include "envoy/common/platform.h"
14
#include "envoy/config/core/v3/address.pb.h"
15
#include "envoy/network/address.h"
16
#include "envoy/network/connection.h"
17

            
18
#include "source/common/api/os_sys_calls_impl.h"
19
#include "source/common/buffer/buffer_impl.h"
20
#include "source/common/common/assert.h"
21
#include "source/common/common/fmt.h"
22
#include "source/common/common/utility.h"
23
#include "source/common/network/address_impl.h"
24
#include "source/common/network/io_socket_error_impl.h"
25
#include "source/common/network/ip_address_parsing.h"
26
#include "source/common/network/socket_option_impl.h"
27
#include "source/common/protobuf/protobuf.h"
28
#include "source/common/protobuf/utility.h"
29
#include "source/common/runtime/runtime_features.h"
30

            
31
#include "absl/container/fixed_array.h"
32
#include "absl/strings/match.h"
33
#include "absl/strings/string_view.h"
34

            
35
namespace Envoy {
36
namespace Network {
37
namespace {
38

            
39
348135
// Unwraps a StatusOr-wrapped address, mapping any error status to nullptr.
// `address` is taken by value, so the contained shared pointer is moved out
// instead of copied, which avoids an extra atomic refcount increment/decrement.
Address::InstanceConstSharedPtr instanceOrNull(StatusOr<Address::InstanceConstSharedPtr> address) {
  if (address.ok()) {
    return std::move(*address);
  }
  return nullptr;
}
45

            
46
} // namespace
47

            
48
9
// Builds a datagram URL for `addr`. IP addresses get the UDP scheme prefix;
// any address without an IP component gets the UNIX scheme prefix.
std::string Utility::urlFromDatagramAddress(const Address::Instance& addr) {
  const bool has_ip = addr.ip() != nullptr;
  return absl::StrCat(has_ip ? UDP_SCHEME : UNIX_SCHEME, addr.asStringView());
}
55

            
56
174696
// Resolves a TCP, UDP or UNIX scheme URL into an address instance.
// Unix URLs are handed to PipeInstance::create as-is (minus the scheme).
// TCP/UDP URLs must carry "<ip>:<port>" or "[<ipv6>]:<port>" after the scheme.
// Returns InvalidArgumentError for an unknown scheme or an unparsable address.
absl::StatusOr<Address::InstanceConstSharedPtr> Utility::resolveUrl(const std::string& url) {
  size_t scheme_length = 0;
  if (urlIsTcpScheme(url)) {
    scheme_length = TCP_SCHEME.size();
  } else if (urlIsUdpScheme(url)) {
    scheme_length = UDP_SCHEME.size();
  } else if (urlIsUnixScheme(url)) {
    return Address::PipeInstance::create(url.substr(UNIX_SCHEME.size()));
  } else {
    return absl::InvalidArgumentError(absl::StrCat("unknown protocol scheme: ", url));
  }

  // TCP and UDP share the same address-and-port syntax after the scheme.
  Address::InstanceConstSharedPtr resolved =
      parseInternetAddressAndPortNoThrow(url.substr(scheme_length));
  if (resolved == nullptr) {
    return absl::InvalidArgumentError(absl::StrCat("malformed IP address: ", url));
  }
  return resolved;
}
72

            
73
16
// Maps a URL's scheme to a socket type: TCP and UNIX schemes are stream
// sockets, the UDP scheme is a datagram socket, and anything else yields an
// InvalidArgumentError.
StatusOr<Socket::Type> Utility::socketTypeFromUrl(const std::string& url) {
  if (urlIsTcpScheme(url)) {
    return Socket::Type::Stream;
  }
  if (urlIsUdpScheme(url)) {
    return Socket::Type::Datagram;
  }
  if (urlIsUnixScheme(url)) {
    return Socket::Type::Stream;
  }
  return absl::InvalidArgumentError(absl::StrCat("unknown protocol scheme: ", url));
}
84

            
85
174712
// True when `url` begins with the TCP scheme prefix.
bool Utility::urlIsTcpScheme(absl::string_view url) {
  return absl::StartsWith(url, TCP_SCHEME);
}
86

            
87
2080
// True when `url` begins with the UDP scheme prefix.
bool Utility::urlIsUdpScheme(absl::string_view url) {
  return absl::StartsWith(url, UDP_SCHEME);
}
88

            
89
41
// True when `url` begins with the UNIX scheme prefix.
bool Utility::urlIsUnixScheme(absl::string_view url) {
  return absl::StartsWith(url, UNIX_SCHEME);
}
90

            
91
namespace {
92

            
93
// Receives a single message from `handle` into a freshly reserved slice of
// `buffer` that is `max_rx_datagram_size` bytes long. The local address's port
// is forwarded to recvmsg, along with the cmsg-saving config and `output`.
// On success, the received byte count (capped at the slice size) is committed
// to the buffer. The recvmsg result is returned unchanged either way.
Api::IoCallUint64Result receiveMessage(uint64_t max_rx_datagram_size, Buffer::InstancePtr& buffer,
                                       IoHandle::RecvMsgOutput& output, IoHandle& handle,
                                       const Address::Instance& local_address,
                                       const IoHandle::UdpSaveCmsgConfig& save_cmsg_config) {
  auto reservation = buffer->reserveSingleSlice(max_rx_datagram_size);
  Buffer::RawSlice slice = reservation.slice();
  const auto self_port = local_address.ip()->port();
  Api::IoCallUint64Result result =
      handle.recvmsg(&slice, 1, self_port, save_cmsg_config, output);
  if (!result.ok()) {
    // Nothing was received; leave the reservation uncommitted.
    return result;
  }
  reservation.commit(std::min(max_rx_datagram_size, result.return_value_));
  return result;
}
109

            
110
} // namespace
111

            
112
// Parses `ip_address` as a literal IPv4 address, falling back to IPv6, and
// combines it with `port`.
// `v6only` is applied only to IPv6 results. `network_namespace` is attached to
// whichever instance is created.
// Returns nullptr if the text is neither form or the instance cannot be created.
Address::InstanceConstSharedPtr
Utility::parseInternetAddressNoThrow(const std::string& ip_address, uint16_t port, bool v6only,
                                     absl::optional<std::string> network_namespace) {
  if (StatusOr<sockaddr_in> v4_sockaddr = IpAddressParsing::parseIPv4(ip_address, port);
      v4_sockaddr.ok()) {
    return instanceOrNull(Address::InstanceFactory::createInstancePtr<Address::Ipv4Instance>(
        &v4_sockaddr.value(), nullptr, network_namespace));
  }

  if (StatusOr<sockaddr_in6> v6_sockaddr = IpAddressParsing::parseIPv6(ip_address, port);
      v6_sockaddr.ok()) {
    return instanceOrNull(Address::InstanceFactory::createInstancePtr<Address::Ipv6Instance>(
        *v6_sockaddr, v6only, nullptr, network_namespace));
  }

  return nullptr;
}
128

            
129
// Parses "<ipv4>:<port>" or "[<ipv6>]:<port>" into an address instance.
// The port is parsed with absl::SimpleAtoi and must not exceed 65535.
// `v6only` applies only to bracketed IPv6 input. `network_namespace` is attached
// to the created instance.
// Returns nullptr on any parse or construction failure.
Address::InstanceConstSharedPtr
Utility::parseInternetAddressAndPortNoThrow(const std::string& ip_address, bool v6only,
                                            absl::optional<std::string> network_namespace) {
  // Parses the port component; nullopt when empty, non-numeric or above 65535.
  const auto parse_port = [](const std::string& text) -> absl::optional<uint16_t> {
    uint64_t value = 0;
    if (text.empty() || !absl::SimpleAtoi(text, &value) || value > 65535) {
      return absl::nullopt;
    }
    return static_cast<uint16_t>(value);
  };

  if (ip_address.empty()) {
    return nullptr;
  }

  if (ip_address.front() == '[') {
    // Bracketed IPv6 literal: the last "]:" separates the address from the port.
    const size_t separator = ip_address.rfind("]:");
    if (separator == std::string::npos) {
      return nullptr;
    }
    const absl::optional<uint16_t> port = parse_port(ip_address.substr(separator + 2));
    if (!port.has_value()) {
      return nullptr;
    }
    StatusOr<sockaddr_in6> v6_sockaddr =
        IpAddressParsing::parseIPv6(ip_address.substr(1, separator - 1), *port);
    if (!v6_sockaddr.ok()) {
      return nullptr;
    }
    return instanceOrNull(Address::InstanceFactory::createInstancePtr<Address::Ipv6Instance>(
        *v6_sockaddr, v6only, nullptr, network_namespace));
  }

  // Otherwise expect an IPv4 literal; the last ':' separates it from the port.
  const size_t separator = ip_address.rfind(':');
  if (separator == std::string::npos) {
    return nullptr;
  }
  const absl::optional<uint16_t> port = parse_port(ip_address.substr(separator + 1));
  if (!port.has_value()) {
    return nullptr;
  }
  StatusOr<sockaddr_in> v4_sockaddr =
      IpAddressParsing::parseIPv4(ip_address.substr(0, separator), *port);
  if (!v4_sockaddr.ok()) {
    return nullptr;
  }
  return instanceOrNull(Address::InstanceFactory::createInstancePtr<Address::Ipv4Instance>(
      &v4_sockaddr.value(), nullptr, network_namespace));
}
172

            
173
50
// Creates a new address instance with the same IP and port as `ip`, rebuilt
// from its string form. Any version other than v4 is constructed as IPv6.
Address::InstanceConstSharedPtr Utility::copyInternetAddressAndPort(const Address::Ip& ip) {
  const auto& text = ip.addressAsString();
  const auto port = ip.port();
  if (ip.version() != Address::IpVersion::v4) {
    return std::make_shared<Address::Ipv6Instance>(text, port);
  }
  return std::make_shared<Address::Ipv4Instance>(text, port);
}
179

            
180
// TODO(hennna): Currently getLocalAddress does not support choosing between
181
// multiple interfaces and addresses not returned by getifaddrs. In addition,
182
// the default is to return a loopback address of type version. This function may
183
// need to be updated in the future. Discussion can be found at Github issue #939.
184
10664
Address::InstanceConstSharedPtr Utility::getLocalAddress(const Address::IpVersion version) {
185
10664
  Address::InstanceConstSharedPtr ret;
186
10664
  if (Api::OsSysCallsSingleton::get().supportsGetifaddrs()) {
187
10664
    Api::InterfaceAddressVector interface_addresses{};
188

            
189
10664
    const Api::SysCallIntResult rc =
190
10664
        Api::OsSysCallsSingleton::get().getifaddrs(interface_addresses);
191
10664
    if (rc.return_value_ != 0) {
192
1
      ENVOY_LOG_MISC(debug, fmt::format("getifaddrs error: {}", rc.errno_));
193
10663
    } else {
194
      // man getifaddrs(3)
195
21500
      for (const auto& interface_address : interface_addresses) {
196
21500
        if (!isLoopbackAddress(*interface_address.interface_addr_) &&
197
21500
            interface_address.interface_addr_->ip()->version() == version) {
198
10663
          ret = interface_address.interface_addr_;
199
10663
          if (ret->ip()->version() == Address::IpVersion::v6) {
200
87
            ret = ret->ip()->ipv6()->addressWithoutScopeId();
201
87
          }
202
10663
          break;
203
10663
        }
204
21500
      }
205
10663
    }
206
10664
  }
207

            
208
  // If the local address is not found above, then return the loopback address by default.
209
10664
  if (ret == nullptr) {
210
1
    if (version == Address::IpVersion::v4) {
211
1
      ret = std::make_shared<Address::Ipv4Instance>("127.0.0.1");
212
1
    } else if (version == Address::IpVersion::v6) {
213
      ret = std::make_shared<Address::Ipv6Instance>("::1");
214
    }
215
1
  }
216
10664
  return ret;
217
10664
}
218

            
219
31
bool Utility::isSameIpOrLoopback(const ConnectionInfoProvider& connection_info_provider) {
220
  // These are local:
221
  // - Pipes
222
  // - Sockets to a loopback address
223
  // - Sockets where the local and remote address (ignoring port) are the same
224
31
  const auto& remote_address = connection_info_provider.remoteAddress();
225
31
  if (remote_address->type() == Address::Type::Pipe || isLoopbackAddress(*remote_address)) {
226
17
    return true;
227
17
  }
228
14
  const auto local_ip = connection_info_provider.localAddress()->ip();
229
14
  const auto remote_ip = remote_address->ip();
230
14
  if (remote_ip != nullptr && local_ip != nullptr &&
231
14
      remote_ip->addressAsString() == local_ip->addressAsString()) {
232
4
    return true;
233
4
  }
234
10
  return false;
235
14
}
236

            
237
66
// Returns true if `address` is an IP address in an internal range.
// - IPv4: the RFC1918 blocks 10/8, 172.16/12 and 192.168/16, plus exactly 127.0.0.1.
// - IPv6: the fd00::/8 prefix, plus ::1.
// Non-IP addresses are never internal.
bool Utility::isInternalAddress(const Address::Instance& address) {
  if (address.type() != Address::Type::Ip) {
    return false;
  }

  if (address.ip()->version() == Address::IpVersion::v4) {
    // The raw value is compared against htonl(INADDR_LOOPBACK), i.e. it is in network byte
    // order, so byte 0 in memory is the first dotted-quad octet.
    const uint32_t raw_v4 = address.ip()->ipv4()->address();
    const uint8_t* octet = reinterpret_cast<const uint8_t*>(&raw_v4);
    const bool in_10_slash_8 = octet[0] == 10;
    const bool in_172_16_slash_12 = octet[0] == 172 && octet[1] >= 16 && octet[1] <= 31;
    const bool in_192_168_slash_16 = octet[0] == 192 && octet[1] == 168;
    const bool is_loopback = raw_v4 == htonl(INADDR_LOOPBACK);
    return in_10_slash_8 || in_172_16_slash_12 || in_192_168_slash_16 || is_loopback;
  }

  // Local IPv6 address prefix defined in RFC4193. Local addresses have prefix FC00::/7.
  // Currently, the FD00::/8 prefix is locally assigned and FC00::/8 may be defined in the
  // future.
  static_assert(sizeof(absl::uint128) == sizeof(in6addr_loopback),
                "sizeof(absl::uint128) != sizeof(in6addr_loopback)");
  const absl::uint128 raw_v6 = address.ip()->ipv6()->address();
  const uint8_t* v6_bytes = reinterpret_cast<const uint8_t*>(&raw_v6);
  return v6_bytes[0] == 0xfd || memcmp(&raw_v6, &in6addr_loopback, sizeof(in6addr_loopback)) == 0;
}
269

            
270
22348
// Returns true only for the canonical loopback addresses. For IPv4 that is
// exactly 127.0.0.1 (not all of 127.0.0.0/8); for IPv6 it is ::1.
// Non-IP addresses are not loopback.
bool Utility::isLoopbackAddress(const Address::Instance& address) {
  if (address.type() != Address::Type::Ip) {
    return false;
  }

  switch (address.ip()->version()) {
  case Address::IpVersion::v4:
    return address.ip()->ipv4()->address() == htonl(INADDR_LOOPBACK);
  case Address::IpVersion::v6: {
    static_assert(sizeof(absl::uint128) == sizeof(in6addr_loopback),
                  "sizeof(absl::uint128) != sizeof(in6addr_loopback)");
    const absl::uint128 raw_v6 = address.ip()->ipv6()->address();
    return memcmp(&raw_v6, &in6addr_loopback, sizeof(in6addr_loopback)) == 0;
  }
  }
  IS_ENVOY_BUG("unexpected address type");
  return false;
}
287

            
288
4516
// Shared IPv4 loopback address 127.0.0.1 with port 0, created lazily through
// CONSTRUCT_ON_FIRST_USE and reused by later calls.
Address::InstanceConstSharedPtr Utility::getCanonicalIpv4LoopbackAddress() {
  CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
                         new Address::Ipv4Instance("127.0.0.1", /*port=*/0, nullptr));
}
292

            
293
67
// Shared IPv6 loopback address ::1 with port 0, created lazily through
// CONSTRUCT_ON_FIRST_USE and reused by later calls.
Address::InstanceConstSharedPtr Utility::getIpv6LoopbackAddress() {
  CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
                         new Address::Ipv6Instance("::1", /*port=*/0, nullptr));
}
297

            
298
28
// Shared IPv4 "any" address, built from the port-only (port 0) constructor.
// Created lazily through CONSTRUCT_ON_FIRST_USE and reused by later calls.
Address::InstanceConstSharedPtr Utility::getIpv4AnyAddress() {
  CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
                         new Address::Ipv4Instance(uint32_t{0}));
}
302

            
303
5
// Shared IPv6 "any" address, built from the port-only (port 0) constructor.
// Created lazily through CONSTRUCT_ON_FIRST_USE and reused by later calls.
Address::InstanceConstSharedPtr Utility::getIpv6AnyAddress() {
  CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
                         new Address::Ipv6Instance(uint32_t{0}));
}
307

            
308
34804
const std::string& Utility::getIpv4CidrCatchAllAddress() {
309
34804
  CONSTRUCT_ON_FIRST_USE(std::string, "0.0.0.0/0");
310
34804
}
311

            
312
34798
const std::string& Utility::getIpv6CidrCatchAllAddress() {
313
34798
  CONSTRUCT_ON_FIRST_USE(std::string, "::/0");
314
34798
}
315

            
316
// Returns a copy of IP `address` with its port replaced by `port`, truncated to
// 16 bits. The copy starts from the original sockaddr, so other sockaddr fields
// (e.g. the IPv6 scope ID) carry over. IPv6 copies also keep v6only.
// Requires address.ip() to be non-null; panics on an unknown IP version.
// NOTE(review): the new instances are built without a socket interface or
// network namespace argument (constructor defaults) — confirm this is intended.
Address::InstanceConstSharedPtr Utility::getAddressWithPort(const Address::Instance& address,
                                                            uint32_t port) {
  const Address::IpVersion version = address.ip()->version();
  if (version == Address::IpVersion::v4) {
    sockaddr_in v4_copy = *reinterpret_cast<const sockaddr_in*>(address.sockAddr());
    v4_copy.sin_port = htons(static_cast<uint16_t>(port));
    return std::make_shared<Address::Ipv4Instance>(&v4_copy);
  }
  if (version == Address::IpVersion::v6) {
    sockaddr_in6 v6_copy = *reinterpret_cast<const sockaddr_in6*>(address.sockAddr());
    v6_copy.sin6_port = htons(static_cast<uint16_t>(port));
    return std::make_shared<Address::Ipv6Instance>(v6_copy, address.ip()->ipv6()->v6only());
  }
  PANIC("not handled");
}
334

            
335
31
// Returns the original destination address of a redirected connection on `sock`.
// It queries the original-destination socket option for the socket's IP version.
// If that query fails and the platform supports transparent sockets, a socket
// with the transparent option set reports its local address instead.
// Returns nullptr for non-IP sockets, a socket with no IP version, or when
// nothing can be determined. Without SOL_IP it always returns nullptr.
Address::InstanceConstSharedPtr Utility::getOriginalDst(Socket& sock) {
#ifdef SOL_IP

  if (sock.addressType() != Address::Type::Ip) {
    return nullptr;
  }

  auto ip_version = sock.ipVersion();
  if (!ip_version.has_value()) {
    return nullptr;
  }

  SocketOptionName opt_dst;
  SocketOptionName opt_tp;
  if (*ip_version == Address::IpVersion::v4) {
    opt_dst = ENVOY_SOCKET_SO_ORIGINAL_DST;
    opt_tp = ENVOY_SOCKET_IP_TRANSPARENT;
  } else {
    opt_dst = ENVOY_SOCKET_IP6T_SO_ORIGINAL_DST;
    opt_tp = ENVOY_SOCKET_IPV6_TRANSPARENT;
  }

  sockaddr_storage orig_addr;
  memset(&orig_addr, 0, sizeof(orig_addr));
  socklen_t addr_len = sizeof(sockaddr_storage);
  int status =
      sock.getSocketOption(opt_dst.level(), opt_dst.option(), &orig_addr, &addr_len).return_value_;

  if (status != 0) {
    // No original-destination info: a transparent socket's local address is the destination.
    if (Api::OsSysCallsSingleton::get().supportsIpTransparent(*ip_version)) {
      socklen_t flag_len = sizeof(int);
      // Zero-initialized so the flag is never read uninitialized, even if the option call
      // reports success without writing it.
      int is_tp = 0;
      status =
          sock.getSocketOption(opt_tp.level(), opt_tp.option(), &is_tp, &flag_len).return_value_;
      if (status == 0 && is_tp) {
        auto address_or_error = sock.ioHandle().localAddress();
        if (address_or_error.status().ok()) {
          return *address_or_error;
        }
      }
    }
    return nullptr;
  }

  return Address::addressFromSockAddrOrDie(orig_addr, 0, -1, true /* default for v6 constructor */);

#else
  // TODO(zuercher): determine if connection redirection is possible under macOS (c.f. pfctl and
  // divert), and whether it's possible to find the learn destination address.
  UNREFERENCED_PARAMETER(sock);
  return nullptr;
#endif
}
388

            
389
35400
// Converts a 128-bit IPv6 value from network (big-endian) byte order to host
// byte order. Big-endian hosts need no change; little-endian hosts reverse all
// 16 bytes.
absl::uint128 Utility::Ip6ntohl(const absl::uint128& address) {
#ifndef ABSL_IS_LITTLE_ENDIAN
  return address;
#else
  return flipOrder(address);
#endif
}
396

            
397
78
// Converts a 128-bit IPv6 value from host byte order to network byte order.
// Reversing bytes is its own inverse, so this is exactly the conversion Ip6ntohl
// performs (identity on big-endian hosts, full byte reversal on little-endian).
absl::uint128 Utility::Ip6htonl(const absl::uint128& address) { return Ip6ntohl(address); }
404

            
405
35478
// Reverses the byte order of a 128-bit value (byte 0 <-> byte 15, and so on).
// Each of the 16 iterations moves the lowest remaining input byte onto the
// low end of the accumulated result.
absl::uint128 Utility::flipOrder(const absl::uint128& input) {
  absl::uint128 remaining = input;
  absl::uint128 reversed = 0;
  for (int byte_index = 0; byte_index < 16; ++byte_index) {
    reversed = (reversed << 8) | (remaining & 0xFF);
    remaining >>= 8;
  }
  return reversed;
}
415

            
416
// Converts a proto Address into an address instance without throwing.
// - Socket addresses go through parseInternetAddressNoThrow; v6only is the
//   inverse of ipv4_compat.
// - Pipes go through PipeInstance::create, giving nullptr on error.
// - Envoy-internal addresses become EnvoyInternalInstance.
// Panics when the address is unset or the oneof case is corrupt.
// NOTE(review): network_namespace_filepath() is always forwarded, even when empty — confirm
// that an empty path is treated as "no namespace" downstream.
Address::InstanceConstSharedPtr
Utility::protobufAddressToAddressNoThrow(const envoy::config::core::v3::Address& proto_address) {
  using AddressCase = envoy::config::core::v3::Address::AddressCase;
  switch (proto_address.address_case()) {
  case AddressCase::kSocketAddress: {
    const auto& socket_address = proto_address.socket_address();
    return Utility::parseInternetAddressNoThrow(
        socket_address.address(), socket_address.port_value(), !socket_address.ipv4_compat(),
        socket_address.network_namespace_filepath());
  }
  case AddressCase::kPipe: {
    const auto& pipe = proto_address.pipe();
    auto pipe_or_error = Address::PipeInstance::create(pipe.path(), pipe.mode());
    if (!pipe_or_error.status().ok()) {
      return nullptr;
    }
    return std::move(*pipe_or_error);
  }
  case AddressCase::kEnvoyInternalAddress: {
    const auto& internal = proto_address.envoy_internal_address();
    return std::make_shared<Address::EnvoyInternalInstance>(internal.server_listener_name(),
                                                            internal.endpoint_id());
  }
  case AddressCase::ADDRESS_NOT_SET:
    PANIC_DUE_TO_PROTO_UNSET;
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}
441

            
442
// Serializes `address` into `proto_address` according to its type:
// - Pipe: the `pipe` path.
// - Ip: `socket_address` (address, port, and the network namespace path only if present).
// - Otherwise: `envoy_internal_address`, asserted to be EnvoyInternal.
void Utility::addressToProtobufAddress(const Address::Instance& address,
                                       envoy::config::core::v3::Address& proto_address) {
  const Address::Type type = address.type();

  if (type == Address::Type::Pipe) {
    proto_address.mutable_pipe()->set_path(address.asString());
    return;
  }

  if (type == Address::Type::Ip) {
    const auto& ip = *address.ip();
    auto* socket_address = proto_address.mutable_socket_address();
    socket_address->set_address(ip.addressAsString());
    socket_address->set_port_value(ip.port());
    if (address.networkNamespace().has_value()) {
      socket_address->set_network_namespace_filepath(address.networkNamespace().value());
    }
    return;
  }

  // Only envoy-internal addresses are expected to reach this point.
  ASSERT(type == Address::Type::EnvoyInternal);
  auto* internal_address = proto_address.mutable_envoy_internal_address();
  internal_address->set_server_listener_name(address.envoyInternalAddress()->addressId());
  internal_address->set_endpoint_id(address.envoyInternalAddress()->endpointId());
}
460

            
461
// Returns the socket type implied by a proto Address:
// - pipes and envoy-internal addresses are streams;
// - socket addresses are streams for TCP and datagrams for UDP.
// Panics on an unset address, a proto enum sentinel, or a corrupt enum value.
Socket::Type
Utility::protobufAddressSocketType(const envoy::config::core::v3::Address& proto_address) {
  using AddressCase = envoy::config::core::v3::Address::AddressCase;
  switch (proto_address.address_case()) {
  case AddressCase::kPipe:
  case AddressCase::kEnvoyInternalAddress:
    // Pipes are streams; internal addresses currently support stream operation only.
    return Socket::Type::Stream;
  case AddressCase::kSocketAddress:
    switch (proto_address.socket_address().protocol()) {
      PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
    case envoy::config::core::v3::SocketAddress::TCP:
      return Socket::Type::Stream;
    case envoy::config::core::v3::SocketAddress::UDP:
      return Socket::Type::Datagram;
    }
    PANIC_DUE_TO_CORRUPT_ENUM;
  case AddressCase::ADDRESS_NOT_SET:
    PANIC_DUE_TO_PROTO_UNSET;
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}
485

            
486
// Convenience overload: writes all raw slices of `buffer` through the
// slice-based writeToSocket overload. `buffer` is const and is not drained.
Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, const Buffer::Instance& buffer,
                                               const Address::Ip* local_ip,
                                               const Address::Instance& peer_address) {
  Buffer::RawSliceVector raw_slices = buffer.getRawSlices();
  const uint64_t slice_count = raw_slices.size();
  return writeToSocket(handle, raw_slices.data(), slice_count, local_ip, peer_address);
}
492

            
493
// Writes `num_slices` slices to `handle`.
// - Connected socket: uses writev with no addresses.
// - Unconnected socket: uses sendmsg with `local_ip` and `peer_address`.
// Calls that fail with IoErrorCode::Interrupt are retried. The final result is
// logged (trace on success, debug on failure) and returned.
Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, Buffer::RawSlice* slices,
                                               uint64_t num_slices, const Address::Ip* local_ip,
                                               const Address::Instance& peer_address) {
  const bool is_connected = handle.wasConnected();

  const auto send_once = [&]() -> Api::IoCallUint64Result {
    if (is_connected) {
      // The socket is already connected, so the local and peer addresses must not be given.
      return handle.writev(slices, num_slices);
    }
    // For non-connected sockets, sendmsg with the peer address ensures the connection
    // happens first.
    return handle.sendmsg(slices, num_slices, 0, local_ip, peer_address);
  };

  Api::IoCallUint64Result send_result = send_once();
  // Send again for as long as the call is interrupted.
  while (!send_result.ok() &&
         send_result.err_->getErrorCode() == Api::IoError::IoErrorCode::Interrupt) {
    send_result = send_once();
  }

  if (!send_result.ok()) {
    ENVOY_LOG_MISC(debug, "{} failed with error code {}: {}", is_connected ? "writev" : "sendmsg",
                   static_cast<int>(send_result.err_->getErrorCode()),
                   send_result.err_->getErrorDetails());
    return send_result;
  }
  ENVOY_LOG_MISC(trace, "{} bytes {}", is_connected ? "writev" : "sendmsg",
                 send_result.return_value_);
  return send_result;
}
524

            
525
namespace {
526

            
527
void passPayloadToProcessor(uint64_t bytes_read, Buffer::InstancePtr buffer,
528
                            Address::InstanceConstSharedPtr peer_addess,
529
                            Address::InstanceConstSharedPtr local_address,
530
                            UdpPacketProcessor& udp_packet_processor, MonotonicTime receive_time,
531
703851
                            uint8_t tos, Buffer::OwnedImpl saved_cmsg) {
532
703851
  ENVOY_BUG(peer_addess != nullptr,
533
703851
            fmt::format("Unable to get remote address on the socket bound to local address: {}.",
534
703851
                        (local_address == nullptr ? "unknown" : local_address->asString())));
535

            
536
  // Unix domain sockets are not supported
537
703851
  ENVOY_BUG(peer_addess != nullptr && peer_addess->type() == Address::Type::Ip,
538
703851
            fmt::format("Unsupported remote address: {} local address: {}, receive size: "
539
703851
                        "{}",
540
703851
                        peer_addess->asString(),
541
703851
                        (local_address == nullptr ? "unknown" : local_address->asString()),
542
703851
                        bytes_read));
543
703851
  udp_packet_processor.processPacket(std::move(local_address), std::move(peer_addess),
544
703851
                                     std::move(buffer), receive_time, tos, std::move(saved_cmsg));
545
703851
}
546

            
547
// Performs one recvmsg call on a GRO-capable socket and delivers what was read to
// `udp_packet_processor`.
//
// If the kernel reports a gso_size of 0, the payload is delivered as a single datagram.
// Otherwise the coalesced payload is split into gso_size-byte datagrams (the last one may be
// shorter), and each is delivered separately.
//
// If `num_packets_read` is non-null, it is reset to 0 and incremented once per delivered
// datagram. In that case the read is sized for NUM_DATAGRAMS_PER_RECEIVE datagrams of
// maxDatagramSize() bytes. This is the same per-call budget the recvmmsg path uses, so the
// caller's per-event-loop packet accounting stays meaningful. Otherwise a fixed 64 KiB read is
// used.
//
// Returns the recvmsg result. Nothing is delivered on error or when the message was truncated
// and dropped.
Api::IoCallUint64Result readFromSocketRecvGro(IoHandle& handle,
                                              const Address::Instance& local_address,
                                              UdpPacketProcessor& udp_packet_processor,
                                              TimeSource& time_source, uint32_t* packets_dropped,
                                              uint32_t* num_packets_read) {
  ASSERT(Api::OsSysCallsSingleton::get().supportsUdpGro(),
         "cannot use GRO when the platform doesn't support it.");
  if (num_packets_read != nullptr) {
    *num_packets_read = 0;
  }
  Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
  IoHandle::RecvMsgOutput output(1, packets_dropped);

  // TODO(yugant): Avoid allocating 64k for each read by getting memory from UdpPacketProcessor
  // Callers that count packets against a budget get a read bounded by that budget; all other
  // callers keep the legacy fixed 64 KiB read.
  const uint64_t max_rx_datagram_size_with_gro =
      num_packets_read != nullptr
          ? NUM_DATAGRAMS_PER_RECEIVE * udp_packet_processor.maxDatagramSize()
          : 64 * 1024;
  ENVOY_LOG_MISC(trace, "starting gro recvmsg with max={}", max_rx_datagram_size_with_gro);

  Api::IoCallUint64Result result =
      receiveMessage(max_rx_datagram_size_with_gro, buffer, output, handle, local_address,
                     udp_packet_processor.saveCmsgConfig());

  if (!result.ok() || output.msg_[0].truncated_and_dropped_) {
    return result;
  }

  const uint64_t gso_size = output.msg_[0].gso_size_;
  ENVOY_LOG_MISC(trace, "gro recvmsg bytes {} with gso_size as {}", result.return_value_, gso_size);

  const MonotonicTime receive_time = time_source.monotonicTime();

  // Skip gso segmentation and proceed as a single payload.
  if (gso_size == 0u) {
    if (num_packets_read != nullptr) {
      *num_packets_read += 1;
    }
    passPayloadToProcessor(
        result.return_value_, std::move(buffer), std::move(output.msg_[0].peer_address_),
        std::move(output.msg_[0].local_address_), udp_packet_processor, receive_time,
        output.msg_[0].tos_, std::move(output.msg_[0].saved_cmsg_));
    return result;
  }

  // Segment the buffer read by the recvmsg syscall into gso_sized sub buffers.
  // TODO(mattklein123): The following code should be optimized to avoid buffer copies, either by
  // switching to slices or by using a CoW buffer type.
  //
  // Every segment comes from the same recvmsg call and so shares its control messages. Each
  // segment gets its own copy: moving the single saved buffer inside the loop would hand a
  // moved-from buffer to every segment after the first.
  const Buffer::Instance& saved_cmsg = output.msg_[0].saved_cmsg_;
  while (buffer->length() > 0) {
    const uint64_t bytes_to_copy = std::min(buffer->length(), gso_size);
    Buffer::InstancePtr sub_buffer = std::make_unique<Buffer::OwnedImpl>();
    sub_buffer->move(*buffer, bytes_to_copy);
    if (num_packets_read != nullptr) {
      *num_packets_read += 1;
    }
    Buffer::OwnedImpl segment_cmsg;
    segment_cmsg.add(saved_cmsg);
    passPayloadToProcessor(bytes_to_copy, std::move(sub_buffer), output.msg_[0].peer_address_,
                           output.msg_[0].local_address_, udp_packet_processor, receive_time,
                           output.msg_[0].tos_, std::move(segment_cmsg));
  }

  return result;
}
609

            
610
// Reads up to NUM_DATAGRAMS_PER_RECEIVE datagrams with a single recvmmsg call. Each datagram is
// read into its own buffer, which has maxDatagramSize() bytes reserved. Every received datagram
// that was not truncated and dropped is delivered to `udp_packet_processor`.
//
// If `num_packets_read` is non-null, it is reset to 0 and incremented once per delivered
// datagram. Returns the recvmmsg result. On success its return_value_ is the number of messages
// received, including any truncated ones that were skipped.
Api::IoCallUint64Result readFromSocketRecvMmsg(IoHandle& handle,
                                               const Address::Instance& local_address,
                                               UdpPacketProcessor& udp_packet_processor,
                                               TimeSource& time_source, uint32_t* packets_dropped,
                                               uint32_t* num_packets_read) {
  ASSERT(Api::OsSysCallsSingleton::get().supportsMmsg(),
         "cannot use recvmmsg when the platform doesn't support it.");
  const auto max_rx_datagram_size = udp_packet_processor.maxDatagramSize();
  if (num_packets_read != nullptr) {
    *num_packets_read = 0;
  }

  // Buffer::ReservationSingleSlice is always passed by value, and can only be constructed
  // by Buffer::Instance::reserve(), so this is needed to keep a fixed array
  // in which all elements are legally constructed.
  struct BufferAndReservation {
    explicit BufferAndReservation(uint64_t max_rx_datagram_size)
        : buffer_(std::make_unique<Buffer::OwnedImpl>()),
          reservation_(buffer_->reserveSingleSlice(max_rx_datagram_size, true)) {}

    Buffer::InstancePtr buffer_;
    Buffer::ReservationSingleSlice reservation_;
  };
  constexpr uint32_t num_slices_per_packet = 1u;
  absl::InlinedVector<BufferAndReservation, NUM_DATAGRAMS_PER_RECEIVE> buffers;
  RawSliceArrays slices(NUM_DATAGRAMS_PER_RECEIVE,
                        absl::FixedArray<Buffer::RawSlice>(num_slices_per_packet));
  for (uint32_t i = 0; i < NUM_DATAGRAMS_PER_RECEIVE; i++) {
    buffers.emplace_back(max_rx_datagram_size);
    slices[i][0] = buffers[i].reservation_.slice();
  }

  IoHandle::RecvMsgOutput output(NUM_DATAGRAMS_PER_RECEIVE, packets_dropped);
  ENVOY_LOG_MISC(trace, "starting recvmmsg with packets={} max={}", NUM_DATAGRAMS_PER_RECEIVE,
                 max_rx_datagram_size);
  Api::IoCallUint64Result result = handle.recvmmsg(slices, local_address.ip()->port(),
                                                   udp_packet_processor.saveCmsgConfig(), output);
  if (!result.ok()) {
    return result;
  }

  uint64_t packets_read = result.return_value_;
  ENVOY_LOG_MISC(trace, "recvmmsg read {} packets", packets_read);
  const MonotonicTime receive_time = time_source.monotonicTime();
  for (uint64_t i = 0; i < packets_read; ++i) {
    if (output.msg_[i].truncated_and_dropped_) {
      continue;
    }

    Buffer::RawSlice* slice = slices[i].data();
    const uint64_t msg_len = output.msg_[i].msg_len_;
    ASSERT(msg_len <= slice->len_);
    // The peer address may be missing; passPayloadToProcessor reports that case, so the log
    // line must not dereference it.
    ENVOY_LOG_MISC(debug, "Receive a packet with {} bytes from {}", msg_len,
                   output.msg_[i].peer_address_ != nullptr
                       ? output.msg_[i].peer_address_->asString()
                       : "unknown");

    buffers[i].reservation_.commit(std::min(max_rx_datagram_size, msg_len));

    if (num_packets_read != nullptr) {
      *num_packets_read += 1;
    }
    passPayloadToProcessor(msg_len, std::move(buffers[i].buffer_), output.msg_[i].peer_address_,
                           output.msg_[i].local_address_, udp_packet_processor, receive_time,
                           output.msg_[i].tos_, std::move(output.msg_[i].saved_cmsg_));
  }
  return result;
}
676

            
677
// Reads at most one datagram with a single plain recvmsg call (no GRO, no recvmmsg), using a
// read size of maxDatagramSize(), and delivers it to `udp_packet_processor`.
//
// If `num_packets_read` is non-null, it ends up as 1 when a datagram was delivered and 0
// otherwise. Returns the recvmsg result. Nothing is delivered on error or when the message was
// truncated and dropped.
Api::IoCallUint64Result readFromSocketRecvMsg(IoHandle& handle,
                                              const Address::Instance& local_address,
                                              UdpPacketProcessor& udp_packet_processor,
                                              TimeSource& time_source, uint32_t* packets_dropped,
                                              uint32_t* num_packets_read) {
  if (num_packets_read != nullptr) {
    *num_packets_read = 0;
  }
  Buffer::InstancePtr payload = std::make_unique<Buffer::OwnedImpl>();
  IoHandle::RecvMsgOutput recv_output(1, packets_dropped);

  ENVOY_LOG_MISC(trace, "starting recvmsg with max={}", udp_packet_processor.maxDatagramSize());
  Api::IoCallUint64Result recv_result =
      receiveMessage(udp_packet_processor.maxDatagramSize(), payload, recv_output, handle,
                     local_address, udp_packet_processor.saveCmsgConfig());

  auto& message = recv_output.msg_[0];
  const bool nothing_to_deliver = !recv_result.ok() || message.truncated_and_dropped_;
  if (nothing_to_deliver) {
    return recv_result;
  }

  ENVOY_LOG_MISC(trace, "recvmsg bytes {}", recv_result.return_value_);

  if (num_packets_read != nullptr) {
    *num_packets_read = 1;
  }
  const MonotonicTime receive_time = time_source.monotonicTime();
  passPayloadToProcessor(recv_result.return_value_, std::move(payload),
                         std::move(message.peer_address_), std::move(message.local_address_),
                         udp_packet_processor, receive_time, message.tos_,
                         std::move(message.saved_cmsg_));
  return recv_result;
}
708

            
709
} // namespace
710

            
711
// Performs one receive operation on `handle` with the requested method and delivers the
// datagram(s) read to `udp_packet_processor`. RecvMsgWithGro and RecvMmsg select their dedicated
// readers; every other value falls back to a plain recvmsg.
Api::IoCallUint64Result
Utility::readFromSocket(IoHandle& handle, const Address::Instance& local_address,
                        UdpPacketProcessor& udp_packet_processor, TimeSource& time_source,
                        UdpRecvMsgMethod recv_msg_method, uint32_t* packets_dropped,
                        uint32_t* num_packets_read) {
  switch (recv_msg_method) {
  case UdpRecvMsgMethod::RecvMsgWithGro:
    return readFromSocketRecvGro(handle, local_address, udp_packet_processor, time_source,
                                 packets_dropped, num_packets_read);
  case UdpRecvMsgMethod::RecvMmsg:
    return readFromSocketRecvMmsg(handle, local_address, udp_packet_processor, time_source,
                                  packets_dropped, num_packets_read);
  default:
    return readFromSocketRecvMsg(handle, local_address, udp_packet_processor, time_source,
                                 packets_dropped, num_packets_read);
  }
}
726

            
727
// Reads datagrams from `handle` and delivers them to `udp_packet_processor`. It stops once the
// per-event-loop packet budget is spent, once the number of read calls reaches that budget
// (always at least one call), or on the first failed read.
//
// The receive method is GRO if allowed and supported by the handle, else recvmmsg if allowed and
// supported, else plain recvmsg. Growth in the kernel drop counter `packets_dropped` between
// reads is reported via onDatagramsDropped(). Returns the error of the failing read. When
// reading stops for budget reasons, it returns the (successful) last result's error pointer.
Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
                                               const Address::Instance& local_address,
                                               UdpPacketProcessor& udp_packet_processor,
                                               TimeSource& time_source, bool allow_gro,
                                               bool allow_mmsg, uint32_t& packets_dropped) {
  const UdpRecvMsgMethod recv_msg_method =
      (allow_gro && handle.supportsUdpGro())   ? UdpRecvMsgMethod::RecvMsgWithGro
      : (allow_mmsg && handle.supportsMmsg()) ? UdpRecvMsgMethod::RecvMmsg
                                               : UdpRecvMsgMethod::RecvMsg;

  // Packet budget: numPacketsExpectedPerEventLoop(), capped at MAX_NUM_PACKETS_PER_EVENT_LOOP.
  size_t packets_remaining = std::min<size_t>(
      MAX_NUM_PACKETS_PER_EVENT_LOOP, udp_packet_processor.numPacketsExpectedPerEventLoop());
  // The number of read calls is bounded by the same budget (minimum one), so reads that deliver
  // no packets cannot spin forever.
  const size_t max_reads = std::max<size_t>(1, packets_remaining);

  for (size_t reads_done = 1;; ++reads_done) {
    const uint32_t dropped_before = packets_dropped;
    uint32_t packets_processed = 0;
    Api::IoCallUint64Result result =
        Utility::readFromSocket(handle, local_address, udp_packet_processor, time_source,
                                recv_msg_method, &packets_dropped, &packets_processed);

    if (!result.ok()) {
      // Nothing more to read, or a system error occurred.
      return std::move(result.err_);
    }

    if (packets_dropped != dropped_before) {
      // The kernel keeps SO_RXQ_OVFL as a 32-bit counter that may wrap around. Unsigned 32-bit
      // subtraction gives the number of newly dropped datagrams in both the wrapped and the
      // unwrapped case.
      const uint32_t delta = packets_dropped - dropped_before;
      ENVOY_LOG_EVERY_POW_2_MISC(
          warn,
          "Kernel dropped {} datagram(s). Consider increasing receive buffer size and/or "
          "max datagram size.",
          delta);
      udp_packet_processor.onDatagramsDropped(delta);
    }

    if (packets_remaining <= packets_processed) {
      return std::move(result.err_);
    }
    packets_remaining -= packets_processed;
    if (reads_done == max_reads) {
      return std::move(result.err_);
    }
  }
}
784

            
785
// Resolves the UDP socket settings from `config`. max_rx_datagram_size falls back to
// DEFAULT_UDP_MAX_DATAGRAM_SIZE, and prefer_gro falls back to `prefer_gro_default`. If GRO ends
// up preferred but the OS does not support it, a warning is logged; the preference itself is
// left as resolved.
ResolvedUdpSocketConfig::ResolvedUdpSocketConfig(
    const envoy::config::core::v3::UdpSocketConfig& config, bool prefer_gro_default)
    : max_rx_datagram_size_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_rx_datagram_size,
                                                            DEFAULT_UDP_MAX_DATAGRAM_SIZE)),
      prefer_gro_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, prefer_gro, prefer_gro_default)) {
  // OS support only matters when GRO is actually preferred.
  if (!prefer_gro_) {
    return;
  }
  if (!Api::OsSysCallsSingleton::get().supportsUdpGro()) {
    ENVOY_LOG_MISC(
        warn, "GRO requested but not supported by the OS. Check OS config or disable prefer_gro.");
  }
}
795

            
796
} // namespace Network
797
} // namespace Envoy