LCOV - code coverage report
Current view: top level - source/extensions/upstreams/http/udp - upstream_request.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 113 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 10 0.0 %

          Line data    Source code
       1             : #include "source/extensions/upstreams/http/udp/upstream_request.h"
       2             : 
       3             : #include <cstdint>
       4             : #include <memory>
       5             : 
       6             : #include "envoy/upstream/upstream.h"
       7             : 
       8             : #include "source/common/common/assert.h"
       9             : #include "source/common/common/logger.h"
      10             : #include "source/common/common/utility.h"
      11             : #include "source/common/http/codes.h"
      12             : #include "source/common/http/header_map_impl.h"
      13             : #include "source/common/http/headers.h"
      14             : #include "source/common/http/message_impl.h"
      15             : #include "source/common/network/transport_socket_options_impl.h"
      16             : #include "source/common/router/router.h"
      17             : #include "source/extensions/common/proxy_protocol/proxy_protocol_header.h"
      18             : 
      19             : #include "quiche/common/masque/connect_udp_datagram_payload.h"
      20             : #include "quiche/common/simple_buffer_allocator.h"
      21             : 
      22             : namespace Envoy {
      23             : namespace Extensions {
      24             : namespace Upstreams {
      25             : namespace Http {
      26             : namespace Udp {
      27             : 
      28           0 : void UdpConnPool::newStream(Router::GenericConnectionPoolCallbacks* callbacks) {
      29           0 :   Envoy::Network::SocketPtr socket = createSocket(host_);
      30           0 :   auto source_address_selector = host_->cluster().getUpstreamLocalAddressSelector();
      31           0 :   auto upstream_local_address = source_address_selector->getUpstreamLocalAddress(
      32           0 :       host_->address(), /*socket_options=*/nullptr);
      33           0 :   if (!Envoy::Network::Socket::applyOptions(upstream_local_address.socket_options_, *socket,
      34           0 :                                             envoy::config::core::v3::SocketOption::STATE_PREBIND)) {
      35           0 :     callbacks->onPoolFailure(ConnectionPool::PoolFailureReason::LocalConnectionFailure,
      36           0 :                              "Failed to apply socket option for UDP upstream", host_);
      37           0 :     return;
      38           0 :   }
      39           0 :   if (upstream_local_address.address_) {
      40           0 :     Envoy::Api::SysCallIntResult bind_result = socket->bind(upstream_local_address.address_);
      41           0 :     if (bind_result.return_value_ < 0) {
      42           0 :       callbacks->onPoolFailure(ConnectionPool::PoolFailureReason::LocalConnectionFailure,
      43           0 :                                "Failed to bind for UDP upstream", host_);
      44           0 :       return;
      45           0 :     }
      46           0 :   }
      47             : 
      48           0 :   const Network::ConnectionInfoProvider& connection_info_provider =
      49           0 :       socket->connectionInfoProvider();
      50           0 :   Router::UpstreamToDownstream& upstream_to_downstream = callbacks->upstreamToDownstream();
      51           0 :   ASSERT(upstream_to_downstream.connection().has_value());
      52           0 :   Event::Dispatcher& dispatcher = upstream_to_downstream.connection()->dispatcher();
      53           0 :   auto upstream =
      54           0 :       std::make_unique<UdpUpstream>(&upstream_to_downstream, std::move(socket), host_, dispatcher);
      55           0 :   StreamInfo::StreamInfoImpl stream_info(dispatcher.timeSource(), nullptr);
      56             : 
      57           0 :   callbacks->onPoolReady(std::move(upstream), host_, connection_info_provider, stream_info, {});
      58           0 : }
      59             : 
      60             : UdpUpstream::UdpUpstream(Router::UpstreamToDownstream* upstream_to_downstream,
      61             :                          Network::SocketPtr socket, Upstream::HostConstSharedPtr host,
      62             :                          Event::Dispatcher& dispatcher)
      63             :     : upstream_to_downstream_(upstream_to_downstream), socket_(std::move(socket)), host_(host),
      64           0 :       dispatcher_(dispatcher) {
      65           0 :   socket_->ioHandle().initializeFileEvent(
      66           0 :       dispatcher_, [this](uint32_t) { onSocketReadReady(); }, Event::PlatformDefaultTriggerType,
      67           0 :       Event::FileReadyType::Read);
      68           0 : }
      69             : 
      70           0 : void UdpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
      71           0 :   for (const Buffer::RawSlice& slice : data.getRawSlices()) {
      72           0 :     absl::string_view mem_slice(static_cast<const char*>(slice.mem_), slice.len_);
      73           0 :     if (!capsule_parser_.IngestCapsuleFragment(mem_slice)) {
      74           0 :       ENVOY_LOG_MISC(error, "Capsule ingestion error occured: slice = {}", mem_slice);
      75           0 :       break;
      76           0 :     }
      77           0 :   }
      78           0 :   if (end_stream) {
      79           0 :     capsule_parser_.ErrorIfThereIsRemainingBufferedData();
      80           0 :   }
      81           0 : }
      82             : 
      83             : Envoy::Http::Status UdpUpstream::encodeHeaders(const Envoy::Http::RequestHeaderMap& /*headers*/,
      84           0 :                                                bool end_stream) {
      85             :   // For successful CONNECT-UDP handshakes, synthesizes the 200 response headers downstream.
      86           0 :   Envoy::Http::ResponseHeaderMapPtr response_headers{
      87           0 :       Envoy::Http::createHeaderMap<Envoy::Http::ResponseHeaderMapImpl>(
      88           0 :           {{Envoy::Http::Headers::get().Status, "200"},
      89           0 :            {Envoy::Http::Headers::get().CapsuleProtocol, "?1"}})};
      90           0 :   if (end_stream) {
      91             :     // If the request header is the end of the stream, responds with 400 Bad Request. Does not
      92             :     // return an error code to avoid replying with 503 Service Unavailable.
      93           0 :     response_headers->setStatus("400");
      94           0 :     response_headers->remove(Envoy::Http::Headers::get().CapsuleProtocol);
      95           0 :     upstream_to_downstream_->onResetStream(Envoy::Http::StreamResetReason::ConnectError, "");
      96           0 :   } else {
      97           0 :     Api::SysCallIntResult rc = socket_->connect(host_->address());
      98           0 :     if (SOCKET_FAILURE(rc.return_value_)) {
      99           0 :       return absl::InternalError("Upstream socket connect failure.");
     100           0 :     }
     101           0 :   }
     102             :   // Indicates the end of stream for the subsequent filters in the chain.
     103           0 :   upstream_to_downstream_->decodeHeaders(std::move(response_headers), end_stream);
     104           0 :   return Envoy::Http::okStatus();
     105           0 : }
     106             : 
     107           0 : void UdpUpstream::resetStream() {
     108           0 :   upstream_to_downstream_ = nullptr;
     109           0 :   socket_->close();
     110           0 : }
     111             : 
     112           0 : void UdpUpstream::onSocketReadReady() {
     113           0 :   uint32_t packets_dropped = 0;
     114           0 :   const Api::IoErrorPtr result = Network::Utility::readPacketsFromSocket(
     115           0 :       socket_->ioHandle(), *socket_->connectionInfoProvider().localAddress(), *this,
     116           0 :       dispatcher_.timeSource(), /*prefer_gro=*/true, packets_dropped);
     117           0 :   if (result == nullptr) {
     118           0 :     socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read);
     119           0 :     return;
     120           0 :   }
     121           0 : }
     122             : 
     123             : // The local and peer addresses are not used in this method since the socket is already bound and
     124             : // connected to the upstream server in the encodeHeaders method.
     125             : void UdpUpstream::processPacket(Network::Address::InstanceConstSharedPtr /*local_address*/,
     126             :                                 Network::Address::InstanceConstSharedPtr /*peer_address*/,
     127           0 :                                 Buffer::InstancePtr buffer, MonotonicTime /*receive_time*/) {
     128           0 :   std::string data = buffer->toString();
     129           0 :   quiche::ConnectUdpDatagramUdpPacketPayload payload(data);
     130           0 :   quiche::QuicheBuffer serialized_capsule =
     131           0 :       SerializeCapsule(quiche::Capsule::Datagram(payload.Serialize()), &capsule_buffer_allocator_);
     132             : 
     133           0 :   Buffer::InstancePtr capsule_data = std::make_unique<Buffer::OwnedImpl>();
     134           0 :   capsule_data->add(serialized_capsule.AsStringView());
     135           0 :   bytes_meter_->addWireBytesReceived(capsule_data->length());
     136           0 :   upstream_to_downstream_->decodeData(*capsule_data, /*end_stream=*/false);
     137           0 : }
     138             : 
     139           0 : bool UdpUpstream::OnCapsule(const quiche::Capsule& capsule) {
     140           0 :   quiche::CapsuleType capsule_type = capsule.capsule_type();
     141           0 :   if (capsule_type != quiche::CapsuleType::DATAGRAM) {
     142             :     // Silently drops capsules with an unknown type.
     143           0 :     return true;
     144           0 :   }
     145             : 
     146           0 :   std::unique_ptr<quiche::ConnectUdpDatagramPayload> connect_udp_datagram_payload =
     147           0 :       quiche::ConnectUdpDatagramPayload::Parse(capsule.datagram_capsule().http_datagram_payload);
     148           0 :   if (!connect_udp_datagram_payload) {
     149             :     // Indicates parsing failure to reset the data stream.
     150           0 :     return false;
     151           0 :   }
     152             : 
     153           0 :   if (connect_udp_datagram_payload->GetType() !=
     154           0 :       quiche::ConnectUdpDatagramPayload::Type::kUdpPacket) {
     155             :     // Silently drops Datagrams with an unknown Context ID.
     156           0 :     return true;
     157           0 :   }
     158             : 
     159           0 :   Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
     160           0 :   buffer->add(connect_udp_datagram_payload->GetUdpProxyingPayload());
     161           0 :   bytes_meter_->addWireBytesSent(buffer->length());
     162           0 :   Api::IoCallUint64Result rc = Network::Utility::writeToSocket(
     163           0 :       socket_->ioHandle(), *buffer, /*local_ip=*/nullptr, *host_->address());
     164             :   // TODO(https://github.com/envoyproxy/envoy/issues/23564): Handle some socket errors here.
     165           0 :   return true;
     166           0 : }
     167             : 
     168           0 : void UdpUpstream::OnCapsuleParseFailure(absl::string_view error_message) {
     169           0 :   upstream_to_downstream_->onResetStream(Envoy::Http::StreamResetReason::ProtocolError,
     170           0 :                                          error_message);
     171           0 : }
     172             : 
     173             : } // namespace Udp
     174             : } // namespace Http
     175             : } // namespace Upstreams
     176             : } // namespace Extensions
     177             : } // namespace Envoy

Generated by: LCOV version 1.15