Line data Source code
1 : #include "source/extensions/upstreams/http/udp/upstream_request.h" 2 : 3 : #include <cstdint> 4 : #include <memory> 5 : 6 : #include "envoy/upstream/upstream.h" 7 : 8 : #include "source/common/common/assert.h" 9 : #include "source/common/common/logger.h" 10 : #include "source/common/common/utility.h" 11 : #include "source/common/http/codes.h" 12 : #include "source/common/http/header_map_impl.h" 13 : #include "source/common/http/headers.h" 14 : #include "source/common/http/message_impl.h" 15 : #include "source/common/network/transport_socket_options_impl.h" 16 : #include "source/common/router/router.h" 17 : #include "source/extensions/common/proxy_protocol/proxy_protocol_header.h" 18 : 19 : #include "quiche/common/masque/connect_udp_datagram_payload.h" 20 : #include "quiche/common/simple_buffer_allocator.h" 21 : 22 : namespace Envoy { 23 : namespace Extensions { 24 : namespace Upstreams { 25 : namespace Http { 26 : namespace Udp { 27 : 28 0 : void UdpConnPool::newStream(Router::GenericConnectionPoolCallbacks* callbacks) { 29 0 : Envoy::Network::SocketPtr socket = createSocket(host_); 30 0 : auto source_address_selector = host_->cluster().getUpstreamLocalAddressSelector(); 31 0 : auto upstream_local_address = source_address_selector->getUpstreamLocalAddress( 32 0 : host_->address(), /*socket_options=*/nullptr); 33 0 : if (!Envoy::Network::Socket::applyOptions(upstream_local_address.socket_options_, *socket, 34 0 : envoy::config::core::v3::SocketOption::STATE_PREBIND)) { 35 0 : callbacks->onPoolFailure(ConnectionPool::PoolFailureReason::LocalConnectionFailure, 36 0 : "Failed to apply socket option for UDP upstream", host_); 37 0 : return; 38 0 : } 39 0 : if (upstream_local_address.address_) { 40 0 : Envoy::Api::SysCallIntResult bind_result = socket->bind(upstream_local_address.address_); 41 0 : if (bind_result.return_value_ < 0) { 42 0 : callbacks->onPoolFailure(ConnectionPool::PoolFailureReason::LocalConnectionFailure, 43 0 : "Failed to bind for UDP upstream", host_); 44 0 : return; 45 0 : } 46 0 : } 47 : 48 0 : const Network::ConnectionInfoProvider& connection_info_provider = 49 0 : socket->connectionInfoProvider(); 50 0 : Router::UpstreamToDownstream& upstream_to_downstream = callbacks->upstreamToDownstream(); 51 0 : ASSERT(upstream_to_downstream.connection().has_value()); 52 0 : Event::Dispatcher& dispatcher = upstream_to_downstream.connection()->dispatcher(); 53 0 : auto upstream = 54 0 : std::make_unique<UdpUpstream>(&upstream_to_downstream, std::move(socket), host_, dispatcher); 55 0 : StreamInfo::StreamInfoImpl stream_info(dispatcher.timeSource(), nullptr); 56 : 57 0 : callbacks->onPoolReady(std::move(upstream), host_, connection_info_provider, stream_info, {}); 58 0 : } 59 : 60 : UdpUpstream::UdpUpstream(Router::UpstreamToDownstream* upstream_to_downstream, 61 : Network::SocketPtr socket, Upstream::HostConstSharedPtr host, 62 : Event::Dispatcher& dispatcher) 63 : : upstream_to_downstream_(upstream_to_downstream), socket_(std::move(socket)), host_(host), 64 0 : dispatcher_(dispatcher) { 65 0 : socket_->ioHandle().initializeFileEvent( 66 0 : dispatcher_, [this](uint32_t) { onSocketReadReady(); }, Event::PlatformDefaultTriggerType, 67 0 : Event::FileReadyType::Read); 68 0 : } 69 : 70 0 : void UdpUpstream::encodeData(Buffer::Instance& data, bool end_stream) { 71 0 : for (const Buffer::RawSlice& slice : data.getRawSlices()) { 72 0 : absl::string_view mem_slice(static_cast<const char*>(slice.mem_), slice.len_); 73 0 : if (!capsule_parser_.IngestCapsuleFragment(mem_slice)) { 74 0 : ENVOY_LOG_MISC(error, "Capsule ingestion error occured: slice = {}", mem_slice); 75 0 : break; 76 0 : } 77 0 : } 78 0 : if (end_stream) { 79 0 : capsule_parser_.ErrorIfThereIsRemainingBufferedData(); 80 0 : } 81 0 : } 82 : 83 : Envoy::Http::Status UdpUpstream::encodeHeaders(const Envoy::Http::RequestHeaderMap& /*headers*/, 84 0 : bool end_stream) { 85 : // For successful CONNECT-UDP handshakes, synthesizes the 200 response headers downstream. 86 0 : Envoy::Http::ResponseHeaderMapPtr response_headers{ 87 0 : Envoy::Http::createHeaderMap<Envoy::Http::ResponseHeaderMapImpl>( 88 0 : {{Envoy::Http::Headers::get().Status, "200"}, 89 0 : {Envoy::Http::Headers::get().CapsuleProtocol, "?1"}})}; 90 0 : if (end_stream) { 91 : // If the request header is the end of the stream, responds with 400 Bad Request. Does not 92 : // return an error code to avoid replying with 503 Service Unavailable. 93 0 : response_headers->setStatus("400"); 94 0 : response_headers->remove(Envoy::Http::Headers::get().CapsuleProtocol); 95 0 : upstream_to_downstream_->onResetStream(Envoy::Http::StreamResetReason::ConnectError, ""); 96 0 : } else { 97 0 : Api::SysCallIntResult rc = socket_->connect(host_->address()); 98 0 : if (SOCKET_FAILURE(rc.return_value_)) { 99 0 : return absl::InternalError("Upstream socket connect failure."); 100 0 : } 101 0 : } 102 : // Indicates the end of stream for the subsequent filters in the chain. 103 0 : upstream_to_downstream_->decodeHeaders(std::move(response_headers), end_stream); 104 0 : return Envoy::Http::okStatus(); 105 0 : } 106 : 107 0 : void UdpUpstream::resetStream() { 108 0 : upstream_to_downstream_ = nullptr; 109 0 : socket_->close(); 110 0 : } 111 : 112 0 : void UdpUpstream::onSocketReadReady() { 113 0 : uint32_t packets_dropped = 0; 114 0 : const Api::IoErrorPtr result = Network::Utility::readPacketsFromSocket( 115 0 : socket_->ioHandle(), *socket_->connectionInfoProvider().localAddress(), *this, 116 0 : dispatcher_.timeSource(), /*prefer_gro=*/true, packets_dropped); 117 0 : if (result == nullptr) { 118 0 : socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read); 119 0 : return; 120 0 : } 121 0 : } 122 : 123 : // The local and peer addresses are not used in this method since the socket is already bound and 124 : // connected to the upstream server in the encodeHeaders method. 125 : void UdpUpstream::processPacket(Network::Address::InstanceConstSharedPtr /*local_address*/, 126 : Network::Address::InstanceConstSharedPtr /*peer_address*/, 127 0 : Buffer::InstancePtr buffer, MonotonicTime /*receive_time*/) { 128 0 : std::string data = buffer->toString(); 129 0 : quiche::ConnectUdpDatagramUdpPacketPayload payload(data); 130 0 : quiche::QuicheBuffer serialized_capsule = 131 0 : SerializeCapsule(quiche::Capsule::Datagram(payload.Serialize()), &capsule_buffer_allocator_); 132 : 133 0 : Buffer::InstancePtr capsule_data = std::make_unique<Buffer::OwnedImpl>(); 134 0 : capsule_data->add(serialized_capsule.AsStringView()); 135 0 : bytes_meter_->addWireBytesReceived(capsule_data->length()); 136 0 : upstream_to_downstream_->decodeData(*capsule_data, /*end_stream=*/false); 137 0 : } 138 : 139 0 : bool UdpUpstream::OnCapsule(const quiche::Capsule& capsule) { 140 0 : quiche::CapsuleType capsule_type = capsule.capsule_type(); 141 0 : if (capsule_type != quiche::CapsuleType::DATAGRAM) { 142 : // Silently drops capsules with an unknown type. 143 0 : return true; 144 0 : } 145 : 146 0 : std::unique_ptr<quiche::ConnectUdpDatagramPayload> connect_udp_datagram_payload = 147 0 : quiche::ConnectUdpDatagramPayload::Parse(capsule.datagram_capsule().http_datagram_payload); 148 0 : if (!connect_udp_datagram_payload) { 149 : // Indicates parsing failure to reset the data stream. 150 0 : return false; 151 0 : } 152 : 153 0 : if (connect_udp_datagram_payload->GetType() != 154 0 : quiche::ConnectUdpDatagramPayload::Type::kUdpPacket) { 155 : // Silently drops Datagrams with an unknown Context ID. 156 0 : return true; 157 0 : } 158 : 159 0 : Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>(); 160 0 : buffer->add(connect_udp_datagram_payload->GetUdpProxyingPayload()); 161 0 : bytes_meter_->addWireBytesSent(buffer->length()); 162 0 : Api::IoCallUint64Result rc = Network::Utility::writeToSocket( 163 0 : socket_->ioHandle(), *buffer, /*local_ip=*/nullptr, *host_->address()); 164 : // TODO(https://github.com/envoyproxy/envoy/issues/23564): Handle some socket errors here. 165 0 : return true; 166 0 : } 167 : 168 0 : void UdpUpstream::OnCapsuleParseFailure(absl::string_view error_message) { 169 0 : upstream_to_downstream_->onResetStream(Envoy::Http::StreamResetReason::ProtocolError, 170 0 : error_message); 171 0 : } 172 : 173 : } // namespace Udp 174 : } // namespace Http 175 : } // namespace Upstreams 176 : } // namespace Extensions 177 : } // namespace Envoy