Line data Source code
1 : #include "source/common/network/utility.h"
2 :
3 : #include <cstdint>
4 : #include <list>
5 : #include <memory>
6 : #include <sstream>
7 : #include <string>
8 : #include <vector>
9 :
10 : #include "envoy/buffer/buffer.h"
11 : #include "envoy/common/exception.h"
12 : #include "envoy/common/platform.h"
13 : #include "envoy/config/core/v3/address.pb.h"
14 : #include "envoy/network/address.h"
15 : #include "envoy/network/connection.h"
16 :
17 : #include "source/common/api/os_sys_calls_impl.h"
18 : #include "source/common/buffer/buffer_impl.h"
19 : #include "source/common/common/assert.h"
20 : #include "source/common/common/cleanup.h"
21 : #include "source/common/common/fmt.h"
22 : #include "source/common/common/utility.h"
23 : #include "source/common/network/address_impl.h"
24 : #include "source/common/network/io_socket_error_impl.h"
25 : #include "source/common/network/socket_option_impl.h"
26 : #include "source/common/protobuf/protobuf.h"
27 : #include "source/common/protobuf/utility.h"
28 : #include "source/common/runtime/runtime_features.h"
29 :
30 : #include "absl/container/fixed_array.h"
31 : #include "absl/strings/match.h"
32 : #include "absl/strings/string_view.h"
33 :
34 : namespace Envoy {
35 : namespace Network {
36 :
37 185676 : Address::InstanceConstSharedPtr instanceOrNull(StatusOr<Address::InstanceConstSharedPtr> address) {
38 185676 : if (address.ok()) {
39 185676 : return *address;
40 185676 : }
41 0 : return nullptr;
42 185676 : }
43 :
44 0 : std::string Utility::urlFromDatagramAddress(const Address::Instance& addr) {
45 0 : if (addr.ip() != nullptr) {
46 0 : return absl::StrCat(UDP_SCHEME, addr.asStringView());
47 0 : } else {
48 0 : return absl::StrCat(UNIX_SCHEME, addr.asStringView());
49 0 : }
50 0 : }
51 :
52 184290 : Address::InstanceConstSharedPtr Utility::resolveUrl(const std::string& url) {
53 184290 : if (urlIsTcpScheme(url)) {
54 184057 : return parseInternetAddressAndPort(url.substr(TCP_SCHEME.size()));
55 184057 : } else if (urlIsUdpScheme(url)) {
56 8 : return parseInternetAddressAndPort(url.substr(UDP_SCHEME.size()));
57 225 : } else if (urlIsUnixScheme(url)) {
58 100 : return std::make_shared<Address::PipeInstance>(url.substr(UNIX_SCHEME.size()));
59 125 : } else {
60 125 : throwEnvoyExceptionOrPanic(absl::StrCat("unknown protocol scheme: ", url));
61 125 : }
62 184290 : }
63 :
64 0 : StatusOr<Socket::Type> Utility::socketTypeFromUrl(const std::string& url) {
65 0 : if (urlIsTcpScheme(url)) {
66 0 : return Socket::Type::Stream;
67 0 : } else if (urlIsUdpScheme(url)) {
68 0 : return Socket::Type::Datagram;
69 0 : } else if (urlIsUnixScheme(url)) {
70 0 : return Socket::Type::Stream;
71 0 : } else {
72 0 : return absl::InvalidArgumentError(absl::StrCat("unknown protocol scheme: ", url));
73 0 : }
74 0 : }
75 :
76 184290 : bool Utility::urlIsTcpScheme(absl::string_view url) { return absl::StartsWith(url, TCP_SCHEME); }
77 :
78 233 : bool Utility::urlIsUdpScheme(absl::string_view url) { return absl::StartsWith(url, UDP_SCHEME); }
79 :
80 225 : bool Utility::urlIsUnixScheme(absl::string_view url) { return absl::StartsWith(url, UNIX_SCHEME); }
81 :
82 : namespace {
83 :
84 : Api::IoCallUint64Result receiveMessage(uint64_t max_rx_datagram_size, Buffer::InstancePtr& buffer,
85 : IoHandle::RecvMsgOutput& output, IoHandle& handle,
86 420 : const Address::Instance& local_address) {
87 :
88 420 : auto reservation = buffer->reserveSingleSlice(max_rx_datagram_size);
89 420 : Buffer::RawSlice slice = reservation.slice();
90 420 : Api::IoCallUint64Result result = handle.recvmsg(&slice, 1, local_address.ip()->port(), output);
91 :
92 420 : if (result.ok()) {
93 371 : reservation.commit(std::min(max_rx_datagram_size, result.return_value_));
94 371 : }
95 :
96 420 : return result;
97 420 : }
98 :
99 186252 : StatusOr<sockaddr_in> parseV4Address(const std::string& ip_address, uint16_t port) {
100 186252 : sockaddr_in sa4;
101 186252 : memset(&sa4, 0, sizeof(sa4));
102 186252 : if (inet_pton(AF_INET, ip_address.c_str(), &sa4.sin_addr) != 1) {
103 1027 : return absl::FailedPreconditionError("failed parsing ipv4");
104 1027 : }
105 185225 : sa4.sin_family = AF_INET;
106 185225 : sa4.sin_port = htons(port);
107 185225 : return sa4;
108 186252 : }
109 :
110 1029 : StatusOr<sockaddr_in6> parseV6Address(const std::string& ip_address, uint16_t port) {
111 : // Parse IPv6 with optional scope using getaddrinfo().
112 1029 : struct addrinfo hints;
113 1029 : memset(&hints, 0, sizeof(hints));
114 1029 : struct addrinfo* res = nullptr;
115 : // Suppresses any potentially lengthy network host address lookups and inhibit the invocation of
116 : // a name resolution service.
117 1029 : hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
118 1029 : hints.ai_family = AF_INET6;
119 : // Given that we don't specify a service but we use getaddrinfo() to only parse the node
120 : // address, specifying the socket type allows to hint the getaddrinfo() to return only an
121 : // element with the below socket type. The behavior though remains platform dependent and anyway
122 : // we consume only the first element (if the call succeeds).
123 1029 : hints.ai_socktype = SOCK_DGRAM;
124 1029 : hints.ai_protocol = IPPROTO_UDP;
125 1029 : const Api::SysCallIntResult rc = Api::OsSysCallsSingleton::get().getaddrinfo(
126 1029 : ip_address.c_str(), /*service=*/nullptr, &hints, &res);
127 1029 : if (rc.return_value_ != 0) {
128 578 : return absl::FailedPreconditionError(fmt::format("getaddrinfo error: {}", rc.return_value_));
129 578 : }
130 451 : sockaddr_in6 sa6 = *reinterpret_cast<sockaddr_in6*>(res->ai_addr);
131 451 : freeaddrinfo(res);
132 451 : sa6.sin6_port = htons(port);
133 451 : return sa6;
134 1029 : }
135 :
136 : } // namespace
137 :
138 : Address::InstanceConstSharedPtr Utility::parseInternetAddressNoThrow(const std::string& ip_address,
139 2279 : uint16_t port, bool v6only) {
140 2279 : StatusOr<sockaddr_in> sa4 = parseV4Address(ip_address, port);
141 2279 : if (sa4.ok()) {
142 1283 : return instanceOrNull(
143 1283 : Address::InstanceFactory::createInstancePtr<Address::Ipv4Instance>(&sa4.value()));
144 1283 : }
145 :
146 996 : StatusOr<sockaddr_in6> sa6 = parseV6Address(ip_address, port);
147 996 : if (sa6.ok()) {
148 435 : return instanceOrNull(
149 435 : Address::InstanceFactory::createInstancePtr<Address::Ipv6Instance>(*sa6, v6only));
150 435 : }
151 561 : return nullptr;
152 996 : }
153 :
154 : Address::InstanceConstSharedPtr Utility::parseInternetAddress(const std::string& ip_address,
155 2039 : uint16_t port, bool v6only) {
156 2039 : const Address::InstanceConstSharedPtr address =
157 2039 : parseInternetAddressNoThrow(ip_address, port, v6only);
158 2039 : if (address == nullptr) {
159 515 : throwWithMalformedIp(ip_address);
160 515 : }
161 2039 : return address;
162 2039 : }
163 :
164 : Address::InstanceConstSharedPtr
165 184348 : Utility::parseInternetAddressAndPortNoThrow(const std::string& ip_address, bool v6only) {
166 184348 : if (ip_address.empty()) {
167 8 : return nullptr;
168 8 : }
169 184340 : if (ip_address[0] == '[') {
170 : // Appears to be an IPv6 address. Find the "]:" that separates the address from the port.
171 97 : const auto pos = ip_address.rfind("]:");
172 97 : if (pos == std::string::npos) {
173 33 : return nullptr;
174 33 : }
175 64 : const auto ip_str = ip_address.substr(1, pos - 1);
176 64 : const auto port_str = ip_address.substr(pos + 2);
177 64 : uint64_t port64 = 0;
178 64 : if (port_str.empty() || !absl::SimpleAtoi(port_str, &port64) || port64 > 65535) {
179 31 : return nullptr;
180 31 : }
181 33 : StatusOr<sockaddr_in6> sa6 = parseV6Address(ip_str, port64);
182 33 : if (sa6.ok()) {
183 16 : return instanceOrNull(
184 16 : Address::InstanceFactory::createInstancePtr<Address::Ipv6Instance>(*sa6, v6only));
185 16 : }
186 17 : return nullptr;
187 33 : }
188 : // Treat it as an IPv4 address followed by a port.
189 184243 : const auto pos = ip_address.rfind(':');
190 184243 : if (pos == std::string::npos) {
191 169 : return nullptr;
192 169 : }
193 184074 : const auto ip_str = ip_address.substr(0, pos);
194 184074 : const auto port_str = ip_address.substr(pos + 1);
195 184074 : uint64_t port64 = 0;
196 184074 : if (port_str.empty() || !absl::SimpleAtoi(port_str, &port64) || port64 > 65535) {
197 101 : return nullptr;
198 101 : }
199 183973 : StatusOr<sockaddr_in> sa4 = parseV4Address(ip_str, port64);
200 183973 : if (sa4.ok()) {
201 183942 : return instanceOrNull(
202 183942 : Address::InstanceFactory::createInstancePtr<Address::Ipv4Instance>(&sa4.value()));
203 183942 : }
204 31 : return nullptr;
205 183973 : }
206 :
207 : Address::InstanceConstSharedPtr Utility::parseInternetAddressAndPort(const std::string& ip_address,
208 184348 : bool v6only) {
209 :
210 184348 : const Address::InstanceConstSharedPtr address =
211 184348 : parseInternetAddressAndPortNoThrow(ip_address, v6only);
212 184348 : if (address == nullptr) {
213 390 : throwWithMalformedIp(ip_address);
214 390 : }
215 184348 : return address;
216 184348 : }
217 :
218 0 : Address::InstanceConstSharedPtr Utility::copyInternetAddressAndPort(const Address::Ip& ip) {
219 0 : if (ip.version() == Address::IpVersion::v4) {
220 0 : return std::make_shared<Address::Ipv4Instance>(ip.addressAsString(), ip.port());
221 0 : }
222 0 : return std::make_shared<Address::Ipv6Instance>(ip.addressAsString(), ip.port());
223 0 : }
224 :
225 905 : void Utility::throwWithMalformedIp(absl::string_view ip_address) {
226 905 : throwEnvoyExceptionOrPanic(absl::StrCat("malformed IP address: ", ip_address));
227 905 : }
228 :
229 : // TODO(hennna): Currently getLocalAddress does not support choosing between
230 : // multiple interfaces and addresses not returned by getifaddrs. In addition,
231 : // the default is to return a loopback address of type version. This function may
232 : // need to be updated in the future. Discussion can be found at Github issue #939.
233 98 : Address::InstanceConstSharedPtr Utility::getLocalAddress(const Address::IpVersion version) {
234 98 : Address::InstanceConstSharedPtr ret;
235 98 : if (Api::OsSysCallsSingleton::get().supportsGetifaddrs()) {
236 98 : Api::InterfaceAddressVector interface_addresses{};
237 :
238 98 : const Api::SysCallIntResult rc =
239 98 : Api::OsSysCallsSingleton::get().getifaddrs(interface_addresses);
240 98 : if (rc.return_value_ != 0) {
241 0 : ENVOY_LOG_MISC(debug, fmt::format("getifaddrs error: {}", rc.errno_));
242 98 : } else {
243 : // man getifaddrs(3)
244 196 : for (const auto& interface_address : interface_addresses) {
245 196 : if (!isLoopbackAddress(*interface_address.interface_addr_) &&
246 196 : interface_address.interface_addr_->ip()->version() == version) {
247 98 : ret = interface_address.interface_addr_;
248 98 : if (ret->ip()->version() == Address::IpVersion::v6) {
249 0 : ret = ret->ip()->ipv6()->addressWithoutScopeId();
250 0 : }
251 98 : break;
252 98 : }
253 196 : }
254 98 : }
255 98 : }
256 :
257 : // If the local address is not found above, then return the loopback address by default.
258 98 : if (ret == nullptr) {
259 0 : if (version == Address::IpVersion::v4) {
260 0 : ret = std::make_shared<Address::Ipv4Instance>("127.0.0.1");
261 0 : } else if (version == Address::IpVersion::v6) {
262 0 : ret = std::make_shared<Address::Ipv6Instance>("::1");
263 0 : }
264 0 : }
265 98 : return ret;
266 98 : }
267 :
268 0 : bool Utility::isSameIpOrLoopback(const ConnectionInfoProvider& connection_info_provider) {
269 : // These are local:
270 : // - Pipes
271 : // - Sockets to a loopback address
272 : // - Sockets where the local and remote address (ignoring port) are the same
273 0 : const auto& remote_address = connection_info_provider.remoteAddress();
274 0 : if (remote_address->type() == Address::Type::Pipe || isLoopbackAddress(*remote_address)) {
275 0 : return true;
276 0 : }
277 0 : const auto local_ip = connection_info_provider.localAddress()->ip();
278 0 : const auto remote_ip = remote_address->ip();
279 0 : if (remote_ip != nullptr && local_ip != nullptr &&
280 0 : remote_ip->addressAsString() == local_ip->addressAsString()) {
281 0 : return true;
282 0 : }
283 0 : return false;
284 0 : }
285 :
286 201 : bool Utility::isInternalAddress(const Address::Instance& address) {
287 201 : if (address.type() != Address::Type::Ip) {
288 0 : return false;
289 0 : }
290 :
291 201 : if (address.ip()->version() == Address::IpVersion::v4) {
292 : // Handle the RFC1918 space for IPV4. Also count loopback as internal.
293 201 : const uint32_t address4 = address.ip()->ipv4()->address();
294 201 : const uint8_t* address4_bytes = reinterpret_cast<const uint8_t*>(&address4);
295 201 : if ((address4_bytes[0] == 10) || (address4_bytes[0] == 192 && address4_bytes[1] == 168) ||
296 201 : (address4_bytes[0] == 172 && address4_bytes[1] >= 16 && address4_bytes[1] <= 31) ||
297 201 : address4 == htonl(INADDR_LOOPBACK)) {
298 179 : return true;
299 201 : } else {
300 22 : return false;
301 22 : }
302 201 : }
303 :
304 : // Local IPv6 address prefix defined in RFC4193. Local addresses have prefix FC00::/7.
305 : // Currently, the FD00::/8 prefix is locally assigned and FC00::/8 may be defined in the
306 : // future.
307 0 : static_assert(sizeof(absl::uint128) == sizeof(in6addr_loopback),
308 0 : "sizeof(absl::uint128) != sizeof(in6addr_loopback)");
309 0 : const absl::uint128 address6 = address.ip()->ipv6()->address();
310 0 : const uint8_t* address6_bytes = reinterpret_cast<const uint8_t*>(&address6);
311 0 : if (address6_bytes[0] == 0xfd ||
312 0 : memcmp(&address6, &in6addr_loopback, sizeof(in6addr_loopback)) == 0) {
313 0 : return true;
314 0 : }
315 :
316 0 : return false;
317 0 : }
318 :
319 316 : bool Utility::isLoopbackAddress(const Address::Instance& address) {
320 316 : if (address.type() != Address::Type::Ip) {
321 0 : return false;
322 0 : }
323 :
324 316 : if (address.ip()->version() == Address::IpVersion::v4) {
325 : // Compare to the canonical v4 loopback address: 127.0.0.1.
326 316 : return address.ip()->ipv4()->address() == htonl(INADDR_LOOPBACK);
327 316 : } else if (address.ip()->version() == Address::IpVersion::v6) {
328 0 : static_assert(sizeof(absl::uint128) == sizeof(in6addr_loopback),
329 0 : "sizeof(absl::uint128) != sizeof(in6addr_loopback)");
330 0 : absl::uint128 addr = address.ip()->ipv6()->address();
331 0 : return 0 == memcmp(&addr, &in6addr_loopback, sizeof(in6addr_loopback));
332 0 : }
333 0 : IS_ENVOY_BUG("unexpected address type");
334 0 : return false;
335 316 : }
336 :
// Returns an IPv4 address instance built from "127.0.0.1" with port 0.
// NOTE(review): CONSTRUCT_ON_FIRST_USE is defined elsewhere; judging by its name the instance is
// created once and reused on later calls -- confirm against the macro definition. The meaning of
// the trailing nullptr constructor argument is not visible here.
337 530 : Address::InstanceConstSharedPtr Utility::getCanonicalIpv4LoopbackAddress() {
338 530 : CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
339 530 : new Address::Ipv4Instance("127.0.0.1", 0, nullptr));
340 530 : }
341 :
// Returns an IPv6 address instance built from "::1" with port 0.
// NOTE(review): CONSTRUCT_ON_FIRST_USE is defined elsewhere; judging by its name the instance is
// created once and reused on later calls -- confirm against the macro definition. The meaning of
// the trailing nullptr constructor argument is not visible here.
342 2 : Address::InstanceConstSharedPtr Utility::getIpv6LoopbackAddress() {
343 2 : CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
344 2 : new Address::Ipv6Instance("::1", 0, nullptr));
345 2 : }
346 :
// Returns an IPv4 address instance constructed from a single uint32_t argument of 0.
// NOTE(review): presumably this selects the port-only constructor and yields the wildcard
// ("any") address, as the function name suggests -- confirm against Ipv4Instance. The
// CONSTRUCT_ON_FIRST_USE macro is defined elsewhere; by its name the instance is created once.
347 0 : Address::InstanceConstSharedPtr Utility::getIpv4AnyAddress() {
348 0 : CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
349 0 : new Address::Ipv4Instance(static_cast<uint32_t>(0)));
350 0 : }
351 :
// Returns an IPv6 address instance constructed from a single uint32_t argument of 0.
// NOTE(review): presumably this selects the port-only constructor and yields the wildcard
// ("any") address, as the function name suggests -- confirm against Ipv6Instance. The
// CONSTRUCT_ON_FIRST_USE macro is defined elsewhere; by its name the instance is created once.
352 0 : Address::InstanceConstSharedPtr Utility::getIpv6AnyAddress() {
353 0 : CONSTRUCT_ON_FIRST_USE(Address::InstanceConstSharedPtr,
354 0 : new Address::Ipv6Instance(static_cast<uint32_t>(0)));
355 0 : }
356 :
// Returns a reference to the string "0.0.0.0/0", the IPv4 CIDR range that matches every address.
// NOTE(review): CONSTRUCT_ON_FIRST_USE is defined elsewhere; by its name the string is created
// once and the same object is returned on later calls -- confirm against the macro definition.
357 423 : const std::string& Utility::getIpv4CidrCatchAllAddress() {
358 423 : CONSTRUCT_ON_FIRST_USE(std::string, "0.0.0.0/0");
359 423 : }
360 :
// Returns a reference to the string "::/0", the IPv6 CIDR range that matches every address.
// NOTE(review): CONSTRUCT_ON_FIRST_USE is defined elsewhere; by its name the string is created
// once and the same object is returned on later calls -- confirm against the macro definition.
361 423 : const std::string& Utility::getIpv6CidrCatchAllAddress() {
362 423 : CONSTRUCT_ON_FIRST_USE(std::string, "::/0");
363 423 : }
364 :
365 : Address::InstanceConstSharedPtr Utility::getAddressWithPort(const Address::Instance& address,
366 71 : uint32_t port) {
367 71 : switch (address.ip()->version()) {
368 69 : case Address::IpVersion::v4:
369 69 : return std::make_shared<Address::Ipv4Instance>(address.ip()->addressAsString(), port);
370 2 : case Address::IpVersion::v6:
371 2 : return std::make_shared<Address::Ipv6Instance>(address.ip()->addressAsString(), port);
372 71 : }
373 0 : PANIC("not handled");
374 0 : }
375 :
376 107 : Address::InstanceConstSharedPtr Utility::getOriginalDst(Socket& sock) {
377 107 : #ifdef SOL_IP
378 :
379 107 : if (sock.addressType() != Address::Type::Ip) {
380 0 : return nullptr;
381 0 : }
382 :
383 107 : auto ipVersion = sock.ipVersion();
384 107 : if (!ipVersion.has_value()) {
385 0 : return nullptr;
386 0 : }
387 :
388 107 : SocketOptionName opt_dst;
389 107 : SocketOptionName opt_tp;
390 107 : if (*ipVersion == Address::IpVersion::v4) {
391 99 : opt_dst = ENVOY_SOCKET_SO_ORIGINAL_DST;
392 99 : opt_tp = ENVOY_SOCKET_IP_TRANSPARENT;
393 99 : } else {
394 8 : opt_dst = ENVOY_SOCKET_IP6T_SO_ORIGINAL_DST;
395 8 : opt_tp = ENVOY_SOCKET_IPV6_TRANSPARENT;
396 8 : }
397 :
398 107 : sockaddr_storage orig_addr;
399 107 : memset(&orig_addr, 0, sizeof(orig_addr));
400 107 : socklen_t addr_len = sizeof(sockaddr_storage);
401 107 : int status =
402 107 : sock.getSocketOption(opt_dst.level(), opt_dst.option(), &orig_addr, &addr_len).return_value_;
403 :
404 107 : if (status != 0) {
405 0 : if (Api::OsSysCallsSingleton::get().supportsIpTransparent(*ipVersion)) {
406 0 : socklen_t flag_len = sizeof(int);
407 0 : int is_tp;
408 0 : status =
409 0 : sock.getSocketOption(opt_tp.level(), opt_tp.option(), &is_tp, &flag_len).return_value_;
410 0 : if (status == 0 && is_tp) {
411 0 : return sock.ioHandle().localAddress();
412 0 : }
413 0 : }
414 0 : return nullptr;
415 0 : }
416 :
417 107 : return Address::addressFromSockAddrOrDie(orig_addr, 0, -1, true /* default for v6 constructor */);
418 :
419 : #else
420 : // TODO(zuercher): determine if connection redirection is possible under macOS (c.f. pfctl and
421 : // divert), and whether it's possible to find the learn destination address.
422 : UNREFERENCED_PARAMETER(sock);
423 : return nullptr;
424 : #endif
425 107 : }
426 :
427 281 : void Utility::parsePortRangeList(absl::string_view string, std::list<PortRange>& list) {
428 281 : const auto ranges = StringUtil::splitToken(string, ",");
429 3658 : for (const auto& s : ranges) {
430 3658 : const std::string s_string{s};
431 3658 : std::stringstream ss(s_string);
432 3658 : uint32_t min = 0;
433 3658 : uint32_t max = 0;
434 :
435 3658 : if (absl::StrContains(s, '-')) {
436 15 : char dash = 0;
437 15 : ss >> min;
438 15 : ss >> dash;
439 15 : ss >> max;
440 3643 : } else {
441 3643 : ss >> min;
442 3643 : max = min;
443 3643 : }
444 :
445 3658 : if (s.empty() || (min > 65535) || (max > 65535) || ss.fail() || !ss.eof()) {
446 268 : throwEnvoyExceptionOrPanic(fmt::format("invalid port number or range '{}'", s_string));
447 268 : }
448 :
449 3390 : list.emplace_back(PortRange(min, max));
450 3390 : }
451 281 : }
452 :
453 0 : bool Utility::portInRangeList(const Address::Instance& address, const std::list<PortRange>& list) {
454 0 : if (address.type() != Address::Type::Ip) {
455 0 : return false;
456 0 : }
457 :
458 0 : for (const PortRange& p : list) {
459 0 : if (p.contains(address.ip()->port())) {
460 0 : return true;
461 0 : }
462 0 : }
463 0 : return false;
464 0 : }
465 :
466 423 : absl::uint128 Utility::Ip6ntohl(const absl::uint128& address) {
467 423 : #ifdef ABSL_IS_LITTLE_ENDIAN
468 423 : return flipOrder(address);
469 : #else
470 : return address;
471 : #endif
472 423 : }
473 :
474 0 : absl::uint128 Utility::Ip6htonl(const absl::uint128& address) {
475 0 : #ifdef ABSL_IS_LITTLE_ENDIAN
476 0 : return flipOrder(address);
477 : #else
478 : return address;
479 : #endif
480 0 : }
481 :
482 423 : absl::uint128 Utility::flipOrder(const absl::uint128& input) {
483 423 : absl::uint128 result{0};
484 423 : absl::uint128 data = input;
485 7191 : for (int i = 0; i < 16; i++) {
486 6768 : result <<= 8;
487 6768 : result |= (data & 0x000000000000000000000000000000FF);
488 6768 : data >>= 8;
489 6768 : }
490 423 : return result;
491 423 : }
492 :
493 : Address::InstanceConstSharedPtr
494 562 : Utility::protobufAddressToAddress(const envoy::config::core::v3::Address& proto_address) {
495 562 : switch (proto_address.address_case()) {
496 281 : case envoy::config::core::v3::Address::AddressCase::kSocketAddress:
497 281 : return Utility::parseInternetAddress(proto_address.socket_address().address(),
498 281 : proto_address.socket_address().port_value(),
499 281 : !proto_address.socket_address().ipv4_compat());
500 281 : case envoy::config::core::v3::Address::AddressCase::kPipe:
501 281 : return std::make_shared<Address::PipeInstance>(proto_address.pipe().path(),
502 281 : proto_address.pipe().mode());
503 0 : case envoy::config::core::v3::Address::AddressCase::kEnvoyInternalAddress:
504 0 : return std::make_shared<Address::EnvoyInternalInstance>(
505 0 : proto_address.envoy_internal_address().server_listener_name(),
506 0 : proto_address.envoy_internal_address().endpoint_id());
507 0 : case envoy::config::core::v3::Address::AddressCase::ADDRESS_NOT_SET:
508 0 : PANIC_DUE_TO_PROTO_UNSET;
509 562 : }
510 0 : PANIC_DUE_TO_CORRUPT_ENUM;
511 0 : }
512 :
513 : void Utility::addressToProtobufAddress(const Address::Instance& address,
514 331 : envoy::config::core::v3::Address& proto_address) {
515 331 : if (address.type() == Address::Type::Pipe) {
516 311 : proto_address.mutable_pipe()->set_path(address.asString());
517 311 : } else if (address.type() == Address::Type::Ip) {
518 20 : auto* socket_address = proto_address.mutable_socket_address();
519 20 : socket_address->set_address(address.ip()->addressAsString());
520 20 : socket_address->set_port_value(address.ip()->port());
521 20 : } else {
522 0 : ASSERT(address.type() == Address::Type::EnvoyInternal);
523 0 : auto* internal_address = proto_address.mutable_envoy_internal_address();
524 0 : internal_address->set_server_listener_name(address.envoyInternalAddress()->addressId());
525 0 : internal_address->set_endpoint_id(address.envoyInternalAddress()->endpointId());
526 0 : }
527 331 : }
528 :
// Maps a protobuf Address to the socket type it implies:
// - socket_address: TCP -> Stream, UDP -> Datagram;
// - pipe: Stream;
// - envoy_internal_address: Stream;
// - unset address: PANIC_DUE_TO_PROTO_UNSET.
// Control reaching past either switch hits PANIC_DUE_TO_CORRUPT_ENUM.
529 : Socket::Type
530 142 : Utility::protobufAddressSocketType(const envoy::config::core::v3::Address& proto_address) {
531 142 : switch (proto_address.address_case()) {
532 128 : case envoy::config::core::v3::Address::AddressCase::kSocketAddress: {
533 128 : const auto protocol = proto_address.socket_address().protocol();
534 128 : switch (protocol) {
// NOTE(review): macro defined elsewhere; presumably it expands to case labels for protobuf's
// sentinel enum values that panic -- confirm against its definition.
535 0 : PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
536 128 : case envoy::config::core::v3::SocketAddress::TCP:
537 128 : return Socket::Type::Stream;
538 0 : case envoy::config::core::v3::SocketAddress::UDP:
539 0 : return Socket::Type::Datagram;
540 128 : }
541 128 : }
// Reached only when the inner switch matched no protocol value.
542 0 : PANIC_DUE_TO_CORRUPT_ENUM;
543 14 : case envoy::config::core::v3::Address::AddressCase::kPipe:
544 14 : return Socket::Type::Stream;
545 0 : case envoy::config::core::v3::Address::AddressCase::kEnvoyInternalAddress:
546 : // Currently internal address supports stream operation only.
547 0 : return Socket::Type::Stream;
548 0 : case envoy::config::core::v3::Address::AddressCase::ADDRESS_NOT_SET:
549 0 : PANIC_DUE_TO_PROTO_UNSET;
550 142 : }
551 0 : PANIC_DUE_TO_CORRUPT_ENUM;
552 0 : }
553 :
554 : Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, const Buffer::Instance& buffer,
555 : const Address::Ip* local_ip,
556 668 : const Address::Instance& peer_address) {
557 668 : Buffer::RawSliceVector slices = buffer.getRawSlices();
558 668 : return writeToSocket(handle, slices.data(), slices.size(), local_ip, peer_address);
559 668 : }
560 :
561 : Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, Buffer::RawSlice* slices,
562 : uint64_t num_slices, const Address::Ip* local_ip,
563 668 : const Address::Instance& peer_address) {
564 668 : Api::IoCallUint64Result send_result(
565 668 : /*rc=*/0, /*err=*/Api::IoError::none());
566 668 : do {
567 668 : send_result = handle.sendmsg(slices, num_slices, 0, local_ip, peer_address);
568 668 : } while (!send_result.ok() &&
569 : // Send again if interrupted.
570 668 : send_result.err_->getErrorCode() == Api::IoError::IoErrorCode::Interrupt);
571 :
572 668 : if (send_result.ok()) {
573 668 : ENVOY_LOG_MISC(trace, "sendmsg bytes {}", send_result.return_value_);
574 668 : } else {
575 0 : ENVOY_LOG_MISC(debug, "sendmsg failed with error code {}: {}",
576 0 : static_cast<int>(send_result.err_->getErrorCode()),
577 0 : send_result.err_->getErrorDetails());
578 0 : }
579 668 : return send_result;
580 668 : }
581 :
582 : void passPayloadToProcessor(uint64_t bytes_read, Buffer::InstancePtr buffer,
583 : Address::InstanceConstSharedPtr peer_addess,
584 : Address::InstanceConstSharedPtr local_address,
585 668 : UdpPacketProcessor& udp_packet_processor, MonotonicTime receive_time) {
586 668 : RELEASE_ASSERT(
587 668 : peer_addess != nullptr,
588 668 : fmt::format("Unable to get remote address on the socket bount to local address: {} ",
589 668 : local_address->asString()));
590 :
591 : // Unix domain sockets are not supported
592 668 : RELEASE_ASSERT(peer_addess->type() == Address::Type::Ip,
593 668 : fmt::format("Unsupported remote address: {} local address: {}, receive size: "
594 668 : "{}",
595 668 : peer_addess->asString(), local_address->asString(), bytes_read));
596 668 : udp_packet_processor.processPacket(std::move(local_address), std::move(peer_addess),
597 668 : std::move(buffer), receive_time);
598 668 : }
599 :
// Performs one receive operation on `handle` and passes every datagram obtained to
// `udp_packet_processor` via passPayloadToProcessor(). One of three strategies is used:
//   1. use_gro: a single recvmsg() into a buffer sized for NUM_DATAGRAMS_PER_RECEIVE datagrams;
//      if a non-zero gso_size is reported the buffer is split into gso_size-byte payloads.
//   2. handle.supportsMmsg(): one recvmmsg() into NUM_DATAGRAMS_PER_RECEIVE single-slice buffers.
//   3. otherwise: a single recvmsg() of at most maxDatagramSize() bytes.
// `packets_dropped` is handed to the RecvMsgOutput object of whichever path runs.
// Returns the result of the receive call. When that call fails nothing is delivered to the
// processor; the same holds for a message flagged truncated_and_dropped_.
600 : Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle,
601 : const Address::Instance& local_address,
602 : UdpPacketProcessor& udp_packet_processor,
603 : MonotonicTime receive_time, bool use_gro,
604 504 : uint32_t* packets_dropped) {
605 :
// --- Strategy 1: GRO ---
606 504 : if (use_gro) {
607 239 : Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
608 239 : IoHandle::RecvMsgOutput output(1, packets_dropped);
609 :
610 : // TODO(yugant): Avoid allocating 24k for each read by getting memory from UdpPacketProcessor
611 239 : const uint64_t max_rx_datagram_size_with_gro =
612 239 : NUM_DATAGRAMS_PER_RECEIVE * udp_packet_processor.maxDatagramSize();
613 239 : ENVOY_LOG_MISC(trace, "starting gro recvmsg with max={}", max_rx_datagram_size_with_gro);
614 :
615 239 : Api::IoCallUint64Result result =
616 239 : receiveMessage(max_rx_datagram_size_with_gro, buffer, output, handle, local_address);
617 :
// Nothing to deliver on error or when the message was truncated and dropped.
618 239 : if (!result.ok() || output.msg_[0].truncated_and_dropped_) {
619 26 : return result;
620 26 : }
621 :
622 213 : const uint64_t gso_size = output.msg_[0].gso_size_;
623 213 : ENVOY_LOG_MISC(trace, "gro recvmsg bytes {} with gso_size as {}", result.return_value_,
624 213 : gso_size);
625 :
626 : // Skip gso segmentation and proceed as a single payload.
627 213 : if (gso_size == 0u) {
628 213 : passPayloadToProcessor(
629 213 : result.return_value_, std::move(buffer), std::move(output.msg_[0].peer_address_),
630 213 : std::move(output.msg_[0].local_address_), udp_packet_processor, receive_time);
631 213 : return result;
632 213 : }
633 :
634 : // Segment the buffer read by the recvmsg syscall into gso_sized sub buffers.
635 : // TODO(mattklein123): The following code should be optimized to avoid buffer copies, either by
636 : // switching to slices or by using a CoW buffer type.
// The last sub buffer may be shorter than gso_size. The addresses are passed by copy because
// they are reused for every segment.
637 0 : while (buffer->length() > 0) {
638 0 : const uint64_t bytes_to_copy = std::min(buffer->length(), gso_size);
639 0 : Buffer::InstancePtr sub_buffer = std::make_unique<Buffer::OwnedImpl>();
640 0 : sub_buffer->move(*buffer, bytes_to_copy);
641 0 : passPayloadToProcessor(bytes_to_copy, std::move(sub_buffer), output.msg_[0].peer_address_,
642 0 : output.msg_[0].local_address_, udp_packet_processor, receive_time);
643 0 : }
644 :
645 0 : return result;
646 213 : }
647 :
// --- Strategy 2: recvmmsg ---
648 265 : if (handle.supportsMmsg()) {
649 84 : const auto max_rx_datagram_size = udp_packet_processor.maxDatagramSize();
650 :
651 : // Buffer::ReservationSingleSlice is always passed by value, and can only be constructed
652 : // by Buffer::Instance::reserve(), so this is needed to keep a fixed array
653 : // in which all elements are legally constructed.
654 84 : struct BufferAndReservation {
655 84 : BufferAndReservation(uint64_t max_rx_datagram_size)
656 84 : : buffer_(std::make_unique<Buffer::OwnedImpl>()),
657 1344 : reservation_(buffer_->reserveSingleSlice(max_rx_datagram_size, true)) {}
658 :
659 84 : Buffer::InstancePtr buffer_;
660 84 : Buffer::ReservationSingleSlice reservation_;
661 84 : };
662 84 : constexpr uint32_t num_slices_per_packet = 1u;
663 84 : absl::InlinedVector<BufferAndReservation, NUM_DATAGRAMS_PER_RECEIVE> buffers;
664 84 : RawSliceArrays slices(NUM_DATAGRAMS_PER_RECEIVE,
665 84 : absl::FixedArray<Buffer::RawSlice>(num_slices_per_packet));
// One buffer, one reservation and one slice per potential datagram.
666 1428 : for (uint32_t i = 0; i < NUM_DATAGRAMS_PER_RECEIVE; i++) {
667 1344 : buffers.push_back(max_rx_datagram_size);
668 1344 : slices[i][0] = buffers[i].reservation_.slice();
669 1344 : }
670 :
671 84 : IoHandle::RecvMsgOutput output(NUM_DATAGRAMS_PER_RECEIVE, packets_dropped);
672 84 : ENVOY_LOG_MISC(trace, "starting recvmmsg with packets={} max={}", NUM_DATAGRAMS_PER_RECEIVE,
673 84 : max_rx_datagram_size);
674 84 : Api::IoCallUint64Result result = handle.recvmmsg(slices, local_address.ip()->port(), output);
675 84 : if (!result.ok()) {
676 42 : return result;
677 42 : }
678 :
// For recvmmsg the return value is used as the number of packets read.
679 42 : uint64_t packets_read = result.return_value_;
680 42 : ENVOY_LOG_MISC(trace, "recvmmsg read {} packets", packets_read);
681 339 : for (uint64_t i = 0; i < packets_read; ++i) {
// Truncated-and-dropped packets are skipped; the remaining packets are still delivered.
682 297 : if (output.msg_[i].truncated_and_dropped_) {
683 0 : continue;
684 0 : }
685 :
686 297 : Buffer::RawSlice* slice = slices[i].data();
687 297 : const uint64_t msg_len = output.msg_[i].msg_len_;
688 297 : ASSERT(msg_len <= slice->len_);
// NOTE(review): peer_address_ is dereferenced here before passPayloadToProcessor() asserts it
// is non-null.
689 297 : ENVOY_LOG_MISC(debug, "Receive a packet with {} bytes from {}", msg_len,
690 297 : output.msg_[i].peer_address_->asString());
691 :
// Commit only the bytes actually received, capped at the reserved size.
692 297 : buffers[i].reservation_.commit(std::min(max_rx_datagram_size, msg_len));
693 :
694 297 : passPayloadToProcessor(msg_len, std::move(buffers[i].buffer_), output.msg_[i].peer_address_,
695 297 : output.msg_[i].local_address_, udp_packet_processor, receive_time);
696 297 : }
697 42 : return result;
698 84 : }
699 :
// --- Strategy 3: single recvmsg ---
700 181 : Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
701 181 : IoHandle::RecvMsgOutput output(1, packets_dropped);
702 :
703 181 : ENVOY_LOG_MISC(trace, "starting recvmsg with max={}", udp_packet_processor.maxDatagramSize());
704 181 : Api::IoCallUint64Result result =
705 181 : receiveMessage(udp_packet_processor.maxDatagramSize(), buffer, output, handle, local_address);
706 :
// Nothing to deliver on error or when the message was truncated and dropped.
707 181 : if (!result.ok() || output.msg_[0].truncated_and_dropped_) {
708 23 : return result;
709 23 : }
710 :
711 158 : ENVOY_LOG_MISC(trace, "recvmsg bytes {}", result.return_value_);
712 :
713 158 : passPayloadToProcessor(
714 158 : result.return_value_, std::move(buffer), std::move(output.msg_[0].peer_address_),
715 158 : std::move(output.msg_[0].local_address_), udp_packet_processor, receive_time);
716 158 : return result;
717 181 : }
718 :
719 : Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
720 : const Address::Instance& local_address,
721 : UdpPacketProcessor& udp_packet_processor,
722 : TimeSource& time_source, bool prefer_gro,
723 91 : uint32_t& packets_dropped) {
724 : // Read at least one time, and attempt to read numPacketsExpectedPerEventLoop() packets unless
725 : // this goes over MAX_NUM_PACKETS_PER_EVENT_LOOP.
726 91 : size_t num_packets_to_read = std::min<size_t>(
727 91 : MAX_NUM_PACKETS_PER_EVENT_LOOP, udp_packet_processor.numPacketsExpectedPerEventLoop());
728 91 : const bool use_gro = prefer_gro && handle.supportsUdpGro();
729 91 : size_t num_reads =
730 91 : use_gro ? (num_packets_to_read / NUM_DATAGRAMS_PER_RECEIVE)
731 91 : : (handle.supportsMmsg() ? (num_packets_to_read / NUM_DATAGRAMS_PER_RECEIVE)
732 65 : : num_packets_to_read);
733 : // Make sure to read at least once.
734 91 : num_reads = std::max<size_t>(1, num_reads);
735 504 : do {
736 504 : const uint32_t old_packets_dropped = packets_dropped;
737 504 : const MonotonicTime receive_time = time_source.monotonicTime();
738 504 : Api::IoCallUint64Result result = Utility::readFromSocket(
739 504 : handle, local_address, udp_packet_processor, receive_time, use_gro, &packets_dropped);
740 :
741 504 : if (!result.ok()) {
742 : // No more to read or encountered a system error.
743 91 : return std::move(result.err_);
744 91 : }
745 :
746 413 : if (packets_dropped != old_packets_dropped) {
747 : // The kernel tracks SO_RXQ_OVFL as a uint32 which can overflow to a smaller
748 : // value. So as long as this count differs from previously recorded value,
749 : // more packets are dropped by kernel.
750 0 : const uint32_t delta =
751 0 : (packets_dropped > old_packets_dropped)
752 0 : ? (packets_dropped - old_packets_dropped)
753 0 : : (packets_dropped + (std::numeric_limits<uint32_t>::max() - old_packets_dropped) +
754 0 : 1);
755 0 : ENVOY_LOG_EVERY_POW_2_MISC(
756 0 : warn,
757 0 : "Kernel dropped {} datagram(s). Consider increasing receive buffer size and/or "
758 0 : "max datagram size.",
759 0 : delta);
760 0 : udp_packet_processor.onDatagramsDropped(delta);
761 0 : }
762 413 : --num_reads;
763 413 : if (num_reads == 0) {
764 0 : return std::move(result.err_);
765 0 : }
766 413 : } while (true);
767 91 : }
768 :
769 : ResolvedUdpSocketConfig::ResolvedUdpSocketConfig(
770 : const envoy::config::core::v3::UdpSocketConfig& config, bool prefer_gro_default)
771 : : max_rx_datagram_size_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_rx_datagram_size,
772 : DEFAULT_UDP_MAX_DATAGRAM_SIZE)),
773 91 : prefer_gro_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, prefer_gro, prefer_gro_default)) {
774 91 : if (prefer_gro_ && !Api::OsSysCallsSingleton::get().supportsUdpGro()) {
775 0 : ENVOY_LOG_MISC(
776 0 : warn, "GRO requested but not supported by the OS. Check OS config or disable prefer_gro.");
777 0 : }
778 91 : }
779 :
780 : } // namespace Network
781 : } // namespace Envoy
|