1
#include "source/common/network/socket_interface_impl.h"
2

            
3
#include "envoy/common/exception.h"
4
#include "envoy/extensions/network/socket_interface/v3/default_socket_interface.pb.h"
5
#include "envoy/extensions/network/socket_interface/v3/default_socket_interface.pb.validate.h"
6

            
7
#include "source/common/api/os_sys_calls_impl.h"
8
#include "source/common/common/assert.h"
9
#include "source/common/common/utility.h"
10
#include "source/common/network/address_impl.h"
11
#include "source/common/network/io_socket_handle_impl.h"
12
#include "source/common/network/win32_socket_handle_impl.h"
13

            
14
#if defined(__linux__) && !defined(__ANDROID_API__) && defined(ENVOY_ENABLE_IO_URING)
15
#include "source/common/io/io_uring_impl.h"
16
#include "source/common/io/io_uring_worker_factory_impl.h"
17
#include "source/common/network/io_uring_socket_handle_impl.h"
18
#endif
19

            
20
namespace Envoy {
21
namespace Network {
22

            
23
namespace {
24
247536
[[maybe_unused]] bool hasIoUringWorkerFactory(Io::IoUringWorkerFactory* io_uring_worker_factory) {
25
247536
  return io_uring_worker_factory != nullptr && io_uring_worker_factory->currentThreadRegistered() &&
26
247536
         io_uring_worker_factory->getIoUringWorker() != absl::nullopt;
27
247536
}
28
} // namespace
29

            
30
6
void DefaultSocketInterfaceExtension::onWorkerThreadInitialized() {
31
6
  if (io_uring_worker_factory_ != nullptr) {
32
1
    io_uring_worker_factory_->onWorkerThreadInitialized();
33
1
  }
34
6
}
35

            
36
// Wraps an already-open descriptor in the IoHandle implementation that fits the platform.
//
// @param socket_fd the descriptor to wrap.
// @param socket_v6only forwarded to the handle constructor.
// @param domain optional address family, forwarded to the handle constructor.
// @param options creation options; only max_addresses_cache_size_ is read, and only for the
//        default IoSocketHandleImpl.
// @param io_uring_worker_factory optional factory; when usable on this thread (Linux io_uring
//        builds only) an IoUringSocketHandleImpl is returned instead of the default handle.
// @return the newly created handle.
IoHandlePtr SocketInterfaceImpl::makePlatformSpecificSocket(
    int socket_fd, bool socket_v6only, absl::optional<int> domain,
    const SocketCreationOptions& options,
    [[maybe_unused]] Io::IoUringWorkerFactory* io_uring_worker_factory) {
  // Platforms whose default trigger type is emulated edge get the Win32 handle.
  if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
    return std::make_unique<Win32SocketHandleImpl>(socket_fd, socket_v6only, domain);
  }
#if defined(__linux__) && !defined(__ANDROID_API__) && defined(ENVOY_ENABLE_IO_URING)
  // An io_uring handle is only produced once the factory exists, is registered in the TLS and has
  // been initialized. Tests may spawn threads before that has happened, in which case we fall
  // through to the default handle below.
  const bool use_io_uring = hasIoUringWorkerFactory(io_uring_worker_factory);
  if (use_io_uring) {
    return std::make_unique<IoUringSocketHandleImpl>(*io_uring_worker_factory, socket_fd,
                                                     socket_v6only, domain);
  }
#endif
  return std::make_unique<IoSocketHandleImpl>(socket_fd, socket_v6only, domain,
                                              options.max_addresses_cache_size_);
}
55

            
56
// Builds the IoHandle for an open descriptor, choosing whether the io_uring worker factory is
// offered to makePlatformSpecificSocket().
//
// @param socket_fd the descriptor to wrap.
// @param socket_v6only forwarded unchanged.
// @param socket_type Datagram sockets are never given the io_uring factory.
// @param domain optional address family, forwarded unchanged.
// @param options creation options, forwarded unchanged.
// @return the newly created handle.
IoHandlePtr SocketInterfaceImpl::makeSocket(int socket_fd, bool socket_v6only,
                                            Socket::Type socket_type, absl::optional<int> domain,
                                            const SocketCreationOptions& options) const {
  if (socket_type == Socket::Type::Datagram) {
    return makePlatformSpecificSocket(socket_fd, socket_v6only, domain, options, nullptr);
  }
  // Keep the locked factory alive for the duration of the call.
  const auto locked_factory = io_uring_worker_factory_.lock();
  return makePlatformSpecificSocket(socket_fd, socket_v6only, domain, options,
                                    locked_factory.get());
}
65

            
66
// Creates a socket for the given socket type, address type and IP version and wraps the
// descriptor in an IoHandle via makeSocket().
//
// @param socket_type Stream or Datagram; selects SOCK_STREAM or SOCK_DGRAM.
// @param addr_type Ip or Pipe. EnvoyInternal is not implemented and panics.
// @param version IP version used to pick AF_INET or AF_INET6 when addr_type is Ip.
// @param socket_v6only forwarded unchanged to makeSocket().
// @param options creation options; mptcp_enabled_ is read here, the rest is forwarded.
// @return the new handle, or nullptr if the socket could not be created or, on Apple/Windows,
//         could not be switched to non-blocking mode.
IoHandlePtr SocketInterfaceImpl::socket(Socket::Type socket_type, Address::Type addr_type,
                                        Address::IpVersion version, bool socket_v6only,
                                        const SocketCreationOptions& options) const {
  int protocol = 0;
#if defined(__APPLE__) || defined(WIN32)
  ASSERT(!options.mptcp_enabled_, "MPTCP is only supported on Linux");
  // SOCK_NONBLOCK cannot be passed to ::socket here; non-blocking mode is set after creation.
  int flags = 0;
#else
  int flags = SOCK_NONBLOCK;

  // When io_uring is enabled, SOCK_NONBLOCK becomes redundant. io_uring can multiplex sockets on
  // its own, and the EAGAIN caused by SOCK_NONBLOCK can lead to unnecessary event triggers.
  if (hasIoUringWorkerFactory(io_uring_worker_factory_.lock().get()) &&
      socket_type == Socket::Type::Stream) {
    flags = 0;
  }

  if (options.mptcp_enabled_) {
    ASSERT(socket_type == Socket::Type::Stream);
    ASSERT(addr_type == Address::Type::Ip);
    protocol = IPPROTO_MPTCP;
  }
#endif

  if (socket_type == Socket::Type::Stream) {
    flags |= SOCK_STREAM;
  } else {
    flags |= SOCK_DGRAM;
  }

  int domain;
  if (addr_type == Address::Type::Ip) {
    if (version == Address::IpVersion::v6 || Address::forceV6()) {
      domain = AF_INET6;
    } else {
      ASSERT(version == Address::IpVersion::v4);
      domain = AF_INET;
    }
  } else if (addr_type == Address::Type::Pipe) {
    domain = AF_UNIX;
  } else {
    ASSERT(addr_type == Address::Type::EnvoyInternal);
    PANIC("not implemented");
    // TODO(lambdai): Add InternalIoSocketHandleImpl to support internal address.
    return nullptr;
  }

  const Api::SysCallSocketResult result =
      Api::OsSysCallsSingleton::get().socket(domain, flags, protocol);
  if (!SOCKET_VALID(result.return_value_)) {
    IS_ENVOY_BUG(fmt::format("socket(2) failed, got error: {}", errorDetails(result.errno_)));
    return nullptr;
  }
  IoHandlePtr io_handle =
      makeSocket(result.return_value_, socket_v6only, socket_type, domain, options);

#if defined(__APPLE__) || defined(WIN32)
  // Cannot set SOCK_NONBLOCK as a ::socket flag.
  const int rc = io_handle->setBlocking(false).return_value_;
  // Check the setBlocking() outcome itself; the socket() result was already validated above.
  if (SOCKET_FAILURE(rc)) {
    IS_ENVOY_BUG(fmt::format("Unable to set socket non-blocking: got error: {}", rc));
    return nullptr;
  }
#endif

  return io_handle;
}
133

            
134
// Creates a socket suitable for the given address by delegating to the
// (socket_type, addr_type, version, v6only, options) overload, then applies IPV6_V6ONLY for IPv6
// addresses.
//
// @param socket_type Stream or Datagram.
// @param addr address whose type, IP version and v6only setting drive socket creation. Non-IP
//        addresses are treated as IpVersion::v4 for the purpose of the delegated call.
// @param options creation options, forwarded unchanged.
// @return the new handle, or nullptr if the delegated overload failed.
IoHandlePtr SocketInterfaceImpl::socket(Socket::Type socket_type,
                                        const Address::InstanceConstSharedPtr addr,
                                        const SocketCreationOptions& options) const {
  const Address::IpVersion ip_version =
      addr->ip() ? addr->ip()->version() : Address::IpVersion::v4;
  const bool is_ipv6 = addr->type() == Address::Type::Ip && ip_version == Address::IpVersion::v6;

  // Kept as an int because its address and size are handed to setOption() below.
  int v6only = 0;
  if (is_ipv6) {
    v6only = addr->ip()->ipv6()->v6only();
  }

  IoHandlePtr io_handle =
      SocketInterfaceImpl::socket(socket_type, addr->type(), ip_version, v6only, options);
  if (io_handle && is_ipv6 && !Address::forceV6()) {
    // Setting IPV6_V6ONLY restricts the IPv6 socket to IPv6 connections only.
    const Api::SysCallIntResult result = io_handle->setOption(
        IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char*>(&v6only), sizeof(v6only));
    // Report the error details rather than the return value, which is only the failure sentinel.
    ENVOY_BUG(!SOCKET_FAILURE(result.return_value_),
              fmt::format("Unable to set socket v6-only: got error: {}",
                          errorDetails(result.errno_)));
  }
  return io_handle;
}
155

            
156
92566
bool SocketInterfaceImpl::ipFamilySupported(int domain) {
157
92566
  Api::OsSysCalls& os_sys_calls = Api::OsSysCallsSingleton::get();
158
92566
  const Api::SysCallSocketResult result = os_sys_calls.socket(domain, SOCK_STREAM, 0);
159
92566
  if (SOCKET_VALID(result.return_value_)) {
160
92560
    RELEASE_ASSERT(
161
92560
        os_sys_calls.close(result.return_value_).return_value_ == 0,
162
92560
        fmt::format("Fail to close fd: response code {}", errorDetails(result.return_value_)));
163
92560
  }
164
92566
  return SOCKET_VALID(result.return_value_);
165
92566
}
166

            
167
// Builds the bootstrap extension for the default socket interface.
//
// On Linux builds with io_uring enabled, the config is validated as a DefaultSocketInterface
// proto. If it carries io_uring options and io_uring is supported at runtime, an
// IoUringWorkerFactoryImpl is created, remembered by this interface and shared with the returned
// extension. In every other case the extension is created without a factory.
//
// @param config bootstrap extension config; only read in io_uring builds.
// @param context server factory context; only read in io_uring builds.
// @return the newly created DefaultSocketInterfaceExtension.
Server::BootstrapExtensionPtr SocketInterfaceImpl::createBootstrapExtension(
    [[maybe_unused]] const Protobuf::Message& config,
    [[maybe_unused]] Server::Configuration::ServerFactoryContext& context) {
#if defined(__linux__) && !defined(__ANDROID_API__) && defined(ENVOY_ENABLE_IO_URING)
  const auto& message = MessageUtil::downcastAndValidate<
      const envoy::extensions::network::socket_interface::v3::DefaultSocketInterface&>(
      config, context.messageValidationVisitor());

  // No io_uring options, or no runtime support: fall back to an extension without a factory.
  if (!message.has_io_uring_options() || !Io::isIoUringSupported()) {
    return std::make_unique<DefaultSocketInterfaceExtension>(*this, nullptr);
  }

  const auto& options = message.io_uring_options();
  auto worker_factory = std::make_shared<Io::IoUringWorkerFactoryImpl>(
      PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, io_uring_size, 1000),
      options.enable_submission_queue_polling(),
      PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, read_buffer_size, 8192),
      PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, write_timeout_ms, 1000), context.threadLocal());
  // Remember the factory here; the extension receives the shared_ptr itself.
  io_uring_worker_factory_ = worker_factory;

  return std::make_unique<DefaultSocketInterfaceExtension>(*this, worker_factory);
#else
  return std::make_unique<DefaultSocketInterfaceExtension>(*this, nullptr);
#endif
}
193

            
194
480
// Returns a default-constructed DefaultSocketInterface proto, the config type of this factory.
ProtobufTypes::MessagePtr SocketInterfaceImpl::createEmptyConfigProto() {
  using ConfigProto = envoy::extensions::network::socket_interface::v3::DefaultSocketInterface;
  return std::make_unique<ConfigProto>();
}
198

            
199
REGISTER_FACTORY(SocketInterfaceImpl, Server::Configuration::BootstrapExtensionFactory);
200

            
201
static SocketInterfaceLoader* socket_interface_ =
202
    new SocketInterfaceLoader(std::make_unique<SocketInterfaceImpl>());
203

            
204
} // namespace Network
205
} // namespace Envoy