1
#include "source/extensions/upstreams/http/dynamic_modules/upstream_request.h"
2

            
3
#include <cstdint>
4
#include <memory>
5

            
6
#include "envoy/upstream/upstream.h"
7

            
8
#include "source/common/buffer/buffer_impl.h"
9
#include "source/common/common/assert.h"
10
#include "source/common/http/header_map_impl.h"
11
#include "source/common/http/headers.h"
12

            
13
namespace Envoy {
14
namespace Extensions {
15
namespace Upstreams {
16
namespace Http {
17
namespace DynamicModules {
18

            
19
// =============================================================================
20
// BridgeConfig
21
// =============================================================================
22

            
23
BridgeConfig::BridgeConfig(Envoy::Extensions::DynamicModules::DynamicModulePtr module)
24
62
    : dynamic_module_(std::move(module)) {}
25

            
26
62
BridgeConfig::~BridgeConfig() {
27
62
  if (on_bridge_config_destroy_ && in_module_config_) {
28
60
    (*on_bridge_config_destroy_)(in_module_config_);
29
60
  }
30
62
}
31

            
32
absl::StatusOr<std::shared_ptr<BridgeConfig>>
33
BridgeConfig::create(const std::string& bridge_name, const std::string& bridge_config,
34
62
                     Envoy::Extensions::DynamicModules::DynamicModulePtr module) {
35
62
  auto config = std::shared_ptr<BridgeConfig>(new BridgeConfig(std::move(module)));
36

            
37
62
#define RESOLVE_OR_RETURN(field, symbol)                                                           \
38
496
  {                                                                                                \
39
496
    auto result = config->dynamic_module_->getFunctionPointer<decltype(field)>(symbol);            \
40
496
    RETURN_IF_NOT_OK_REF(result.status());                                                         \
41
496
    config->field = result.value();                                                                \
42
496
  }
43

            
44
62
  RESOLVE_OR_RETURN(on_bridge_config_new_,
45
62
                    "envoy_dynamic_module_on_upstream_http_tcp_bridge_config_new");
46
62
  RESOLVE_OR_RETURN(on_bridge_config_destroy_,
47
62
                    "envoy_dynamic_module_on_upstream_http_tcp_bridge_config_destroy");
48
62
  RESOLVE_OR_RETURN(on_bridge_new_, "envoy_dynamic_module_on_upstream_http_tcp_bridge_new");
49
62
  RESOLVE_OR_RETURN(on_bridge_encode_headers_,
50
62
                    "envoy_dynamic_module_on_upstream_http_tcp_bridge_encode_headers");
51
62
  RESOLVE_OR_RETURN(on_bridge_encode_data_,
52
62
                    "envoy_dynamic_module_on_upstream_http_tcp_bridge_encode_data");
53
62
  RESOLVE_OR_RETURN(on_bridge_encode_trailers_,
54
62
                    "envoy_dynamic_module_on_upstream_http_tcp_bridge_encode_trailers");
55
62
  RESOLVE_OR_RETURN(on_bridge_on_upstream_data_,
56
62
                    "envoy_dynamic_module_on_upstream_http_tcp_bridge_on_upstream_data");
57
62
  RESOLVE_OR_RETURN(on_bridge_destroy_, "envoy_dynamic_module_on_upstream_http_tcp_bridge_destroy");
58

            
59
62
#undef RESOLVE_OR_RETURN
60

            
61
62
  const envoy_dynamic_module_type_envoy_buffer name_buf = {bridge_name.data(), bridge_name.size()};
62
62
  const envoy_dynamic_module_type_envoy_buffer config_buf = {bridge_config.data(),
63
62
                                                             bridge_config.size()};
64
62
  config->in_module_config_ =
65
62
      (*config->on_bridge_config_new_)(static_cast<void*>(config.get()), name_buf, config_buf);
66
62
  if (config->in_module_config_ == nullptr) {
67
2
    return absl::InvalidArgumentError("failed to initialize dynamic module bridge configuration");
68
2
  }
69

            
70
60
  return config;
71
62
}
72

            
73
// =============================================================================
74
// TcpConnPool
75
// =============================================================================
76

            
77
TcpConnPool::TcpConnPool(Upstream::HostConstSharedPtr host,
78
                         Upstream::ThreadLocalCluster& thread_local_cluster,
79
                         Upstream::ResourcePriority priority, Upstream::LoadBalancerContext* ctx,
80
                         BridgeConfigSharedPtr config)
81
10
    : config_(std::move(config)) {
82
10
  conn_pool_data_ = thread_local_cluster.tcpConnPool(host, priority, ctx);
83
10
}
84

            
85
10
TcpConnPool::~TcpConnPool() {
86
10
  ENVOY_BUG(upstream_handle_ == nullptr, "upstream_handle not null");
87
10
  resetUpstreamHandleIfSet();
88
10
}
89

            
90
5
void TcpConnPool::newStream(Router::GenericConnectionPoolCallbacks* callbacks) {
91
5
  callbacks_ = callbacks;
92
5
  upstream_handle_ = conn_pool_data_.value().newConnection(*this);
93
5
}
94

            
95
4
bool TcpConnPool::cancelAnyPendingStream() { return resetUpstreamHandleIfSet(); }
96

            
97
4
Upstream::HostDescriptionConstSharedPtr TcpConnPool::host() const {
98
4
  return conn_pool_data_.value().host();
99
4
}
100

            
101
7
bool TcpConnPool::valid() const { return conn_pool_data_.has_value(); }
102

            
103
void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
104
                                absl::string_view transport_failure_reason,
105
1
                                Upstream::HostDescriptionConstSharedPtr host) {
106
1
  upstream_handle_ = nullptr;
107
1
  callbacks_->onPoolFailure(reason, transport_failure_reason, host);
108
1
}
109

            
110
void TcpConnPool::onPoolReady(Envoy::Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
111
3
                              Upstream::HostDescriptionConstSharedPtr host) {
112
3
  upstream_handle_ = nullptr;
113
3
  Network::Connection& latched_conn = conn_data->connection();
114
3
  auto upstream = std::make_unique<HttpTcpBridge>(&callbacks_->upstreamToDownstream(),
115
3
                                                  std::move(conn_data), config_);
116
3
  callbacks_->onPoolReady(std::move(upstream), host, latched_conn.connectionInfoProvider(),
117
3
                          latched_conn.streamInfo(), {});
118
3
}
119

            
120
14
bool TcpConnPool::resetUpstreamHandleIfSet() {
121
14
  if (upstream_handle_) {
122
1
    upstream_handle_->cancel(Envoy::Tcp::ConnectionPool::CancelPolicy::Default);
123
1
    upstream_handle_ = nullptr;
124
1
    return true;
125
1
  }
126
13
  return false;
127
14
}
128

            
129
// =============================================================================
130
// HttpTcpBridge
131
// =============================================================================
132

            
133
HttpTcpBridge::HttpTcpBridge(Router::UpstreamToDownstream* upstream_request,
134
                             Envoy::Tcp::ConnectionPool::ConnectionDataPtr&& upstream,
135
                             BridgeConfigSharedPtr config)
136
55
    : upstream_request_(upstream_request), upstream_conn_data_(std::move(upstream)),
137
55
      config_(std::move(config)) {
138
55
  upstream_conn_data_->addUpstreamCallbacks(*this);
139

            
140
55
  in_module_bridge_ =
141
55
      (*config_->on_bridge_new_)(config_->in_module_config_, static_cast<void*>(this));
142
55
  if (in_module_bridge_ == nullptr) {
143
4
    ENVOY_LOG(error, "dynamic module bridge creation returned nullptr");
144
4
  }
145
55
}
146

            
147
55
HttpTcpBridge::~HttpTcpBridge() {
148
55
  if (in_module_bridge_ != nullptr) {
149
51
    (*config_->on_bridge_destroy_)(in_module_bridge_);
150
51
    in_module_bridge_ = nullptr;
151
51
  }
152
55
}
153

            
154
Envoy::Http::Status HttpTcpBridge::encodeHeaders(const Envoy::Http::RequestHeaderMap& headers,
155
19
                                                 bool end_stream) {
156
19
  if (in_module_bridge_ == nullptr) {
157
1
    return absl::InternalError("dynamic module bridge is null");
158
1
  }
159

            
160
18
  request_headers_ = &headers;
161
18
  downstream_complete_ = end_stream;
162

            
163
18
  (*config_->on_bridge_encode_headers_)(static_cast<void*>(this), in_module_bridge_, end_stream);
164

            
165
18
  return Envoy::Http::okStatus();
166
19
}
167

            
168
8
void HttpTcpBridge::encodeData(Buffer::Instance& data, bool end_stream) {
169
8
  if (in_module_bridge_ == nullptr) {
170
1
    return;
171
1
  }
172
7
  downstream_complete_ = end_stream;
173

            
174
  // Move into a local buffer so the module reads from a stable copy. The module is expected
175
  // to forward the data via send_upstream_data, which writes to the connection and drains
176
  // naturally.
177
7
  Buffer::OwnedImpl local_buffer;
178
7
  local_buffer.move(data);
179
7
  request_buffer_ = &local_buffer;
180

            
181
  // The module callback may trigger decodeData with end_stream=true (e.g., via sendResponse),
182
  // which can cause the router to destroy this object. Do not access any member variables after
183
  // this call.
184
7
  (*config_->on_bridge_encode_data_)(static_cast<void*>(this), in_module_bridge_, end_stream);
185
7
}
186

            
187
3
void HttpTcpBridge::encodeTrailers(const Envoy::Http::RequestTrailerMap&) {
188
3
  if (in_module_bridge_ == nullptr) {
189
1
    return;
190
1
  }
191
2
  downstream_complete_ = true;
192

            
193
2
  (*config_->on_bridge_encode_trailers_)(static_cast<void*>(this), in_module_bridge_);
194
2
}
195

            
196
2
void HttpTcpBridge::readDisable(bool disable) {
197
2
  if (upstream_conn_data_->connection().state() != Network::Connection::State::Open) {
198
1
    return;
199
1
  }
200
1
  upstream_conn_data_->connection().readDisable(disable);
201
1
}
202

            
203
5
void HttpTcpBridge::resetStream() {
204
5
  upstream_request_ = nullptr;
205
5
  upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush,
206
5
                                          "dynamic_module_bridge_reset_stream");
207
5
}
208

            
209
7
void HttpTcpBridge::onUpstreamData(Buffer::Instance& data, bool end_stream) {
210
7
  if (in_module_bridge_ == nullptr || upstream_request_ == nullptr) {
211
2
    return;
212
2
  }
213

            
214
  // Move data into a local buffer before calling the module. The module callback may trigger
215
  // downstream processing that re-enables upstream reads, causing a re-entrant onUpstreamData
216
  // call. Moving the data first ensures the connection's read buffer is empty, preventing the
217
  // same data from being delivered twice.
218
5
  Buffer::OwnedImpl local_buffer;
219
5
  local_buffer.move(data);
220

            
221
5
  response_buffer_ = &local_buffer;
222
5
  bytes_meter_->addWireBytesReceived(local_buffer.length());
223

            
224
  // The module callback may trigger decodeData with end_stream=true, which can cause the router
225
  // to call resetStream() and ultimately destroy this object. Do not access any member variables
226
  // after this call.
227
5
  (*config_->on_bridge_on_upstream_data_)(static_cast<void*>(this), in_module_bridge_, end_stream);
228
5
}
229

            
230
5
void HttpTcpBridge::onEvent(Network::ConnectionEvent event) {
231
5
  if ((event == Network::ConnectionEvent::LocalClose ||
232
5
       event == Network::ConnectionEvent::RemoteClose) &&
233
5
      upstream_request_ != nullptr) {
234
3
    upstream_request_->onResetStream(Envoy::Http::StreamResetReason::ConnectionTermination, "");
235
3
  }
236
5
}
237

            
238
2
void HttpTcpBridge::onAboveWriteBufferHighWatermark() {
239
2
  if (upstream_request_) {
240
1
    upstream_request_->onAboveWriteBufferHighWatermark();
241
1
  }
242
2
}
243

            
244
2
void HttpTcpBridge::onBelowWriteBufferLowWatermark() {
245
2
  if (upstream_request_) {
246
1
    upstream_request_->onBelowWriteBufferLowWatermark();
247
1
  }
248
2
}
249

            
250
Envoy::Http::ResponseHeaderMapPtr
251
HttpTcpBridge::buildResponseHeaders(uint32_t status_code,
252
                                    envoy_dynamic_module_type_module_http_header* headers_vector,
253
10
                                    size_t headers_vector_size) {
254
10
  auto headers = Envoy::Http::ResponseHeaderMapImpl::create();
255
10
  headers->setStatus(status_code);
256
10
  if (headers_vector != nullptr) {
257
2
    for (size_t i = 0; i < headers_vector_size; i++) {
258
1
      const auto& header = headers_vector[i];
259
1
      const absl::string_view key(static_cast<const char*>(header.key_ptr), header.key_length);
260
1
      const absl::string_view value(static_cast<const char*>(header.value_ptr),
261
1
                                    header.value_length);
262
1
      headers->addCopy(Envoy::Http::LowerCaseString(key), value);
263
1
    }
264
1
  }
265
10
  return headers;
266
10
}
267

            
268
4
void HttpTcpBridge::sendUpstreamData(absl::string_view data, bool end_stream) {
269
4
  if (upstream_conn_data_ == nullptr) {
270
    return;
271
  }
272
4
  Buffer::OwnedImpl buffer;
273
4
  if (!data.empty()) {
274
3
    buffer.add(data);
275
3
  }
276
4
  if (buffer.length() > 0 || end_stream) {
277
4
    if (end_stream) {
278
1
      upstream_conn_data_->connection().enableHalfClose(true);
279
1
    }
280
4
    bytes_meter_->addWireBytesSent(buffer.length());
281
4
    upstream_conn_data_->connection().write(buffer, end_stream);
282
4
  }
283
4
}
284

            
285
void HttpTcpBridge::sendResponse(uint32_t status_code,
286
                                 envoy_dynamic_module_type_module_http_header* headers_vector,
287
7
                                 size_t headers_vector_size, absl::string_view body) {
288
7
  if (upstream_request_ == nullptr) {
289
1
    return;
290
1
  }
291
6
  auto headers = buildResponseHeaders(status_code, headers_vector, headers_vector_size);
292
6
  if (!body.empty()) {
293
4
    upstream_request_->decodeHeaders(std::move(headers), false);
294
4
    Buffer::OwnedImpl body_buffer(body);
295
4
    upstream_request_->decodeData(body_buffer, true);
296
4
  } else {
297
2
    upstream_request_->decodeHeaders(std::move(headers), true);
298
2
  }
299
6
}
300

            
301
void HttpTcpBridge::sendResponseHeaders(
302
    uint32_t status_code, envoy_dynamic_module_type_module_http_header* headers_vector,
303
5
    size_t headers_vector_size, bool end_stream) {
304
5
  if (upstream_request_ == nullptr) {
305
1
    return;
306
1
  }
307
4
  auto headers = buildResponseHeaders(status_code, headers_vector, headers_vector_size);
308
4
  upstream_request_->decodeHeaders(std::move(headers), end_stream);
309
4
}
310

            
311
5
void HttpTcpBridge::sendResponseData(absl::string_view data, bool end_stream) {
312
5
  if (upstream_request_ == nullptr) {
313
1
    return;
314
1
  }
315
4
  Buffer::OwnedImpl buffer(data);
316
4
  upstream_request_->decodeData(buffer, end_stream);
317
4
}
318

            
319
void HttpTcpBridge::sendResponseTrailers(
320
2
    envoy_dynamic_module_type_module_http_header* trailers_vector, size_t trailers_vector_size) {
321
2
  if (upstream_request_ == nullptr) {
322
1
    return;
323
1
  }
324
1
  auto trailers = Envoy::Http::ResponseTrailerMapImpl::create();
325
1
  if (trailers_vector != nullptr) {
326
3
    for (size_t i = 0; i < trailers_vector_size; i++) {
327
2
      const auto& trailer = trailers_vector[i];
328
2
      const absl::string_view key(static_cast<const char*>(trailer.key_ptr), trailer.key_length);
329
2
      const absl::string_view value(static_cast<const char*>(trailer.value_ptr),
330
2
                                    trailer.value_length);
331
2
      trailers->addCopy(Envoy::Http::LowerCaseString(key), value);
332
2
    }
333
1
  }
334
1
  upstream_request_->decodeTrailers(std::move(trailers));
335
1
}
336

            
337
} // namespace DynamicModules
338
} // namespace Http
339
} // namespace Upstreams
340
} // namespace Extensions
341
} // namespace Envoy