1
#include "source/common/network/io_socket_handle_impl.h"
2

            
3
#include <memory>
4

            
5
#include "envoy/buffer/buffer.h"
6

            
7
#include "source/common/api/os_sys_calls_impl.h"
8
#include "source/common/buffer/buffer_impl.h"
9
#include "source/common/common/safe_memcpy.h"
10
#include "source/common/common/utility.h"
11
#include "source/common/event/file_event_impl.h"
12
#include "source/common/network/address_impl.h"
13
#include "source/common/network/socket_interface_impl.h"
14

            
15
#include "absl/container/fixed_array.h"
16
#include "absl/types/optional.h"
17

            
18
using Envoy::Api::SysCallIntResult;
19
using Envoy::Api::SysCallSizeResult;
20

            
21
namespace Envoy {
22

            
23
namespace {
24

            
25
982022
// Returns the control-message type that maybeGetDstAddressFromHeader() treats as carrying the
// IPv4 destination address: IP_RECVDSTADDR where the platform defines it, IP_PKTINFO otherwise.
constexpr int messageTypeContainsIP() {
#ifndef IP_RECVDSTADDR
  return IP_PKTINFO;
#else
  return IP_RECVDSTADDR;
#endif
}
32

            
33
634865
// Extracts the IPv4 address stored in the payload of `cmsg`.
//
// With IP_RECVDSTADDR the payload is read as a bare in_addr; otherwise it is read as an
// in_pktinfo and its `ipi_addr` member is returned. The caller is responsible for passing a
// control message whose payload really has that layout.
in_addr addressFromMessage(const cmsghdr& cmsg) {
  const auto* payload = CMSG_DATA(&cmsg);
#ifdef IP_RECVDSTADDR
  return *reinterpret_cast<const in_addr*>(payload);
#else
  return reinterpret_cast<const in_pktinfo*>(payload)->ipi_addr;
#endif
}
41

            
42
561290
// Returns the flag passed to recvmsg()/recvmmsg() to request truncation reporting.
constexpr int messageTruncatedOption() {
#if !defined(__APPLE__)
  return MSG_TRUNC;
#else
  // OSX does not support passing `MSG_TRUNC` to recvmsg and recvmmsg. This does not affect
  // functionality; the flag is primarily used for logging.
  return 0;
#endif
}
51

            
52
} // namespace
53

            
54
namespace Network {
55

            
56
155449
// Closes the descriptor on destruction if it has not been closed already.
IoSocketHandleImpl::~IoSocketHandleImpl() {
  if (!SOCKET_VALID(fd_)) {
    // Already closed (or never opened): nothing to release.
    return;
  }
  // Explicitly qualified so that this class's own close() is the one invoked.
  IoSocketHandleImpl::close();
}
61

            
62
152365
// Closes the socket.
//
// Drops the registered file event (if any), closes `fd_` through the OS syscall singleton and
// marks `fd_` invalid. The descriptor must be valid on entry (checked with ASSERT).
//
// @return the close() return code converted to unsigned, paired with IoError::none().
Api::IoCallUint64Result IoSocketHandleImpl::close() {
  if (file_event_ != nullptr) {
    file_event_.reset();
  }

  ASSERT(SOCKET_VALID(fd_));
  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  const int close_rc = os_sys_calls.close(fd_).return_value_;
  SET_SOCKET_INVALID(fd_);
  return {static_cast<unsigned long>(close_rc), Api::IoError::none()};
}
72

            
73
// Scatter-reads from the socket into `slices`, requesting at most `max_length` bytes in total.
//
// Slices are consumed in order; the last slice used is shortened so that the sum of the
// requested lengths never exceeds `max_length`.
//
// @param max_length upper bound on the number of bytes requested from the kernel.
// @param slices destination slices.
// @param num_slice number of entries in `slices`.
// @return the converted result of the underlying recv()/readv() call.
Api::IoCallUint64Result IoSocketHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices,
                                                  uint64_t num_slice) {
  absl::FixedArray<iovec> iov(num_slice);
  uint64_t used_slices = 0;
  uint64_t requested_bytes = 0;
  while (used_slices < num_slice && requested_bytes < max_length) {
    const Buffer::RawSlice& slice = slices[used_slices];
    const size_t take = std::min(slice.len_, static_cast<size_t>(max_length - requested_bytes));
    iovec& entry = iov[used_slices];
    entry.iov_base = slice.mem_;
    entry.iov_len = take;
    requested_bytes += take;
    ++used_slices;
  }
  ASSERT(requested_bytes <= max_length);

  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  if (used_slices == 1) {
    // Avoid paying the VFS overhead when there is only one IO buffer to work with
    return sysCallResultToIoCallResult(os_sys_calls.recv(fd_, iov[0].iov_base, iov[0].iov_len, 0));
  }

  return sysCallResultToIoCallResult(
      os_sys_calls.readv(fd_, iov.begin(), static_cast<int>(used_slices)));
}
97

            
98
// Reads from the socket into `buffer`.
//
// Space is obtained with buffer.reserveForRead(); only the bytes actually read are committed,
// and nothing is committed when the read fails.
//
// @param buffer destination buffer.
// @param max_length_opt optional cap on the number of bytes to read; unlimited when unset.
//        A cap of zero returns a no-error result without touching the socket.
// @return the result of the underlying readv() call.
Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer,
                                                 absl::optional<uint64_t> max_length_opt) {
  const uint64_t max_length = max_length_opt.has_value() ? *max_length_opt : UINT64_MAX;
  if (max_length == 0) {
    return Api::ioCallUint64ResultNoError();
  }

  Buffer::Reservation reservation = buffer.reserveForRead();
  const auto read_limit = std::min(reservation.length(), max_length);
  Api::IoCallUint64Result result = readv(read_limit, reservation.slices(), reservation.numSlices());

  uint64_t bytes_to_commit = 0;
  if (result.ok()) {
    bytes_to_commit = result.return_value_;
  }
  ASSERT(bytes_to_commit <= max_length);
  reservation.commit(bytes_to_commit);
  return result;
}
112

            
113
// Gather-writes `slices` to the socket.
//
// Slices with a null pointer or zero length are skipped. If nothing remains, a no-error result
// is returned without issuing a system call.
//
// @param slices source slices.
// @param num_slice number of entries in `slices`.
// @return the converted result of the underlying send()/writev() call.
Api::IoCallUint64Result IoSocketHandleImpl::writev(const Buffer::RawSlice* slices,
                                                   uint64_t num_slice) {
  absl::FixedArray<iovec> iov(num_slice);
  uint64_t used_slices = 0;
  for (uint64_t idx = 0; idx < num_slice; ++idx) {
    const Buffer::RawSlice& slice = slices[idx];
    if (slice.mem_ == nullptr || slice.len_ == 0) {
      continue;
    }
    iov[used_slices].iov_base = slice.mem_;
    iov[used_slices].iov_len = slice.len_;
    ++used_slices;
  }
  if (used_slices == 0) {
    return Api::ioCallUint64ResultNoError();
  }

  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  if (used_slices == 1) {
    // Avoid paying the VFS overhead when there is only one IO buffer to work with
    return sysCallResultToIoCallResult(os_sys_calls.send(fd_, iov[0].iov_base, iov[0].iov_len, 0));
  }

  return sysCallResultToIoCallResult(os_sys_calls.writev(fd_, iov.begin(), used_slices));
}
138

            
139
569269
// Writes the front of `buffer` to the socket and drains whatever was written.
//
// At most MaxSlices (16) raw slices are handed to writev() per call. The buffer is left
// untouched when the write fails or writes nothing.
//
// @param buffer data to send.
// @return the result of the underlying writev() call.
Api::IoCallUint64Result IoSocketHandleImpl::write(Buffer::Instance& buffer) {
  constexpr uint64_t MaxSlices = 16;
  Buffer::RawSliceVector raw_slices = buffer.getRawSlices(MaxSlices);
  Api::IoCallUint64Result result = writev(raw_slices.begin(), raw_slices.size());
  if (!result.ok() || !(result.return_value_ > 0)) {
    return result;
  }
  buffer.drain(static_cast<uint64_t>(result.return_value_));
  return result;
}
148

            
149
// Sends one datagram built from `slices` to `peer_address`, optionally pinning the source IP.
//
// @param slices payload slices; entries with a null pointer or zero length are skipped.
// @param num_slice number of entries in `slices`.
// @param flags flags forwarded to the sendmsg() system call.
// @param self_ip when non-null, the source address to request through an ancillary
//        IP_PKTINFO / IP_SENDSRCADDR / IPV6_PKTINFO control message.
// @param peer_address destination; must be an Address::InstanceBase exposing a socket address.
// @return the converted sendmsg() result, an invalid-address error if `peer_address` cannot
//         provide a sockaddr, or a no-error result if there is nothing to send.
Api::IoCallUint64Result IoSocketHandleImpl::sendmsg(const Buffer::RawSlice* slices,
                                                    uint64_t num_slice, int flags,
                                                    const Address::Ip* self_ip,
                                                    const Address::Instance& peer_address) {
  const auto* address_base = dynamic_cast<const Address::InstanceBase*>(&peer_address);
  if (address_base == nullptr) {
    // The peer address is not an InstanceBase, so there is no sockaddr to send to. Report it the
    // same way as a missing sockaddr instead of dereferencing a null pointer.
    return IoSocketError::ioResultSocketInvalidAddress();
  }
  sockaddr* sock_addr = const_cast<sockaddr*>(address_base->sockAddr());
  if (sock_addr == nullptr) {
    // Unlikely to happen unless the wrong peer address is passed.
    return IoSocketError::ioResultSocketInvalidAddress();
  }
  absl::FixedArray<iovec> iov(num_slice);
  uint64_t num_slices_to_write = 0;
  for (uint64_t i = 0; i < num_slice; i++) {
    if (slices[i].mem_ != nullptr && slices[i].len_ != 0) {
      iov[num_slices_to_write].iov_base = slices[i].mem_;
      iov[num_slices_to_write].iov_len = slices[i].len_;
      num_slices_to_write++;
    }
  }
  if (num_slices_to_write == 0) {
    return Api::ioCallUint64ResultNoError();
  }

  // Value-initialize so that any member not assigned below is zero rather than indeterminate.
  msghdr message{};
  message.msg_name = reinterpret_cast<void*>(sock_addr);
  message.msg_namelen = address_base->sockAddrLen();
  message.msg_iov = iov.begin();
  message.msg_iovlen = num_slices_to_write;
  message.msg_flags = 0;
  auto& os_syscalls = Api::OsSysCallsSingleton::get();
  if (self_ip == nullptr) {
    message.msg_control = nullptr;
    message.msg_controllen = 0;
    const Api::SysCallSizeResult result = os_syscalls.sendmsg(fd_, &message, flags);
    return sysCallResultToIoCallResult(result);
  } else {
    const size_t space_v6 = CMSG_SPACE(sizeof(in6_pktinfo));
    const size_t space_v4 = CMSG_SPACE(sizeof(in_pktinfo));

    // FreeBSD only needs in_addr size, but allocates more to unify code in two platforms.
    const size_t cmsg_space = (self_ip->version() == Address::IpVersion::v4) ? space_v4 : space_v6;
    // The control buffer is sized for exactly one packet-info message of the selected family.
    absl::FixedArray<char> cbuf(cmsg_space);
    memset(cbuf.begin(), 0, cmsg_space);

    message.msg_control = cbuf.begin();
    message.msg_controllen = cmsg_space;
    cmsghdr* const cmsg = CMSG_FIRSTHDR(&message);
    // Report the number of bytes in the control buffer (not the size of the FixedArray object).
    RELEASE_ASSERT(cmsg != nullptr, fmt::format("cbuf with size {} is not enough, cmsghdr size {}",
                                                cbuf.size(), sizeof(cmsghdr)));
    if (self_ip->version() == Address::IpVersion::v4) {
      cmsg->cmsg_level = IPPROTO_IP;
#ifndef IP_SENDSRCADDR
      cmsg->cmsg_len = CMSG_LEN(sizeof(in_pktinfo));
      cmsg->cmsg_type = IP_PKTINFO;
      auto pktinfo = reinterpret_cast<in_pktinfo*>(CMSG_DATA(cmsg));
      pktinfo->ipi_ifindex = 0;
#ifdef WIN32
      pktinfo->ipi_addr.s_addr = self_ip->ipv4()->address();
#else
      pktinfo->ipi_spec_dst.s_addr = self_ip->ipv4()->address();
#endif
#else
      cmsg->cmsg_type = IP_SENDSRCADDR;
      cmsg->cmsg_len = CMSG_LEN(sizeof(in_addr));
      // Member access must go through the pointer; `*(ptr).s_addr` does not compile.
      reinterpret_cast<struct in_addr*>(CMSG_DATA(cmsg))->s_addr = self_ip->ipv4()->address();
#endif
    } else if (self_ip->version() == Address::IpVersion::v6) {
      cmsg->cmsg_len = CMSG_LEN(sizeof(in6_pktinfo));
      cmsg->cmsg_level = IPPROTO_IPV6;
      cmsg->cmsg_type = IPV6_PKTINFO;
      auto pktinfo = reinterpret_cast<in6_pktinfo*>(CMSG_DATA(cmsg));
      pktinfo->ipi6_ifindex = 0;
      *(reinterpret_cast<absl::uint128*>(pktinfo->ipi6_addr.s6_addr)) = self_ip->ipv6()->address();
    }
    const Api::SysCallSizeResult result = os_syscalls.sendmsg(fd_, &message, flags);
    if (result.return_value_ < 0 && result.errno_ == SOCKET_ERROR_INVAL) {
      ENVOY_LOG(error, fmt::format("EINVAL error. Socket is open: {}, IPv{}.", isOpen(),
                                   self_ip->version() == Address::IpVersion::v6 ? 6 : 4));
    }
    return sysCallResultToIoCallResult(result);
  }
}
232

            
233
// Converts a raw socket address into an Envoy address instance, reusing a recently created
// instance when the address cache is enabled.
//
// Without a cache (`recent_received_addresses_` unset) a new instance is built on every call.
// With a cache, entries are kept in least-recently-used-first order: a hit is moved to the back,
// a miss is appended and the front entry is evicted once the size exceeds
// `address_cache_max_capacity_`.
//
// @param ss the raw socket address.
// @param ss_len length of the address stored in `ss`.
// @return the (possibly cached) address instance.
Address::InstanceConstSharedPtr
IoSocketHandleImpl::getOrCreateEnvoyAddressInstance(sockaddr_storage ss, socklen_t ss_len) {
  if (!recent_received_addresses_) {
    return Address::addressFromSockAddrOrDie(ss, ss_len, fd_, socket_v6only_);
  }

  quic::QuicSocketAddress quic_address(ss);
  auto match = recent_received_addresses_->begin();
  while (match != recent_received_addresses_->end() && !(match->first == quic_address)) {
    ++match;
  }

  if (match != recent_received_addresses_->end()) {
    Address::InstanceConstSharedPtr cached_addr = match->second;
    // Move the entry to the back of the list since it's the most recently accessed entry.
    std::rotate(match, match + 1, recent_received_addresses_->end());
    return cached_addr;
  }

  Address::InstanceConstSharedPtr created =
      Address::addressFromSockAddrOrDie(ss, ss_len, fd_, socket_v6only_);
  recent_received_addresses_->push_back(QuicEnvoyAddressPair(quic_address, created));
  const bool over_capacity = recent_received_addresses_->size() > address_cache_max_capacity_;
  if (over_capacity) {
    // Over capacity so remove the first element in the list, which is the least recently accessed.
    recent_received_addresses_->erase(recent_received_addresses_->begin());
  }
  return created;
}
257

            
258
// Builds the local (destination) address from a packet-info control message.
//
// An IPV6_PKTINFO message yields an IPv6 address; a message of type messageTypeContainsIP()
// yields an IPv4 address. In both cases the port is taken from `self_port`.
// NOTE(review): only cmsg_type is compared here, not cmsg_level — confirm no other level can
// deliver a message with a colliding type value.
//
// @param cmsg the control message to inspect.
// @param self_port local port, in host byte order, to store in the resulting address.
// @return the address instance, or nullptr if `cmsg` is not a packet-info message.
Address::InstanceConstSharedPtr
IoSocketHandleImpl::maybeGetDstAddressFromHeader(const cmsghdr& cmsg, uint32_t self_port) {
  sockaddr_storage storage;

  if (cmsg.cmsg_type == IPV6_PKTINFO) {
    const auto* pktinfo = reinterpret_cast<const in6_pktinfo*>(CMSG_DATA(&cmsg));
    auto* v6 = reinterpret_cast<sockaddr_in6*>(&storage);
    memset(v6, 0, sizeof(sockaddr_in6));
    v6->sin6_family = AF_INET6;
    v6->sin6_addr = pktinfo->ipi6_addr;
    v6->sin6_port = htons(self_port);
    return getOrCreateEnvoyAddressInstance(storage, sizeof(sockaddr_in6));
  }

  if (cmsg.cmsg_type != messageTypeContainsIP()) {
    return nullptr;
  }

  auto* v4 = reinterpret_cast<sockaddr_in*>(&storage);
  memset(v4, 0, sizeof(sockaddr_in));
  v4->sin_family = AF_INET;
  v4->sin_addr = addressFromMessage(cmsg);
  v4->sin_port = htons(self_port);
  return getOrCreateEnvoyAddressInstance(storage, sizeof(sockaddr_in));
}
283

            
284
607399
// Returns the 32-bit counter carried by an SO_RXQ_OVFL control message.
//
// @param cmsg the control message to inspect.
// @return the counter value, or absl::nullopt if `cmsg` is of a different type or the platform
//         does not define SO_RXQ_OVFL.
absl::optional<uint32_t> maybeGetPacketsDroppedFromHeader([[maybe_unused]] const cmsghdr& cmsg) {
#ifdef SO_RXQ_OVFL
  const bool is_overflow_counter = (cmsg.cmsg_type == SO_RXQ_OVFL);
  if (is_overflow_counter) {
    const auto* dropped = reinterpret_cast<const uint32_t*>(CMSG_DATA(&cmsg));
    return *dropped;
  }
#endif
  return absl::nullopt;
}
292

            
293
641369
// Copies sizeof(T) bytes from the payload of `cmsg` into a T and returns it.
//
// The caller must ensure the payload holds at least sizeof(T) bytes.
// @tparam T unsigned integral type to read.
template <typename T> T getUnsignedIntFromHeader(const cmsghdr& cmsg) {
  static_assert(std::is_unsigned_v<T>, "return type must be unsigned integral");
  T parsed;
  auto* payload = CMSG_DATA(&cmsg);
  safeMemcpyUnsafeSrc(&parsed, payload);
  return parsed;
}
299

            
300
641360
// Reads an unsigned integer payload of 1, 2, 4 or 8 bytes from `cmsg` and converts it to T.
//
// The payload width is derived from `cmsg.cmsg_len`. Any other length is reported through
// IS_ENVOY_BUG and yields absl::nullopt.
// @tparam T unsigned integral type of the returned value; wider payloads are narrowed by
//         static_cast.
template <typename T> absl::optional<T> maybeGetUnsignedIntFromHeader(const cmsghdr& cmsg) {
  static_assert(std::is_unsigned_v<T>, "return type must be unsigned integral");
  const auto payload_len = cmsg.cmsg_len;
  if (payload_len == CMSG_LEN(sizeof(uint8_t))) {
    return static_cast<T>(getUnsignedIntFromHeader<uint8_t>(cmsg));
  }
  if (payload_len == CMSG_LEN(sizeof(uint16_t))) {
    return static_cast<T>(getUnsignedIntFromHeader<uint16_t>(cmsg));
  }
  if (payload_len == CMSG_LEN(sizeof(uint32_t))) {
    return static_cast<T>(getUnsignedIntFromHeader<uint32_t>(cmsg));
  }
  if (payload_len == CMSG_LEN(sizeof(uint64_t))) {
    return static_cast<T>(getUnsignedIntFromHeader<uint64_t>(cmsg));
  }
  IS_ENVOY_BUG(
      fmt::format("unexpected cmsg_len value for unsigned integer payload: {}", cmsg.cmsg_len));
  return absl::nullopt;
}
317

            
318
939035
// Returns the TOS / traffic-class byte carried by `cmsg`, if it is such a message.
//
// Recognised messages are IPPROTO_IP with type IP_TOS (IP_RECVTOS on Apple platforms) and
// IPPROTO_IPV6 with type IPV6_TCLASS.
//
// @param cmsg the control message to inspect.
// @return the value, or absl::nullopt if `cmsg` is another kind of message or its payload
//         length is unexpected.
absl::optional<uint8_t> maybeGetTosFromHeader(const cmsghdr& cmsg) {
#ifdef __APPLE__
  constexpr int ipv4_tos_type = IP_RECVTOS;
#else
  constexpr int ipv4_tos_type = IP_TOS;
#endif // __APPLE__
  const bool is_ipv4_tos = cmsg.cmsg_level == IPPROTO_IP && cmsg.cmsg_type == ipv4_tos_type;
  const bool is_ipv6_tclass = cmsg.cmsg_level == IPPROTO_IPV6 && cmsg.cmsg_type == IPV6_TCLASS;
  if (!is_ipv4_tos && !is_ipv6_tclass) {
    return absl::nullopt;
  }
  return maybeGetUnsignedIntFromHeader<uint8_t>(cmsg);
}
330

            
331
// Receives a single datagram into `slices` and fills `output.msg_[0]` with its metadata.
//
// @param slices destination slices; entries with a null pointer or zero length are skipped.
// @param num_slice number of entries in `slices`.
// @param self_port local port used when building the local address from packet-info messages.
// @param save_cmsg_config optional description of one control message whose payload should be
//        stored in `output.msg_[0].saved_cmsg_`; its `expected_size` is added to the control
//        buffer.
// @param output receives the peer/local addresses, GSO size, TOS, saved cmsg and (when
//        `dropped_packets_` is non-null) the dropped-packet counter. Must have at least one
//        `msg_` entry.
// @return the converted recvmsg() result; a truncated datagram is dropped and reported as a
//         zero-length read.
Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices,
                                                    const uint64_t num_slice, uint32_t self_port,
                                                    const UdpSaveCmsgConfig& save_cmsg_config,
                                                    RecvMsgOutput& output) {
  ASSERT(!output.msg_.empty());

  size_t cmsg_space = cmsg_space_ + save_cmsg_config.expected_size;
  absl::FixedArray<char> cbuf(cmsg_space);
  memset(cbuf.begin(), 0, cmsg_space);

  absl::FixedArray<iovec> iov(num_slice);
  uint64_t num_slices_for_read = 0;
  for (uint64_t i = 0; i < num_slice; i++) {
    if (slices[i].mem_ != nullptr && slices[i].len_ != 0) {
      iov[num_slices_for_read].iov_base = slices[i].mem_;
      iov[num_slices_for_read].iov_len = slices[i].len_;
      ++num_slices_for_read;
    }
  }
  if (num_slices_for_read == 0) {
    return Api::ioCallUint64ResultNoError();
  }

  sockaddr_storage peer_addr;
  msghdr hdr;
  hdr.msg_name = &peer_addr;
  hdr.msg_namelen = sizeof(sockaddr_storage);
  hdr.msg_iov = iov.begin();
  hdr.msg_iovlen = num_slices_for_read;
  hdr.msg_flags = 0;
  hdr.msg_control = cbuf.begin();
  // Offer the kernel the whole buffer, including the extra room reserved for the saved cmsg.
  // Advertising only `cmsg_space_` would leave that room unused and could truncate the control
  // data (MSG_CTRUNC), which trips the RELEASE_ASSERT below.
  hdr.msg_controllen = cmsg_space;
  Api::SysCallSizeResult result =
      Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, messageTruncatedOption());
  if (result.return_value_ < 0) {
    return sysCallResultToIoCallResult(result);
  }
  if ((hdr.msg_flags & MSG_TRUNC) != 0) {
    ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", result.return_value_);
    result.return_value_ = 0;
    // The counter is optional (it is null-checked in the cmsg loop below), so guard it here too.
    if (output.dropped_packets_ != nullptr) {
      (*output.dropped_packets_)++;
    }
    output.msg_[0].truncated_and_dropped_ = true;
    return sysCallResultToIoCallResult(result);
  }

  RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0,
                 fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen));
  RELEASE_ASSERT(hdr.msg_namelen > 0,
                 fmt::format("Unable to get remote address from recvmsg() for fd: {}", fd_));
  output.msg_[0].peer_address_ = getOrCreateEnvoyAddressInstance(peer_addr, hdr.msg_namelen);
  output.msg_[0].gso_size_ = 0;

  if (hdr.msg_controllen > 0) {
    // Get overflow, local address and gso_size from control message.
    for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&hdr); cmsg != nullptr;
         cmsg = CMSG_NXTHDR(&hdr, cmsg)) {
      if (save_cmsg_config.hasConfig() &&
          cmsg->cmsg_type == static_cast<int>(save_cmsg_config.type.value()) &&
          cmsg->cmsg_level == static_cast<int>(save_cmsg_config.level.value())) {
        // NOTE(review): cmsg_len includes the cmsghdr header, so this copies more bytes than
        // the payload holds — confirm whether consumers rely on that length.
        Buffer::OwnedImpl cmsg_slice{CMSG_DATA(cmsg), cmsg->cmsg_len};
        output.msg_[0].saved_cmsg_ = std::move(cmsg_slice);
      }
      if (output.msg_[0].local_address_ == nullptr) {
        Address::InstanceConstSharedPtr addr = maybeGetDstAddressFromHeader(*cmsg, self_port);
        if (addr != nullptr) {
          // This is a IP packet info message.
          output.msg_[0].local_address_ = std::move(addr);
          continue;
        }
      }
      if (output.dropped_packets_ != nullptr) {
        absl::optional<uint32_t> maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg);
        if (maybe_dropped) {
          *output.dropped_packets_ = *maybe_dropped;
          continue;
        }
      }
#ifdef UDP_GRO
      if (cmsg->cmsg_level == SOL_UDP && cmsg->cmsg_type == UDP_GRO) {
        absl::optional<uint16_t> maybe_gso = maybeGetUnsignedIntFromHeader<uint16_t>(*cmsg);
        if (maybe_gso) {
          output.msg_[0].gso_size_ = *maybe_gso;
        }
      }
#endif
      absl::optional<uint8_t> maybe_tos = maybeGetTosFromHeader(*cmsg);
      if (maybe_tos) {
        output.msg_[0].tos_ = *maybe_tos;
      }
    }
  }

  return sysCallResultToIoCallResult(result);
}
425

            
426
// Receives up to `slices.size()` datagrams with one recvmmsg() call and fills one `output.msg_`
// entry per received packet.
//
// @param slices one array of destination slices per packet; `output.msg_` must have the same
//        number of entries.
// @param self_port local port used when building local addresses from packet-info messages.
// @param save_cmsg_config optional description of one control message whose payload should be
//        stored in the packet's `saved_cmsg_`; its `expected_size` is added to every packet's
//        control buffer.
// @param output receives, per packet, the length, peer/local addresses, TOS and saved cmsg.
//        When `dropped_packets_` is non-null it receives the dropped-packet counter taken from
//        the first packet's control messages.
// @return the converted recvmmsg() result (number of packets read on success).
Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uint32_t self_port,
                                                     const UdpSaveCmsgConfig& save_cmsg_config,
                                                     RecvMsgOutput& output) {
  ASSERT(output.msg_.size() == slices.size());
  if (slices.empty()) {
    return sysCallResultToIoCallResult(Api::SysCallIntResult{0, SOCKET_ERROR_AGAIN});
  }
  const uint32_t num_packets_per_mmsg_call = slices.size();
  absl::FixedArray<mmsghdr> mmsg_hdr(num_packets_per_mmsg_call);
  // NOTE(review): every per-packet iovec array is sized from slices[0]; this presumably relies
  // on all packets having the same number of slices — confirm against callers.
  absl::FixedArray<absl::FixedArray<struct iovec>> iovs(
      num_packets_per_mmsg_call, absl::FixedArray<struct iovec>(slices[0].size()));
  absl::FixedArray<sockaddr_storage> raw_addresses(num_packets_per_mmsg_call);
  size_t cmsg_space = cmsg_space_ + save_cmsg_config.expected_size;
  absl::FixedArray<absl::FixedArray<char>> cbufs(num_packets_per_mmsg_call,
                                                 absl::FixedArray<char>(cmsg_space));

  for (uint32_t i = 0; i < num_packets_per_mmsg_call; ++i) {
    memset(&raw_addresses[i], 0, sizeof(sockaddr_storage));
    memset(cbufs[i].data(), 0, cbufs[i].size());

    mmsg_hdr[i].msg_len = 0;

    msghdr* hdr = &mmsg_hdr[i].msg_hdr;
    hdr->msg_name = &raw_addresses[i];
    hdr->msg_namelen = sizeof(sockaddr_storage);
    ASSERT(!slices[i].empty());

    for (size_t j = 0; j < slices[i].size(); ++j) {
      iovs[i][j].iov_base = slices[i][j].mem_;
      iovs[i][j].iov_len = slices[i][j].len_;
    }
    hdr->msg_iov = iovs[i].data();
    hdr->msg_iovlen = slices[i].size();
    hdr->msg_control = cbufs[i].data();
    hdr->msg_controllen = cbufs[i].size();
  }

  // Set MSG_WAITFORONE so that recvmmsg does not wait for
  // |num_packets_per_mmsg_call| packets to arrive before returning when the
  // socket is a blocking socket.
  const Api::SysCallIntResult result =
      Api::OsSysCallsSingleton::get().recvmmsg(fd_, mmsg_hdr.data(), num_packets_per_mmsg_call,
                                               messageTruncatedOption() | MSG_WAITFORONE, nullptr);

  if (result.return_value_ <= 0) {
    return sysCallResultToIoCallResult(result);
  }

  int num_packets_read = result.return_value_;

  for (int i = 0; i < num_packets_read; ++i) {
    msghdr& hdr = mmsg_hdr[i].msg_hdr;
    if ((hdr.msg_flags & MSG_TRUNC) != 0) {
      ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", mmsg_hdr[i].msg_len);
      // The counter is optional (it is null-checked further down), so guard it here too.
      if (output.dropped_packets_ != nullptr) {
        (*output.dropped_packets_)++;
      }
      output.msg_[i].truncated_and_dropped_ = true;
      continue;
    }

    RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0,
                   fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen));
    RELEASE_ASSERT(hdr.msg_namelen > 0,
                   fmt::format("Unable to get remote address from recvmmsg() for fd: {}", fd_));

    output.msg_[i].msg_len_ = mmsg_hdr[i].msg_len;
    // Get local and peer addresses for each packet.
    output.msg_[i].peer_address_ =
        getOrCreateEnvoyAddressInstance(raw_addresses[i], hdr.msg_namelen);
    if (hdr.msg_controllen > 0) {
      struct cmsghdr* cmsg;
      for (cmsg = CMSG_FIRSTHDR(&hdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&hdr, cmsg)) {
        if (save_cmsg_config.hasConfig() &&
            cmsg->cmsg_type == static_cast<int>(save_cmsg_config.type.value()) &&
            cmsg->cmsg_level == static_cast<int>(save_cmsg_config.level.value())) {
          // NOTE(review): cmsg_len includes the cmsghdr header, so this copies more bytes than
          // the payload holds — confirm whether consumers rely on that length.
          Buffer::OwnedImpl cmsg_slice{CMSG_DATA(cmsg), cmsg->cmsg_len};
          output.msg_[i].saved_cmsg_ = std::move(cmsg_slice);
        }
        Address::InstanceConstSharedPtr addr = maybeGetDstAddressFromHeader(*cmsg, self_port);
        absl::optional<uint8_t> maybe_tos = maybeGetTosFromHeader(*cmsg);
        if (maybe_tos) {
          // Record the TOS on the packet it belongs to, not always on the first packet.
          output.msg_[i].tos_ = *maybe_tos;
          continue;
        }
        if (addr != nullptr) {
          // This is a IP packet info message.
          output.msg_[i].local_address_ = std::move(addr);
          continue;
        }
      }
    }
  }
  // Get overflow from first packet header.
  if (output.dropped_packets_ != nullptr) {
    msghdr& hdr = mmsg_hdr[0].msg_hdr;
    if (hdr.msg_controllen > 0) {
      struct cmsghdr* cmsg;
      for (cmsg = CMSG_FIRSTHDR(&hdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&hdr, cmsg)) {
        absl::optional<uint32_t> maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg);
        if (maybe_dropped) {
          *output.dropped_packets_ = *maybe_dropped;
        }
      }
    }
  }
  return sysCallResultToIoCallResult(result);
}
532

            
533
18255
// Receives up to `length` bytes into `buffer` by forwarding to the OS recv() wrapper.
//
// @param buffer destination memory.
// @param length capacity of `buffer` in bytes.
// @param flags flags forwarded to recv().
// @return the converted system call result.
Api::IoCallUint64Result IoSocketHandleImpl::recv(void* buffer, size_t length, int flags) {
  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  return sysCallResultToIoCallResult(os_sys_calls.recv(fd_, buffer, length, flags));
}
538

            
539
42403
// Binds the socket to `address` by forwarding its sockaddr and length to the OS bind() wrapper.
//
// @param address the local address to bind to.
// @return the raw system call result.
Api::SysCallIntResult IoSocketHandleImpl::bind(Address::InstanceConstSharedPtr address) {
  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  return os_sys_calls.bind(fd_, address->sockAddr(), address->sockAddrLen());
}
542

            
543
37408
// Starts listening on the socket by forwarding to the OS listen() wrapper.
//
// @param backlog backlog value passed through to listen().
// @return the raw system call result.
Api::SysCallIntResult IoSocketHandleImpl::listen(int backlog) {
  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  return os_sys_calls.listen(fd_, backlog);
}
546

            
547
107948
// Accepts a pending connection and wraps the new descriptor in a platform-specific handle.
//
// @param addr receives the peer address, as filled in by accept().
// @param addrlen in/out length of `addr`.
// @return a handle for the accepted socket (inheriting this handle's v6only flag and domain,
//         with default-constructed options), or nullptr if accept() returned an invalid socket.
IoHandlePtr IoSocketHandleImpl::accept(struct sockaddr* addr, socklen_t* addrlen) {
  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  auto accepted = os_sys_calls.accept(fd_, addr, addrlen);
  if (!SOCKET_INVALID(accepted.return_value_)) {
    return SocketInterfaceImpl::makePlatformSpecificSocket(accepted.return_value_, socket_v6only_,
                                                           domain_, {});
  }
  return nullptr;
}
555

            
556
56831
// Connects the socket to `address`.
//
// On Apple and Android builds an IPv4 address is rewritten to its IPv4-mapped IPv6 form when
// Address::forceV6() is set. `was_connected_` is recorded only when connect() does not
// return -1.
//
// @param address the remote address to connect to.
// @return the raw system call result.
Api::SysCallIntResult IoSocketHandleImpl::connect(Address::InstanceConstSharedPtr address) {
  auto sockaddr_to_use = address->sockAddr();
  auto sockaddr_len_to_use = address->sockAddrLen();
#if defined(__APPLE__) || defined(__ANDROID_API__)
  sockaddr_in6 mapped;
  if (sockaddr_to_use->sa_family == AF_INET && Address::forceV6()) {
    const sockaddr_in& v4 = reinterpret_cast<const sockaddr_in&>(*sockaddr_to_use);

    // Android always uses IPv6 dual stack. Convert IPv4 to the IPv6 mapped address when
    // connecting.
    memset(&mapped, 0, sizeof(mapped));
    mapped.sin6_family = AF_INET6;
    mapped.sin6_port = v4.sin_port;
#if defined(__ANDROID_API__)
    mapped.sin6_addr.s6_addr32[2] = htonl(0xffff);
    mapped.sin6_addr.s6_addr32[3] = v4.sin_addr.s_addr;
#elif defined(__APPLE__)
    mapped.sin6_addr.__u6_addr.__u6_addr32[2] = htonl(0xffff);
    mapped.sin6_addr.__u6_addr.__u6_addr32[3] = v4.sin_addr.s_addr;
#endif
    ASSERT(IN6_IS_ADDR_V4MAPPED(&mapped.sin6_addr));

    sockaddr_to_use = reinterpret_cast<sockaddr*>(&mapped);
    sockaddr_len_to_use = sizeof(mapped);
  }
#endif

  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  auto connect_result = os_sys_calls.connect(fd_, sockaddr_to_use, sockaddr_len_to_use);
  const bool call_failed = (connect_result.return_value_ == -1);
  if (!call_failed) {
    was_connected_ = true;
  }
  return connect_result;
}
589

            
590
221
// Duplicates the descriptor and wraps the copy in a new platform-specific handle.
//
// Failure of the duplicate call is fatal (RELEASE_ASSERT). The new handle inherits this
// handle's v6only flag and domain, and is created with options `{false, addressCacheMaxSize()}`.
//
// @return a handle owning the duplicated descriptor.
IoHandlePtr IoSocketHandleImpl::duplicate() {
  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  auto dup_result = os_sys_calls.duplicate(fd_);
  RELEASE_ASSERT(dup_result.return_value_ != -1,
                 fmt::format("duplicate failed for '{}': ({}) {}", fd_, dup_result.errno_,
                             errorDetails(dup_result.errno_)));
  return SocketInterfaceImpl::makePlatformSpecificSocket(dup_result.return_value_, socket_v6only_,
                                                         domain_, {false, addressCacheMaxSize()});
}
598

            
599
// Registers this socket's descriptor with `dispatcher` and keeps the resulting file event.
//
// Only one file event may exist per handle; a second registration trips the ASSERT.
//
// @param dispatcher dispatcher that creates and drives the file event.
// @param cb callback invoked when the descriptor becomes ready.
// @param trigger trigger type passed through to the dispatcher.
// @param events event mask passed through to the dispatcher.
void IoSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
                                             Event::FileTriggerType trigger, uint32_t events) {
  ASSERT(file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same "
                                 "file descriptor. This is not allowed.");
  auto created_event = dispatcher.createFileEvent(fd_, cb, trigger, events);
  file_event_ = std::move(created_event);
}
605

            
606
1605634
void IoSocketHandleImpl::activateFileEvents(uint32_t events) {
607
1605639
  if (file_event_) {
608
1605636
    file_event_->activate(events);
609
1561879
  } else {
610
3
    ENVOY_BUG(false, "Null file_event_");
611
3
  }
612
1605634
}
613

            
614
226035
void IoSocketHandleImpl::enableFileEvents(uint32_t events) {
615
226036
  if (file_event_) {
616
226036
    file_event_->setEnabled(events);
617
218224
  } else {
618
    ENVOY_BUG(false, "Null file_event_");
619
  }
620
226035
}
621

            
622
3058
// Shuts down part or all of the connection by forwarding to the OS shutdown() wrapper.
//
// @param how direction selector passed through to shutdown().
// @return the raw system call result.
Api::SysCallIntResult IoSocketHandleImpl::shutdown(int how) {
  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  return os_sys_calls.shutdown(fd_, how);
}
625

            
626
} // namespace Network
627
} // namespace Envoy