1
#include "source/common/network/io_socket_handle_impl.h"
2

            
3
#include <memory>
4

            
5
#include "envoy/buffer/buffer.h"
6

            
7
#include "source/common/api/os_sys_calls_impl.h"
8
#include "source/common/buffer/buffer_impl.h"
9
#include "source/common/common/safe_memcpy.h"
10
#include "source/common/common/utility.h"
11
#include "source/common/event/file_event_impl.h"
12
#include "source/common/network/address_impl.h"
13
#include "source/common/network/socket_interface_impl.h"
14

            
15
#include "absl/container/fixed_array.h"
16
#include "absl/types/optional.h"
17

            
18
using Envoy::Api::SysCallIntResult;
19
using Envoy::Api::SysCallSizeResult;
20

            
21
namespace Envoy {
22

            
23
namespace {
24

            
25
953853
// Returns the control-message type that carries the IPv4 destination address of a received
// datagram. Platforms that define IP_RECVDSTADDR use it; everything else uses IP_PKTINFO.
constexpr int messageTypeContainsIP() {
#if defined(IP_RECVDSTADDR)
  constexpr int kDstAddrType = IP_RECVDSTADDR;
#else
  constexpr int kDstAddrType = IP_PKTINFO;
#endif
  return kDstAddrType;
}
32

            
33
628240
// Extracts the IPv4 destination address from a control message whose type matches
// messageTypeContainsIP(). With IP_RECVDSTADDR the payload is a bare in_addr; otherwise the
// payload is an in_pktinfo and the header destination (ipi_addr) is returned.
in_addr addressFromMessage(const cmsghdr& cmsg) {
  const auto* payload = CMSG_DATA(&cmsg);
#ifdef IP_RECVDSTADDR
  return *reinterpret_cast<const in_addr*>(payload);
#else
  return reinterpret_cast<const in_pktinfo*>(payload)->ipi_addr;
#endif
}
41

            
42
552081
// Flags passed to recvmsg()/recvmmsg() so the call reports a datagram's real size even when it
// does not fit the supplied buffers.
constexpr int messageTruncatedOption() {
#if !defined(__APPLE__)
  return MSG_TRUNC;
#else
  // OSX does not support passing `MSG_TRUNC` to recvmsg and recvmmsg. This does not affect
  // functionality and is primarily used for logging.
  return 0;
#endif
}
51

            
52
} // namespace
53

            
54
namespace Network {
55

            
56
155586
// Closes the descriptor if it is still open. The qualified call avoids virtual dispatch during
// destruction.
IoSocketHandleImpl::~IoSocketHandleImpl() {
  if (!SOCKET_VALID(fd_)) {
    return;
  }
  IoSocketHandleImpl::close();
}
61

            
62
152515
// Tears down the file event (if any), closes the descriptor and marks it invalid.
// Returns the raw close() return value with no IoError attached.
Api::IoCallUint64Result IoSocketHandleImpl::close() {
  // Reset is a no-op when no file event was ever created.
  file_event_.reset();

  ASSERT(SOCKET_VALID(fd_));
  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  const int close_rc = os_sys_calls.close(fd_).return_value_;
  SET_SOCKET_INVALID(fd_);
  return {static_cast<unsigned long>(close_rc), Api::IoError::none()};
}
72

            
73
// Reads up to `max_length` bytes into `slices`. Slices are filled in order, and the last one
// used is clipped so that the total never exceeds `max_length`.
Api::IoCallUint64Result IoSocketHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices,
                                                  uint64_t num_slice) {
  absl::FixedArray<iovec> iov(num_slice);
  uint64_t used = 0;
  uint64_t budget_consumed = 0;
  while (used < num_slice && budget_consumed < max_length) {
    const size_t chunk =
        std::min(slices[used].len_, static_cast<size_t>(max_length - budget_consumed));
    iov[used].iov_base = slices[used].mem_;
    iov[used].iov_len = chunk;
    budget_consumed += chunk;
    ++used;
  }
  ASSERT(budget_consumed <= max_length);

  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  if (used == 1) {
    // Avoid paying the VFS overhead when there is only one IO buffer to work with
    return sysCallResultToIoCallResult(
        os_sys_calls.recv(fd_, iov[0].iov_base, iov[0].iov_len, 0));
  }
  return sysCallResultToIoCallResult(
      os_sys_calls.readv(fd_, iov.begin(), static_cast<int>(used)));
}
97

            
98
// Reads directly into a read reservation on `buffer` and commits exactly the number of bytes
// received. A limit of 0 short-circuits without touching the socket.
Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer,
                                                 absl::optional<uint64_t> max_length_opt) {
  const uint64_t limit = max_length_opt.has_value() ? *max_length_opt : UINT64_MAX;
  if (limit == 0) {
    return Api::ioCallUint64ResultNoError();
  }
  Buffer::Reservation reservation = buffer.reserveForRead();
  const uint64_t bytes_wanted = std::min(reservation.length(), limit);
  Api::IoCallUint64Result result =
      readv(bytes_wanted, reservation.slices(), reservation.numSlices());
  // A failed read commits nothing.
  const uint64_t bytes_read = result.ok() ? result.return_value_ : 0;
  ASSERT(bytes_read <= limit);
  reservation.commit(bytes_read);
  return result;
}
112

            
113
// Writes the non-empty slices to the socket. A single slice uses send() and several use
// writev(). If no slice carries data, the no-error result is returned without a syscall.
Api::IoCallUint64Result IoSocketHandleImpl::writev(const Buffer::RawSlice* slices,
                                                   uint64_t num_slice) {
  absl::FixedArray<iovec> iov(num_slice);
  uint64_t filled = 0;
  for (const Buffer::RawSlice* slice = slices; slice != slices + num_slice; ++slice) {
    if (slice->mem_ == nullptr || slice->len_ == 0) {
      continue;
    }
    iov[filled].iov_base = slice->mem_;
    iov[filled].iov_len = slice->len_;
    ++filled;
  }
  if (filled == 0) {
    return Api::ioCallUint64ResultNoError();
  }

  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  if (filled > 1) {
    return sysCallResultToIoCallResult(os_sys_calls.writev(fd_, iov.begin(), filled));
  }
  // Avoid paying the VFS overhead when there is only one IO buffer to work with
  return sysCallResultToIoCallResult(
      os_sys_calls.send(fd_, iov[0].iov_base, iov[0].iov_len, 0));
}
138

            
139
583021
// Writes up to the first 16 slices of `buffer` and drains whatever the socket accepted.
Api::IoCallUint64Result IoSocketHandleImpl::write(Buffer::Instance& buffer) {
  constexpr uint64_t MaxSlices = 16;
  const Buffer::RawSliceVector slices = buffer.getRawSlices(MaxSlices);
  Api::IoCallUint64Result result = writev(slices.begin(), slices.size());
  if (!result.ok() || result.return_value_ == 0) {
    // Nothing was written, so there is nothing to drain.
    return result;
  }
  buffer.drain(static_cast<uint64_t>(result.return_value_));
  return result;
}
148

            
149
// Sends the non-empty `slices` as one datagram to `peer_address`.
// When `self_ip` is non-null, a packet-info control message (IP_PKTINFO / IP_SENDSRCADDR for
// IPv4, IPV6_PKTINFO for IPv6) asks the kernel to use that address as the source.
// Returns an invalid-address error when the peer has no usable sockaddr, and the no-error result
// when every slice is empty.
Api::IoCallUint64Result IoSocketHandleImpl::sendmsg(const Buffer::RawSlice* slices,
                                                    uint64_t num_slice, int flags,
                                                    const Address::Ip* self_ip,
                                                    const Address::Instance& peer_address) {
  // The raw sockaddr is only reachable through Address::InstanceBase; any other implementation
  // cannot be used as a sendmsg() destination.
  const auto* address_base = dynamic_cast<const Address::InstanceBase*>(&peer_address);
  if (address_base == nullptr) {
    return IoSocketError::ioResultSocketInvalidAddress();
  }
  sockaddr* sock_addr = const_cast<sockaddr*>(address_base->sockAddr());
  if (sock_addr == nullptr) {
    // Unlikely to happen unless the wrong peer address is passed.
    return IoSocketError::ioResultSocketInvalidAddress();
  }

  // Pack the non-empty slices into a contiguous iovec array.
  absl::FixedArray<iovec> iov(num_slice);
  uint64_t num_slices_to_write = 0;
  for (uint64_t i = 0; i < num_slice; i++) {
    if (slices[i].mem_ != nullptr && slices[i].len_ != 0) {
      iov[num_slices_to_write].iov_base = slices[i].mem_;
      iov[num_slices_to_write].iov_len = slices[i].len_;
      num_slices_to_write++;
    }
  }
  if (num_slices_to_write == 0) {
    return Api::ioCallUint64ResultNoError();
  }

  // Value-initialize so every field, including any not assigned below, starts out zeroed.
  msghdr message{};
  message.msg_name = reinterpret_cast<void*>(sock_addr);
  message.msg_namelen = address_base->sockAddrLen();
  message.msg_iov = iov.begin();
  message.msg_iovlen = num_slices_to_write;
  message.msg_flags = 0;
  auto& os_syscalls = Api::OsSysCallsSingleton::get();
  if (self_ip == nullptr) {
    message.msg_control = nullptr;
    message.msg_controllen = 0;
    const Api::SysCallSizeResult result = os_syscalls.sendmsg(fd_, &message, flags);
    return sysCallResultToIoCallResult(result);
  } else {
    const size_t space_v6 = CMSG_SPACE(sizeof(in6_pktinfo));
    const size_t space_v4 = CMSG_SPACE(sizeof(in_pktinfo));

    // FreeBSD only needs in_addr size, but allocates more to unify code in two platforms.
    const size_t cmsg_space = (self_ip->version() == Address::IpVersion::v4) ? space_v4 : space_v6;
    // The control buffer holds exactly one packet-info message for self_ip's address family.
    absl::FixedArray<char> cbuf(cmsg_space);
    memset(cbuf.begin(), 0, cmsg_space);

    message.msg_control = cbuf.begin();
    message.msg_controllen = cmsg_space;
    cmsghdr* const cmsg = CMSG_FIRSTHDR(&message);
    // Report the real buffer length; sizeof(cbuf) would only be the FixedArray object's size.
    RELEASE_ASSERT(cmsg != nullptr, fmt::format("cbuf with size {} is not enough, cmsghdr size {}",
                                                cmsg_space, sizeof(cmsghdr)));
    if (self_ip->version() == Address::IpVersion::v4) {
      cmsg->cmsg_level = IPPROTO_IP;
#ifndef IP_SENDSRCADDR
      cmsg->cmsg_len = CMSG_LEN(sizeof(in_pktinfo));
      cmsg->cmsg_type = IP_PKTINFO;
      auto pktinfo = reinterpret_cast<in_pktinfo*>(CMSG_DATA(cmsg));
      pktinfo->ipi_ifindex = 0;
#ifdef WIN32
      pktinfo->ipi_addr.s_addr = self_ip->ipv4()->address();
#else
      pktinfo->ipi_spec_dst.s_addr = self_ip->ipv4()->address();
#endif
#else
      cmsg->cmsg_type = IP_SENDSRCADDR;
      cmsg->cmsg_len = CMSG_LEN(sizeof(in_addr));
      // Use `->`: the former `*(ptr).s_addr` applied `.` to a pointer and did not compile.
      reinterpret_cast<struct in_addr*>(CMSG_DATA(cmsg))->s_addr = self_ip->ipv4()->address();
#endif
    } else if (self_ip->version() == Address::IpVersion::v6) {
      cmsg->cmsg_len = CMSG_LEN(sizeof(in6_pktinfo));
      cmsg->cmsg_level = IPPROTO_IPV6;
      cmsg->cmsg_type = IPV6_PKTINFO;
      auto pktinfo = reinterpret_cast<in6_pktinfo*>(CMSG_DATA(cmsg));
      pktinfo->ipi6_ifindex = 0;
      *(reinterpret_cast<absl::uint128*>(pktinfo->ipi6_addr.s6_addr)) = self_ip->ipv6()->address();
    }
    const Api::SysCallSizeResult result = os_syscalls.sendmsg(fd_, &message, flags);
    if (result.return_value_ < 0 && result.errno_ == SOCKET_ERROR_INVAL) {
      ENVOY_LOG(error, fmt::format("EINVAL error. Socket is open: {}, IPv{}.", isOpen(),
                                   self_ip->version() == Address::IpVersion::v6 ? 6 : 4));
    }
    return sysCallResultToIoCallResult(result);
  }
}
232

            
233
// Converts a raw sockaddr into an Envoy address.
// If the recent-address cache is enabled, a cached instance is reused when the address matches.
// The cache is kept in least- to most-recently-used order, and the least recently used entry is
// evicted once the cache grows past address_cache_max_capacity_.
Address::InstanceConstSharedPtr
IoSocketHandleImpl::getOrCreateEnvoyAddressInstance(sockaddr_storage ss, socklen_t ss_len) {
  if (!recent_received_addresses_) {
    // Caching disabled: always build a fresh instance.
    return Address::addressFromSockAddrOrDie(ss, ss_len, fd_, socket_v6only_);
  }

  auto& cache = *recent_received_addresses_;
  quic::QuicSocketAddress key(ss);
  auto hit = std::find_if(cache.begin(), cache.end(), [&key](const QuicEnvoyAddressPair& entry) {
    return entry.first == key;
  });

  if (hit == cache.end()) {
    Address::InstanceConstSharedPtr created =
        Address::addressFromSockAddrOrDie(ss, ss_len, fd_, socket_v6only_);
    cache.push_back(QuicEnvoyAddressPair(key, created));
    if (cache.size() > address_cache_max_capacity_) {
      // The front entry is the least recently accessed one.
      cache.erase(cache.begin());
    }
    return created;
  }

  Address::InstanceConstSharedPtr cached = hit->second;
  // Move the hit to the back so the vector stays ordered by recency.
  std::rotate(hit, hit + 1, cache.end());
  return cached;
}
257

            
258
Address::InstanceConstSharedPtr
259
953857
IoSocketHandleImpl::maybeGetDstAddressFromHeader(const cmsghdr& cmsg, uint32_t self_port) {
260
953857
  if (cmsg.cmsg_type == IPV6_PKTINFO) {
261
    auto info = reinterpret_cast<const in6_pktinfo*>(CMSG_DATA(&cmsg));
262
    sockaddr_storage ss;
263
    auto ipv6_addr = reinterpret_cast<sockaddr_in6*>(&ss);
264
    memset(ipv6_addr, 0, sizeof(sockaddr_in6));
265
    ipv6_addr->sin6_family = AF_INET6;
266
    ipv6_addr->sin6_addr = info->ipi6_addr;
267
    ipv6_addr->sin6_port = htons(self_port);
268
    return getOrCreateEnvoyAddressInstance(ss, sizeof(sockaddr_in6));
269
  }
270

            
271
953857
  if (cmsg.cmsg_type == messageTypeContainsIP()) {
272
628242
    sockaddr_storage ss;
273
628242
    auto ipv4_addr = reinterpret_cast<sockaddr_in*>(&ss);
274
628242
    memset(ipv4_addr, 0, sizeof(sockaddr_in));
275
628242
    ipv4_addr->sin_family = AF_INET;
276
628242
    ipv4_addr->sin_addr = addressFromMessage(cmsg);
277
628242
    ipv4_addr->sin_port = htons(self_port);
278
628242
    return getOrCreateEnvoyAddressInstance(ss, sizeof(sockaddr_in));
279
628242
  }
280

            
281
325615
  return nullptr;
282
953857
}
283

            
284
566547
// Returns the 32-bit SO_RXQ_OVFL payload when `cmsg` is that message type. Returns nullopt
// otherwise, and always on platforms without SO_RXQ_OVFL.
absl::optional<uint32_t> maybeGetPacketsDroppedFromHeader([[maybe_unused]] const cmsghdr& cmsg) {
  absl::optional<uint32_t> dropped;
#ifdef SO_RXQ_OVFL
  if (cmsg.cmsg_type == SO_RXQ_OVFL) {
    dropped = *reinterpret_cast<const uint32_t*>(CMSG_DATA(&cmsg));
  }
#endif
  return dropped;
}
292

            
293
634808
// Copies a T-sized unsigned integer out of the control-message payload. The copy goes through
// safeMemcpyUnsafeSrc rather than a pointer dereference; presumably this avoids alignment
// assumptions about CMSG_DATA.
template <typename T> T getUnsignedIntFromHeader(const cmsghdr& cmsg) {
  static_assert(std::is_unsigned_v<T>, "return type must be unsigned integral");
  T result{};
  safeMemcpyUnsafeSrc(&result, CMSG_DATA(&cmsg));
  return result;
}
299

            
300
634815
// Reads an unsigned integer payload whose width (1, 2, 4 or 8 bytes) is inferred from
// cmsg_len, and converts it to T. An unrecognized length is reported as an Envoy bug, and
// nullopt is returned.
template <typename T> absl::optional<T> maybeGetUnsignedIntFromHeader(const cmsghdr& cmsg) {
  static_assert(std::is_unsigned_v<T>, "return type must be unsigned integral");
  const auto len = cmsg.cmsg_len;
  if (len == CMSG_LEN(sizeof(uint8_t))) {
    return static_cast<T>(getUnsignedIntFromHeader<uint8_t>(cmsg));
  }
  if (len == CMSG_LEN(sizeof(uint16_t))) {
    return static_cast<T>(getUnsignedIntFromHeader<uint16_t>(cmsg));
  }
  if (len == CMSG_LEN(sizeof(uint32_t))) {
    return static_cast<T>(getUnsignedIntFromHeader<uint32_t>(cmsg));
  }
  if (len == CMSG_LEN(sizeof(uint64_t))) {
    return static_cast<T>(getUnsignedIntFromHeader<uint64_t>(cmsg));
  }
  IS_ENVOY_BUG(
      fmt::format("unexpected cmsg_len value for unsigned integer payload: {}", cmsg.cmsg_len));
  return absl::nullopt;
}
317

            
318
941192
// Returns the TOS byte (IPv4) or traffic class (IPv6) carried by `cmsg`, or nullopt when the
// message is neither.
absl::optional<uint8_t> maybeGetTosFromHeader(const cmsghdr& cmsg) {
#ifdef __APPLE__
  constexpr int kIpv4TosType = IP_RECVTOS;
#else
  constexpr int kIpv4TosType = IP_TOS;
#endif // __APPLE__
  const bool is_ipv4_tos = cmsg.cmsg_level == IPPROTO_IP && cmsg.cmsg_type == kIpv4TosType;
  const bool is_ipv6_tclass = cmsg.cmsg_level == IPPROTO_IPV6 && cmsg.cmsg_type == IPV6_TCLASS;
  if (!is_ipv4_tos && !is_ipv6_tclass) {
    return absl::nullopt;
  }
  return maybeGetUnsignedIntFromHeader<uint8_t>(cmsg);
}
330

            
331
// Receives one datagram into `slices` and fills output.msg_[0] with:
// - the peer and local addresses;
// - the TOS / traffic class;
// - the GRO segment size (when UDP_GRO is available);
// - optionally, one saved control message selected by `save_cmsg_config`.
// A truncated datagram is dropped: the byte count is reported as 0 and the message is flagged as
// truncated_and_dropped_.
Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices,
                                                    const uint64_t num_slice, uint32_t self_port,
                                                    const UdpSaveCmsgConfig& save_cmsg_config,
                                                    RecvMsgOutput& output) {
  ASSERT(!output.msg_.empty());

  // Control buffer: the ancillary space this handle normally needs (cmsg_space_) plus room for
  // the extra control message the caller asked to save, if any.
  const size_t cmsg_space = cmsg_space_ + save_cmsg_config.expected_size;
  absl::FixedArray<char> cbuf(cmsg_space);
  memset(cbuf.begin(), 0, cmsg_space);

  // Pack the non-empty slices into a contiguous iovec array.
  absl::FixedArray<iovec> iov(num_slice);
  uint64_t num_slices_for_read = 0;
  for (uint64_t i = 0; i < num_slice; i++) {
    if (slices[i].mem_ != nullptr && slices[i].len_ != 0) {
      iov[num_slices_for_read].iov_base = slices[i].mem_;
      iov[num_slices_for_read].iov_len = slices[i].len_;
      ++num_slices_for_read;
    }
  }
  if (num_slices_for_read == 0) {
    return Api::ioCallUint64ResultNoError();
  }

  sockaddr_storage peer_addr;
  msghdr hdr{};
  hdr.msg_name = &peer_addr;
  hdr.msg_namelen = sizeof(sockaddr_storage);
  hdr.msg_iov = iov.begin();
  hdr.msg_iovlen = num_slices_for_read;
  hdr.msg_flags = 0;
  hdr.msg_control = cbuf.begin();
  // Advertise the whole buffer. Advertising only cmsg_space_ would hide the room reserved for
  // save_cmsg_config, so the kernel would truncate (MSG_CTRUNC) and the RELEASE_ASSERT below
  // would fire.
  hdr.msg_controllen = cmsg_space;
  Api::SysCallSizeResult result =
      Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, messageTruncatedOption());
  if (result.return_value_ < 0) {
    return sysCallResultToIoCallResult(result);
  }
  if ((hdr.msg_flags & MSG_TRUNC) != 0) {
    ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", result.return_value_);
    result.return_value_ = 0;
    // dropped_packets_ is optional (it is null-checked below), so guard this increment too.
    if (output.dropped_packets_ != nullptr) {
      (*output.dropped_packets_)++;
    }
    output.msg_[0].truncated_and_dropped_ = true;
    return sysCallResultToIoCallResult(result);
  }

  RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0,
                 fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen));
  RELEASE_ASSERT(hdr.msg_namelen > 0,
                 fmt::format("Unable to get remote address from recvmsg() for fd: {}", fd_));
  output.msg_[0].peer_address_ = getOrCreateEnvoyAddressInstance(peer_addr, hdr.msg_namelen);
  output.msg_[0].gso_size_ = 0;

  if (hdr.msg_controllen > 0) {
    // Get overflow, local address and gso_size from control message.
    for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&hdr); cmsg != nullptr;
         cmsg = CMSG_NXTHDR(&hdr, cmsg)) {
      if (save_cmsg_config.hasConfig() &&
          cmsg->cmsg_type == static_cast<int>(save_cmsg_config.type.value()) &&
          cmsg->cmsg_level == static_cast<int>(save_cmsg_config.level.value())) {
        // cmsg_len includes the header; copy only the payload that starts at CMSG_DATA, otherwise
        // the copy runs sizeof-header bytes past the message (and possibly past cbuf).
        const size_t cmsg_total_len = static_cast<size_t>(cmsg->cmsg_len);
        const size_t cmsg_header_len = CMSG_LEN(0);
        const size_t cmsg_payload_len =
            cmsg_total_len > cmsg_header_len ? cmsg_total_len - cmsg_header_len : 0;
        Buffer::OwnedImpl cmsg_slice{CMSG_DATA(cmsg), cmsg_payload_len};
        output.msg_[0].saved_cmsg_ = std::move(cmsg_slice);
      }
      if (output.msg_[0].local_address_ == nullptr) {
        Address::InstanceConstSharedPtr addr = maybeGetDstAddressFromHeader(*cmsg, self_port);
        if (addr != nullptr) {
          // This is a IP packet info message.
          output.msg_[0].local_address_ = std::move(addr);
          continue;
        }
      }
      if (output.dropped_packets_ != nullptr) {
        absl::optional<uint32_t> maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg);
        if (maybe_dropped) {
          *output.dropped_packets_ = *maybe_dropped;
          continue;
        }
      }
#ifdef UDP_GRO
      if (cmsg->cmsg_level == SOL_UDP && cmsg->cmsg_type == UDP_GRO) {
        absl::optional<uint16_t> maybe_gso = maybeGetUnsignedIntFromHeader<uint16_t>(*cmsg);
        if (maybe_gso) {
          output.msg_[0].gso_size_ = *maybe_gso;
        }
      }
#endif
      absl::optional<uint8_t> maybe_tos = maybeGetTosFromHeader(*cmsg);
      if (maybe_tos) {
        output.msg_[0].tos_ = *maybe_tos;
      }
    }
  }

  return sysCallResultToIoCallResult(result);
}
425

            
426
// Receives up to slices.size() datagrams in a single recvmmsg() call.
// For each packet i that was read, output.msg_[i] receives the payload length, the peer and local
// addresses, the TOS / traffic class and (optionally) one saved control message. Truncated
// packets are flagged as truncated_and_dropped_. The socket overflow counter is taken from the
// first packet's headers.
// An empty `slices` returns an EAGAIN-style result without a syscall.
Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uint32_t self_port,
                                                     const UdpSaveCmsgConfig& save_cmsg_config,
                                                     RecvMsgOutput& output) {
  ASSERT(output.msg_.size() == slices.size());
  if (slices.empty()) {
    return sysCallResultToIoCallResult(Api::SysCallIntResult{0, SOCKET_ERROR_AGAIN});
  }
  const uint32_t num_packets_per_mmsg_call = slices.size();
  absl::FixedArray<mmsghdr> mmsg_hdr(num_packets_per_mmsg_call);
  // Every packet gets an iovec array sized from the first packet's slice count; the loop below
  // asserts that no packet supplies more slices than that.
  absl::FixedArray<absl::FixedArray<struct iovec>> iovs(
      num_packets_per_mmsg_call, absl::FixedArray<struct iovec>(slices[0].size()));
  absl::FixedArray<sockaddr_storage> raw_addresses(num_packets_per_mmsg_call);
  const size_t cmsg_space = cmsg_space_ + save_cmsg_config.expected_size;
  absl::FixedArray<absl::FixedArray<char>> cbufs(num_packets_per_mmsg_call,
                                                 absl::FixedArray<char>(cmsg_space));

  for (uint32_t i = 0; i < num_packets_per_mmsg_call; ++i) {
    memset(&raw_addresses[i], 0, sizeof(sockaddr_storage));
    memset(cbufs[i].data(), 0, cbufs[i].size());

    mmsg_hdr[i].msg_len = 0;

    msghdr* hdr = &mmsg_hdr[i].msg_hdr;
    hdr->msg_name = &raw_addresses[i];
    hdr->msg_namelen = sizeof(sockaddr_storage);
    ASSERT(!slices[i].empty());
    ASSERT(slices[i].size() <= iovs[i].size());

    for (size_t j = 0; j < slices[i].size(); ++j) {
      iovs[i][j].iov_base = slices[i][j].mem_;
      iovs[i][j].iov_len = slices[i][j].len_;
    }
    hdr->msg_iov = iovs[i].data();
    hdr->msg_iovlen = slices[i].size();
    hdr->msg_control = cbufs[i].data();
    hdr->msg_controllen = cbufs[i].size();
  }

  // Set MSG_WAITFORONE so that recvmmsg will not waiting for
  // |num_packets_per_mmsg_call| packets to arrive before returning when the
  // socket is a blocking socket.
  const Api::SysCallIntResult result =
      Api::OsSysCallsSingleton::get().recvmmsg(fd_, mmsg_hdr.data(), num_packets_per_mmsg_call,
                                               messageTruncatedOption() | MSG_WAITFORONE, nullptr);

  if (result.return_value_ <= 0) {
    return sysCallResultToIoCallResult(result);
  }

  int num_packets_read = result.return_value_;

  for (int i = 0; i < num_packets_read; ++i) {
    msghdr& hdr = mmsg_hdr[i].msg_hdr;
    if ((hdr.msg_flags & MSG_TRUNC) != 0) {
      ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", mmsg_hdr[i].msg_len);
      // dropped_packets_ is optional (it is null-checked below), so guard this increment too.
      if (output.dropped_packets_ != nullptr) {
        (*output.dropped_packets_)++;
      }
      output.msg_[i].truncated_and_dropped_ = true;
      continue;
    }

    RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0,
                   fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen));
    RELEASE_ASSERT(hdr.msg_namelen > 0,
                   fmt::format("Unable to get remote address from recvmmsg() for fd: {}", fd_));

    output.msg_[i].msg_len_ = mmsg_hdr[i].msg_len;
    // Get local and peer addresses for each packet.
    output.msg_[i].peer_address_ =
        getOrCreateEnvoyAddressInstance(raw_addresses[i], hdr.msg_namelen);
    if (hdr.msg_controllen > 0) {
      struct cmsghdr* cmsg;
      for (cmsg = CMSG_FIRSTHDR(&hdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&hdr, cmsg)) {
        if (save_cmsg_config.hasConfig() &&
            cmsg->cmsg_type == static_cast<int>(save_cmsg_config.type.value()) &&
            cmsg->cmsg_level == static_cast<int>(save_cmsg_config.level.value())) {
          // cmsg_len includes the header; copy only the payload that starts at CMSG_DATA.
          const size_t cmsg_total_len = static_cast<size_t>(cmsg->cmsg_len);
          const size_t cmsg_header_len = CMSG_LEN(0);
          const size_t cmsg_payload_len =
              cmsg_total_len > cmsg_header_len ? cmsg_total_len - cmsg_header_len : 0;
          Buffer::OwnedImpl cmsg_slice{CMSG_DATA(cmsg), cmsg_payload_len};
          output.msg_[i].saved_cmsg_ = std::move(cmsg_slice);
        }
        Address::InstanceConstSharedPtr addr = maybeGetDstAddressFromHeader(*cmsg, self_port);
        absl::optional<uint8_t> maybe_tos = maybeGetTosFromHeader(*cmsg);
        if (maybe_tos) {
          // The TOS belongs to packet i; writing msg_[0] would mislabel every other packet.
          output.msg_[i].tos_ = *maybe_tos;
          continue;
        }
        if (addr != nullptr) {
          // This is a IP packet info message.
          output.msg_[i].local_address_ = std::move(addr);
          continue;
        }
      }
    }
  }
  // Get overflow from first packet header.
  if (output.dropped_packets_ != nullptr) {
    msghdr& hdr = mmsg_hdr[0].msg_hdr;
    if (hdr.msg_controllen > 0) {
      struct cmsghdr* cmsg;
      for (cmsg = CMSG_FIRSTHDR(&hdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&hdr, cmsg)) {
        absl::optional<uint32_t> maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg);
        if (maybe_dropped) {
          *output.dropped_packets_ = *maybe_dropped;
        }
      }
    }
  }
  return sysCallResultToIoCallResult(result);
}
532

            
533
18259
// Thin wrapper over recv(2) on this handle's descriptor, converted to an IoCallUint64Result.
Api::IoCallUint64Result IoSocketHandleImpl::recv(void* buffer, size_t length, int flags) {
  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  return sysCallResultToIoCallResult(os_sys_calls.recv(fd_, buffer, length, flags));
}
538

            
539
42334
// Binds the descriptor to `address` using its raw sockaddr representation.
Api::SysCallIntResult IoSocketHandleImpl::bind(Address::InstanceConstSharedPtr address) {
  const auto* target = address->sockAddr();
  const auto target_len = address->sockAddrLen();
  return Api::OsSysCallsSingleton::get().bind(fd_, target, target_len);
}
542

            
543
37335
// Marks the descriptor as a passive socket with the given `backlog`.
Api::SysCallIntResult IoSocketHandleImpl::listen(int backlog) {
  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  return os_sys_calls.listen(fd_, backlog);
}
546

            
547
107626
// Accepts a connection and wraps the new descriptor in a platform-specific handle that inherits
// this handle's v6-only setting and domain. Returns nullptr when accept() yields an invalid
// descriptor.
IoHandlePtr IoSocketHandleImpl::accept(struct sockaddr* addr, socklen_t* addrlen) {
  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  const auto accepted = os_sys_calls.accept(fd_, addr, addrlen);
  if (SOCKET_INVALID(accepted.return_value_)) {
    return nullptr;
  }
  return SocketInterfaceImpl::makePlatformSpecificSocket(accepted.return_value_, socket_v6only_,
                                                         domain_, {});
}
555

            
556
56943
// Connects to `address`. On Apple and Android, when Address::forceV6() is set, an IPv4 target is
// rewritten to its IPv4-mapped IPv6 form first. A return value other than -1 records that the
// handle has connected.
Api::SysCallIntResult IoSocketHandleImpl::connect(Address::InstanceConstSharedPtr address) {
  auto target = address->sockAddr();
  auto target_len = address->sockAddrLen();
#if defined(__APPLE__) || defined(__ANDROID_API__)
  // Storage for the mapped address; it must outlive the connect() call below.
  sockaddr_in6 mapped;
  if (target->sa_family == AF_INET && Address::forceV6()) {
    const sockaddr_in& v4 = reinterpret_cast<const sockaddr_in&>(*target);

    // Android always uses IPv6 dual stack. Convert IPv4 to the IPv6 mapped address when
    // connecting.
    memset(&mapped, 0, sizeof(mapped));
    mapped.sin6_family = AF_INET6;
    mapped.sin6_port = v4.sin_port;
#if defined(__ANDROID_API__)
    mapped.sin6_addr.s6_addr32[2] = htonl(0xffff);
    mapped.sin6_addr.s6_addr32[3] = v4.sin_addr.s_addr;
#elif defined(__APPLE__)
    mapped.sin6_addr.__u6_addr.__u6_addr32[2] = htonl(0xffff);
    mapped.sin6_addr.__u6_addr.__u6_addr32[3] = v4.sin_addr.s_addr;
#endif
    ASSERT(IN6_IS_ADDR_V4MAPPED(&mapped.sin6_addr));

    target = reinterpret_cast<sockaddr*>(&mapped);
    target_len = sizeof(mapped);
  }
#endif

  auto result = Api::OsSysCallsSingleton::get().connect(fd_, target, target_len);
  if (result.return_value_ != -1) {
    was_connected_ = true;
  }
  return result;
}
589

            
590
221
// Duplicates the descriptor and wraps the copy in a new handle. The copy keeps this handle's
// v6-only setting and domain, and is created with the same address cache size. A failed dup
// is fatal.
IoHandlePtr IoSocketHandleImpl::duplicate() {
  const auto dup_result = Api::OsSysCallsSingleton::get().duplicate(fd_);
  RELEASE_ASSERT(dup_result.return_value_ != -1,
                 fmt::format("duplicate failed for '{}': ({}) {}", fd_, dup_result.errno_,
                             errorDetails(dup_result.errno_)));
  const auto new_fd = dup_result.return_value_;
  return SocketInterfaceImpl::makePlatformSpecificSocket(new_fd, socket_v6only_, domain_,
                                                         {false, addressCacheMaxSize()});
}
598

            
599
// Registers this descriptor with `dispatcher`. A handle may own at most one file event.
void IoSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
                                             Event::FileTriggerType trigger, uint32_t events) {
  ASSERT(file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same "
                                 "file descriptor. This is not allowed.");
  auto created_event = dispatcher.createFileEvent(fd_, cb, trigger, events);
  file_event_ = std::move(created_event);
}
605

            
606
1621461
void IoSocketHandleImpl::activateFileEvents(uint32_t events) {
607
1621467
  if (file_event_) {
608
1621463
    file_event_->activate(events);
609
1559868
  } else {
610
4
    ENVOY_BUG(false, "Null file_event_");
611
4
  }
612
1621461
}
613

            
614
226399
void IoSocketHandleImpl::enableFileEvents(uint32_t events) {
615
226399
  if (file_event_) {
616
226399
    file_event_->setEnabled(events);
617
226399
  } else {
618
    ENVOY_BUG(false, "Null file_event_");
619
  }
620
226399
}
621

            
622
2951
// Thin wrapper over shutdown(2); `how` is forwarded unchanged.
Api::SysCallIntResult IoSocketHandleImpl::shutdown(int how) {
  auto& os_sys_calls = Api::OsSysCallsSingleton::get();
  return os_sys_calls.shutdown(fd_, how);
}
625

            
626
} // namespace Network
627
} // namespace Envoy