1
#include "source/extensions/upstreams/http/udp/upstream_request.h"
2

            
3
#include <cstdint>
4
#include <memory>
5

            
6
#include "envoy/upstream/upstream.h"
7

            
8
#include "source/common/common/assert.h"
9
#include "source/common/common/logger.h"
10
#include "source/common/common/utility.h"
11
#include "source/common/http/codes.h"
12
#include "source/common/http/header_map_impl.h"
13
#include "source/common/http/headers.h"
14
#include "source/common/http/message_impl.h"
15
#include "source/common/network/transport_socket_options_impl.h"
16
#include "source/common/router/router.h"
17
#include "source/extensions/common/proxy_protocol/proxy_protocol_header.h"
18

            
19
#include "quiche/common/masque/connect_udp_datagram_payload.h"
20
#include "quiche/common/simple_buffer_allocator.h"
21

            
22
namespace Envoy {
23
namespace Extensions {
24
namespace Upstreams {
25
namespace Http {
26
namespace Udp {
27

            
28
36
void UdpConnPool::newStream(Router::GenericConnectionPoolCallbacks* callbacks) {
29
36
  Envoy::Network::SocketPtr socket = createSocket(host_);
30
36
  auto source_address_selector = host_->cluster().getUpstreamLocalAddressSelector();
31
36
  auto upstream_local_address = source_address_selector->getUpstreamLocalAddress(
32
36
      host_->address(), /*socket_options=*/nullptr, /*transport_socket_options=*/{});
33
36
  if (!Envoy::Network::Socket::applyOptions(upstream_local_address.socket_options_, *socket,
34
36
                                            envoy::config::core::v3::SocketOption::STATE_PREBIND)) {
35
1
    callbacks->onPoolFailure(ConnectionPool::PoolFailureReason::LocalConnectionFailure,
36
1
                             "Failed to apply socket option for UDP upstream", host_);
37
1
    return;
38
1
  }
39
35
  if (upstream_local_address.address_) {
40
3
    Envoy::Api::SysCallIntResult bind_result = socket->bind(upstream_local_address.address_);
41
3
    if (bind_result.return_value_ < 0) {
42
1
      callbacks->onPoolFailure(ConnectionPool::PoolFailureReason::LocalConnectionFailure,
43
1
                               "Failed to bind for UDP upstream", host_);
44
1
      return;
45
1
    }
46
3
  }
47

            
48
34
  const Network::ConnectionInfoProvider& connection_info_provider =
49
34
      socket->connectionInfoProvider();
50
34
  Router::UpstreamToDownstream& upstream_to_downstream = callbacks->upstreamToDownstream();
51
34
  ASSERT(upstream_to_downstream.connection().has_value());
52
34
  Event::Dispatcher& dispatcher = upstream_to_downstream.connection()->dispatcher();
53
34
  auto upstream =
54
34
      std::make_unique<UdpUpstream>(&upstream_to_downstream, std::move(socket), host_, dispatcher);
55
34
  StreamInfo::StreamInfoImpl stream_info(dispatcher.timeSource(), nullptr,
56
34
                                         StreamInfo::FilterState::LifeSpan::Connection);
57

            
58
34
  callbacks->onPoolReady(std::move(upstream), host_, connection_info_provider, stream_info, {});
59
34
}
60

            
61
UdpUpstream::UdpUpstream(Router::UpstreamToDownstream* upstream_to_downstream,
62
                         Network::SocketPtr socket, Upstream::HostConstSharedPtr host,
63
                         Event::Dispatcher& dispatcher)
64
43
    : upstream_to_downstream_(upstream_to_downstream), socket_(std::move(socket)), host_(host),
65
43
      dispatcher_(dispatcher) {
66
43
  socket_->ioHandle().initializeFileEvent(
67
43
      dispatcher_,
68
43
      [this](uint32_t) {
69
16
        onSocketReadReady();
70
16
        return absl::OkStatus();
71
16
      },
72
43
      Event::PlatformDefaultTriggerType, Event::FileReadyType::Read);
73
43
}
74

            
75
36
void UdpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
76
36
  for (const Buffer::RawSlice& slice : data.getRawSlices()) {
77
28
    absl::string_view mem_slice(static_cast<const char*>(slice.mem_), slice.len_);
78
28
    if (!capsule_parser_.IngestCapsuleFragment(mem_slice)) {
79
1
      ENVOY_LOG_MISC(error, "Capsule ingestion error occured: slice = {}", mem_slice);
80
1
      break;
81
1
    }
82
28
  }
83
36
  if (end_stream) {
84
3
    capsule_parser_.ErrorIfThereIsRemainingBufferedData();
85
3
  }
86
36
}
87

            
88
Envoy::Http::Status UdpUpstream::encodeHeaders(const Envoy::Http::RequestHeaderMap& /*headers*/,
89
38
                                               bool end_stream) {
90
  // For successful CONNECT-UDP handshakes, synthesizes the 200 response headers downstream.
91
38
  Envoy::Http::ResponseHeaderMapPtr response_headers{
92
38
      Envoy::Http::createHeaderMap<Envoy::Http::ResponseHeaderMapImpl>(
93
38
          {{Envoy::Http::Headers::get().Status, "200"},
94
38
           {Envoy::Http::Headers::get().CapsuleProtocol, "?1"}})};
95
38
  if (end_stream) {
96
    // If the request header is the end of the stream, responds with 400 Bad Request. Does not
97
    // return an error code to avoid replying with 503 Service Unavailable.
98
1
    response_headers->setStatus("400");
99
1
    response_headers->remove(Envoy::Http::Headers::get().CapsuleProtocol);
100
1
    upstream_to_downstream_->onResetStream(Envoy::Http::StreamResetReason::ConnectError, "");
101
37
  } else {
102
37
    Api::SysCallIntResult rc = socket_->connect(host_->address());
103
37
    if (SOCKET_FAILURE(rc.return_value_)) {
104
1
      return absl::InternalError("Upstream socket connect failure.");
105
1
    }
106
37
  }
107
  // Indicates the end of stream for the subsequent filters in the chain.
108
37
  upstream_to_downstream_->decodeHeaders(std::move(response_headers), end_stream);
109
37
  return Envoy::Http::okStatus();
110
38
}
111

            
112
32
void UdpUpstream::resetStream() {
113
32
  upstream_to_downstream_ = nullptr;
114
32
  socket_->close();
115
32
}
116

            
117
16
void UdpUpstream::onSocketReadReady() {
118
16
  uint32_t packets_dropped = 0;
119
16
  const Api::IoErrorPtr result = Network::Utility::readPacketsFromSocket(
120
16
      socket_->ioHandle(), *socket_->connectionInfoProvider().localAddress(), *this,
121
16
      dispatcher_.timeSource(), /*allow_gro=*/true, /*allow_mmsg=*/true, packets_dropped);
122
16
  if (result == nullptr) {
123
    socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read);
124
    return;
125
  }
126
16
}
127

            
128
// The local and peer addresses are not used in this method since the socket is already bound and
129
// connected to the upstream server in the encodeHeaders method.
130
void UdpUpstream::processPacket(Network::Address::InstanceConstSharedPtr /*local_address*/,
131
                                Network::Address::InstanceConstSharedPtr /*peer_address*/,
132
                                Buffer::InstancePtr buffer, MonotonicTime /*receive_time*/,
133
17
                                uint8_t /*tos*/, Buffer::OwnedImpl /*saved_cmsg*/) {
134
17
  std::string data = buffer->toString();
135
17
  quiche::ConnectUdpDatagramUdpPacketPayload payload(data);
136
17
  quiche::QuicheBuffer serialized_capsule =
137
17
      SerializeCapsule(quiche::Capsule::Datagram(payload.Serialize()), &capsule_buffer_allocator_);
138

            
139
17
  Buffer::InstancePtr capsule_data = std::make_unique<Buffer::OwnedImpl>();
140
17
  capsule_data->add(serialized_capsule.AsStringView());
141
17
  bytes_meter_->addWireBytesReceived(capsule_data->length());
142
17
  upstream_to_downstream_->decodeData(*capsule_data, /*end_stream=*/false);
143
17
}
144

            
145
26
bool UdpUpstream::OnCapsule(const quiche::Capsule& capsule) {
146
26
  quiche::CapsuleType capsule_type = capsule.capsule_type();
147
26
  if (capsule_type != quiche::CapsuleType::DATAGRAM) {
148
    // Silently drops capsules with an unknown type.
149
4
    return true;
150
4
  }
151

            
152
22
  std::unique_ptr<quiche::ConnectUdpDatagramPayload> connect_udp_datagram_payload =
153
22
      quiche::ConnectUdpDatagramPayload::Parse(capsule.datagram_capsule().http_datagram_payload);
154
22
  if (!connect_udp_datagram_payload) {
155
    // Indicates parsing failure to reset the data stream.
156
1
    return false;
157
1
  }
158

            
159
21
  if (connect_udp_datagram_payload->GetType() !=
160
21
      quiche::ConnectUdpDatagramPayload::Type::kUdpPacket) {
161
    // Silently drops Datagrams with an unknown Context ID.
162
4
    return true;
163
4
  }
164

            
165
17
  Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
166
17
  buffer->add(connect_udp_datagram_payload->GetUdpProxyingPayload());
167
17
  bytes_meter_->addWireBytesSent(buffer->length());
168
17
  Api::IoCallUint64Result rc = Network::Utility::writeToSocket(
169
17
      socket_->ioHandle(), *buffer, /*local_ip=*/nullptr, *host_->address());
170
  // TODO(https://github.com/envoyproxy/envoy/issues/23564): Handle some socket errors here.
171
17
  return true;
172
21
}
173

            
174
3
void UdpUpstream::OnCapsuleParseFailure(absl::string_view error_message) {
175
3
  upstream_to_downstream_->onResetStream(Envoy::Http::StreamResetReason::ProtocolError,
176
3
                                         error_message);
177
3
}
178

            
179
} // namespace Udp
180
} // namespace Http
181
} // namespace Upstreams
182
} // namespace Extensions
183
} // namespace Envoy