1
#pragma once
2

            
3
#include <atomic>
4
#include <map>
5
#include <memory>
6

            
7
#include "envoy/access_log/access_log.h"
8
#include "envoy/buffer/buffer.h"
9
#include "envoy/extensions/wasm/v3/wasm.pb.validate.h"
10
#include "envoy/http/filter.h"
11
#include "envoy/stats/sink.h"
12
#include "envoy/upstream/cluster_manager.h"
13

            
14
#include "source/common/common/assert.h"
15
#include "source/common/common/logger.h"
16
#include "source/extensions/common/wasm/plugin.h"
17
#include "source/extensions/filters/common/expr/cel_state.h"
18
#include "source/extensions/filters/common/expr/evaluator.h"
19

            
20
#include "eval/public/activation.h"
21
#include "include/proxy-wasm/wasm.h"
22

            
23
namespace Envoy {
24
namespace Extensions {
25
namespace Common {
26
namespace Wasm {
27

            
28
using proxy_wasm::BufferInterface;
29
using proxy_wasm::CloseType;
30
using proxy_wasm::ContextBase;
31
using proxy_wasm::Pairs;
32
using proxy_wasm::PairsWithStringValues;
33
using proxy_wasm::PluginBase;
34
using proxy_wasm::PluginHandleBase;
35
using proxy_wasm::SharedQueueDequeueToken;
36
using proxy_wasm::SharedQueueEnqueueToken;
37
using proxy_wasm::WasmBase;
38
using proxy_wasm::WasmBufferType;
39
using proxy_wasm::WasmHandleBase;
40
using proxy_wasm::WasmHeaderMapType;
41
using proxy_wasm::WasmResult;
42
using proxy_wasm::WasmStreamType;
43

            
44
using VmConfig = envoy::extensions::wasm::v3::VmConfig;
45
using CapabilityRestrictionConfig = envoy::extensions::wasm::v3::CapabilityRestrictionConfig;
46
using SanitizationConfig = envoy::extensions::wasm::v3::SanitizationConfig;
47
using GrpcService = envoy::config::core::v3::GrpcService;
48

            
49
class PluginHandle;
50
class Wasm;
51

            
52
using PluginBaseSharedPtr = std::shared_ptr<PluginBase>;
53
using PluginHandleBaseSharedPtr = std::shared_ptr<PluginHandleBase>;
54
using PluginHandleSharedPtr = std::shared_ptr<PluginHandle>;
55
using WasmHandleBaseSharedPtr = std::shared_ptr<WasmHandleBase>;
56

            
57
// Opaque context object.
//
// Type-erased base class for data stored on a Context. Concrete payloads derive from this class
// so they can be owned through a std::unique_ptr<StorageObject>; the virtual destructor ensures
// that destroying the object through the base pointer runs the derived destructor.
class StorageObject {
public:
  virtual ~StorageObject() = default;
};
62

            
63
class Buffer : public proxy_wasm::BufferBase {
64
public:
65
2145
  Buffer() = default;
66

            
67
  // proxy_wasm::BufferInterface
68
  size_t size() const override;
69
  WasmResult copyTo(WasmBase* wasm, size_t start, size_t length, uint64_t ptr_ptr,
70
                    uint64_t size_ptr) const override;
71
  WasmResult copyFrom(size_t start, size_t length, std::string_view data) override;
72

            
73
  // proxy_wasm::BufferBase
74
837
  void clear() override {
75
837
    proxy_wasm::BufferBase::clear();
76
837
    const_buffer_instance_ = nullptr;
77
837
    buffer_instance_ = nullptr;
78
837
  }
79
212
  Buffer* set(std::string_view data) {
80
212
    return static_cast<Buffer*>(proxy_wasm::BufferBase::set(data));
81
212
  }
82
10
  Buffer* set(std::unique_ptr<char[]> owned_data, uint32_t owned_data_size) {
83
10
    return static_cast<Buffer*>(
84
10
        proxy_wasm::BufferBase::set(std::move(owned_data), owned_data_size));
85
10
  }
86

            
87
613
  Buffer* set(::Envoy::Buffer::Instance* buffer_instance) {
88
613
    clear();
89
613
    buffer_instance_ = buffer_instance;
90
613
    const_buffer_instance_ = buffer_instance;
91
613
    return this;
92
613
  }
93
2
  Buffer* set(const ::Envoy::Buffer::Instance* buffer_instance) {
94
2
    clear();
95
2
    const_buffer_instance_ = buffer_instance;
96
2
    return this;
97
2
  }
98

            
99
private:
100
  const ::Envoy::Buffer::Instance* const_buffer_instance_{};
101
  ::Envoy::Buffer::Instance* buffer_instance_{};
102
};
103

            
104
// A context which will be the target of callbacks for a particular session
105
// e.g. a handler of a stream.
106
class Context : public proxy_wasm::ContextBase,
107
                public Logger::Loggable<Logger::Id::wasm>,
108
                public AccessLog::Instance,
109
                public Http::StreamFilter,
110
                public Network::ConnectionCallbacks,
111
                public Network::Filter,
112
                public Filters::Common::Expr::StreamActivation,
113
                public std::enable_shared_from_this<Context> {
114
public:
115
  Context();                                          // Testing.
116
  Context(Wasm* wasm);                                // Vm Context.
117
  Context(Wasm* wasm, const PluginSharedPtr& plugin); // Root Context.
118
  Context(Wasm* wasm, uint32_t root_context_id,
119
          PluginHandleSharedPtr plugin_handle); // Stream context.
120
  ~Context() override;
121

            
122
  WasmBase* wasm() const override;
123
  Wasm* envoyWasm() const;
124
  Plugin* plugin() const;
125
  Context* rootContext() const;
126
  Upstream::ClusterManager& clusterManager() const;
127

            
128
  // proxy_wasm::ContextBase
129
  void error(std::string_view message) override;
130

            
131
  // Retrieves the stream info associated with the request (a.k.a active stream).
132
  // It selects a value based on the following order: encoder callback, decoder
133
  // callback, log callback, network read filter callback, network write filter
134
  // callback. As long as any one of the callbacks is invoked, the value should be
135
  // available.
136
  const StreamInfo::StreamInfo* getConstRequestStreamInfo() const;
137
  StreamInfo::StreamInfo* getRequestStreamInfo() const;
138

            
139
  // Retrieves the connection object associated with the request (a.k.a active stream).
140
  // It selects a value based on the following order: encoder callback, decoder
141
  // callback. As long as any one of the callbacks is invoked, the value should be
142
  // available.
143
  const Network::Connection* getConnection() const;
144

            
145
  //
146
  // VM level down-calls into the Wasm code on Context(id == 0).
147
  //
148
  virtual bool validateConfiguration(std::string_view configuration,
149
                                     const std::shared_ptr<PluginBase>& plugin); // deprecated
150

            
151
  // AccessLog::Instance
152
  void log(const Formatter::Context& log_context, const StreamInfo::StreamInfo& info) override;
153

            
154
  uint32_t getLogLevel() override;
155

            
156
  // Network::ConnectionCallbacks
157
  void onEvent(Network::ConnectionEvent event) override;
158
3
  void onAboveWriteBufferHighWatermark() override {}
159
3
  void onBelowWriteBufferLowWatermark() override {}
160

            
161
  // Network::ReadFilter
162
  Network::FilterStatus onNewConnection() override;
163
  Network::FilterStatus onData(::Envoy::Buffer::Instance& data, bool end_stream) override;
164
  void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;
165

            
166
  // Network::WriteFilter
167
  Network::FilterStatus onWrite(::Envoy::Buffer::Instance& data, bool end_stream) override;
168
  void initializeWriteFilterCallbacks(Network::WriteFilterCallbacks& callbacks) override;
169

            
170
  // proxy_wasm::ContextBase
171
  void onDownstreamConnectionClose(CloseType) override;
172
  void onUpstreamConnectionClose(CloseType) override;
173

            
174
  // Http::StreamFilterBase. Note: This calls onDone() in Wasm.
175
  void onDestroy() override;
176

            
177
  // Http::StreamDecoderFilter
178
  Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
179
                                          bool end_stream) override;
180
  Http::FilterDataStatus decodeData(::Envoy::Buffer::Instance& data, bool end_stream) override;
181
  Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override;
182
  Http::FilterMetadataStatus decodeMetadata(Http::MetadataMap& metadata_map) override;
183
  void setDecoderFilterCallbacks(Envoy::Http::StreamDecoderFilterCallbacks& callbacks) override;
184

            
185
  // Http::StreamEncoderFilter
186
  Http::Filter1xxHeadersStatus encode1xxHeaders(Http::ResponseHeaderMap&) override;
187
  Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers,
188
                                          bool end_stream) override;
189
  Http::FilterDataStatus encodeData(::Envoy::Buffer::Instance& data, bool end_stream) override;
190
  Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap& trailers) override;
191
  Http::FilterMetadataStatus encodeMetadata(Http::MetadataMap& metadata_map) override;
192
  void setEncoderFilterCallbacks(Envoy::Http::StreamEncoderFilterCallbacks& callbacks) override;
193

            
194
  // VM calls out to host.
195
  // proxy_wasm::ContextBase
196

            
197
  // General
198
  WasmResult log(uint32_t level, std::string_view message) override;
199
  uint64_t getCurrentTimeNanoseconds() override;
200
  uint64_t getMonotonicTimeNanoseconds() override;
201
  std::string_view getConfiguration() override;
202
  std::pair<uint32_t, std::string_view> getStatus() override;
203

            
204
  // State accessors
205
  WasmResult getProperty(std::string_view path, std::string* result) override;
206
  WasmResult setProperty(std::string_view path, std::string_view value) override;
207
  WasmResult setEnvoyFilterState(std::string_view path, std::string_view value,
208
                                 StreamInfo::FilterState::LifeSpan life_span);
209
  WasmResult declareProperty(std::string_view path,
210
                             Filters::Common::Expr::CelStatePrototypeConstPtr state_prototype);
211

            
212
  // Continue
213
  WasmResult continueStream(WasmStreamType stream_type) override;
214
  WasmResult closeStream(WasmStreamType stream_type) override;
215
  void failStream(WasmStreamType stream_type) override;
216
  WasmResult sendLocalResponse(uint32_t response_code, std::string_view body_text,
217
                               Pairs additional_headers, uint32_t grpc_status,
218
                               std::string_view details) override;
219
93
  void clearRouteCache() override {
220
93
    if (decoder_callbacks_ && decoder_callbacks_->downstreamCallbacks()) {
221
88
      decoder_callbacks_->downstreamCallbacks()->clearRouteCache();
222
88
    }
223
93
  }
224

            
225
  // Header/Trailer/Metadata Maps
226
  WasmResult addHeaderMapValue(WasmHeaderMapType type, std::string_view key,
227
                               std::string_view value) override;
228
  WasmResult getHeaderMapValue(WasmHeaderMapType type, std::string_view key,
229
                               std::string_view* value) override;
230
  WasmResult getHeaderMapPairs(WasmHeaderMapType type, Pairs* result) override;
231
  WasmResult setHeaderMapPairs(WasmHeaderMapType type, const Pairs& pairs) override;
232

            
233
  WasmResult removeHeaderMapValue(WasmHeaderMapType type, std::string_view key) override;
234
  WasmResult replaceHeaderMapValue(WasmHeaderMapType type, std::string_view key,
235
                                   std::string_view value) override;
236

            
237
  WasmResult getHeaderMapSize(WasmHeaderMapType type, uint32_t* size) override;
238

            
239
  // Buffer
240
  BufferInterface* getBuffer(WasmBufferType type) override;
241
  // TODO: use stream_type.
242
3
  bool endOfStream(WasmStreamType /* stream_type */) override { return end_of_stream_; }
243

            
244
  // HTTP
245
  WasmResult httpCall(std::string_view cluster, const Pairs& request_headers,
246
                      std::string_view request_body, const Pairs& request_trailers,
247
                      int timeout_milliseconds, uint32_t* token_ptr) override;
248

            
249
  // Stats/Metrics
250
  WasmResult defineMetric(uint32_t type, std::string_view name, uint32_t* metric_id_ptr) override;
251
  WasmResult incrementMetric(uint32_t metric_id, int64_t offset) override;
252
  WasmResult recordMetric(uint32_t metric_id, uint64_t value) override;
253
  WasmResult getMetric(uint32_t metric_id, uint64_t* value_ptr) override;
254

            
255
  // gRPC
256
  WasmResult grpcCall(std::string_view grpc_service, std::string_view service_name,
257
                      std::string_view method_name, const Pairs& initial_metadata,
258
                      std::string_view request, std::chrono::milliseconds timeout,
259
                      uint32_t* token_ptr) override;
260
  WasmResult grpcStream(std::string_view grpc_service, std::string_view service_name,
261
                        std::string_view method_name, const Pairs& initial_metadat,
262
                        uint32_t* token_ptr) override;
263

            
264
  WasmResult grpcClose(uint32_t token) override;
265
  WasmResult grpcCancel(uint32_t token) override;
266
  WasmResult grpcSend(uint32_t token, std::string_view message, bool end_stream) override;
267

            
268
  // Envoy specific ABI
269
  void onResolveDns(uint32_t token, Envoy::Network::DnsResolver::ResolutionStatus status,
270
                    std::list<Envoy::Network::DnsResponse>&& response);
271

            
272
  void onStatsUpdate(Envoy::Stats::MetricSnapshot& snapshot);
273

            
274
  // CEL evaluation
275
  absl::optional<google::api::expr::runtime::CelValue>
276
  findValue(absl::string_view name, Protobuf::Arena* arena, bool last) const;
277
  absl::optional<google::api::expr::runtime::CelValue>
278
  FindValue(absl::string_view name, Protobuf::Arena* arena) const override;
279

            
280
  // Foreign function state
281
3
  virtual void setForeignData(absl::string_view data_name, std::unique_ptr<StorageObject> data) {
282
3
    data_storage_[data_name] = std::move(data);
283
3
  }
284
35
  template <typename T> T* getForeignData(absl::string_view data_name) {
285
35
    const auto& it = data_storage_.find(data_name);
286
35
    if (it == data_storage_.end()) {
287
3
      return nullptr;
288
3
    }
289
32
    return dynamic_cast<T*>(it->second.get());
290
35
  }
291

            
292
protected:
293
  friend class Wasm;
294

            
295
  void addAfterVmCallAction(std::function<void()> f);
296
  void onCloseTCP();
297

            
298
  struct AsyncClientHandler : public Http::AsyncClient::Callbacks {
299
    // Http::AsyncClient::Callbacks
300
    void onSuccess(const Http::AsyncClient::Request&,
301
16
                   Envoy::Http::ResponseMessagePtr&& response) override {
302
16
      request_ = nullptr;
303
16
      context_->onHttpCallSuccess(token_, std::move(response));
304
16
    }
305
    void onFailure(const Http::AsyncClient::Request&,
306
3
                   Http::AsyncClient::FailureReason reason) override {
307
3
      request_ = nullptr;
308
3
      context_->onHttpCallFailure(token_, reason);
309
3
    }
310
    void
311
    onBeforeFinalizeUpstreamSpan(Envoy::Tracing::Span& /* span */,
312
13
                                 const Http::ResponseHeaderMap* /* response_headers */) override {}
313

            
314
    Context* context_;
315
    uint32_t token_;
316
    Http::AsyncClient::Request* request_;
317
  };
318

            
319
  struct GrpcCallClientHandler : public Grpc::RawAsyncRequestCallbacks {
320
    // Grpc::AsyncRequestCallbacks
321
5
    void onCreateInitialMetadata(Http::RequestHeaderMap& initial_metadata) override {
322
5
      context_->onGrpcCreateInitialMetadata(token_, initial_metadata);
323
5
    }
324
7
    void onSuccessRaw(::Envoy::Buffer::InstancePtr&& response, Tracing::Span& /* span */) override {
325
7
      request_ = nullptr;
326
7
      context_->onGrpcReceiveWrapper(token_, std::move(response));
327
7
    }
328
    void onFailure(Grpc::Status::GrpcStatus status, const std::string& message,
329
7
                   Tracing::Span& /* span */) override {
330
7
      request_ = nullptr;
331
7
      context_->onGrpcCloseWrapper(token_, status, message);
332
7
    }
333

            
334
    Context* context_;
335
    uint32_t token_;
336
    Grpc::RawAsyncClientSharedPtr client_;
337
    Grpc::AsyncRequest* request_;
338
  };
339

            
340
  struct GrpcStreamClientHandler : public Grpc::RawAsyncStreamCallbacks {
341
    // Grpc::AsyncStreamCallbacks
342
25
    void onCreateInitialMetadata(Http::RequestHeaderMap& initial_metadata) override {
343
25
      context_->onGrpcCreateInitialMetadata(token_, initial_metadata);
344
25
    }
345
25
    void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override {
346
25
      context_->onGrpcReceiveInitialMetadataWrapper(token_, std::move(metadata));
347
25
    }
348
20
    bool onReceiveMessageRaw(::Envoy::Buffer::InstancePtr&& response) override {
349
20
      context_->onGrpcReceiveWrapper(token_, std::move(response));
350
20
      return true;
351
20
    }
352
10
    void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override {
353
10
      context_->onGrpcReceiveTrailingMetadataWrapper(token_, std::move(metadata));
354
10
    }
355
15
    void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override {
356
15
      remote_closed_ = true;
357
15
      stream_ = nullptr;
358
15
      context_->onGrpcCloseWrapper(token_, status, message);
359
15
    }
360

            
361
    Context* context_;
362
    uint32_t token_;
363
    Grpc::RawAsyncClientSharedPtr client_;
364
    Grpc::RawAsyncStream* stream_;
365
    bool local_closed_ = false;
366
    bool remote_closed_ = false;
367
  };
368

            
369
  void onHttpCallSuccess(uint32_t token, Envoy::Http::ResponseMessagePtr&& response);
370
  void onHttpCallFailure(uint32_t token, Http::AsyncClient::FailureReason reason);
371

            
372
  void onGrpcCreateInitialMetadata(uint32_t token, Http::RequestHeaderMap& metadata);
373
  void onGrpcReceiveInitialMetadataWrapper(uint32_t token, Http::HeaderMapPtr&& metadata);
374
  void onGrpcReceiveWrapper(uint32_t token, ::Envoy::Buffer::InstancePtr response);
375
  void onGrpcReceiveTrailingMetadataWrapper(uint32_t token, Http::HeaderMapPtr&& metadata);
376
  void onGrpcCloseWrapper(uint32_t token, const Grpc::Status::GrpcStatus& status,
377
                          const std::string_view message);
378

            
379
  Http::HeaderMap* getMap(WasmHeaderMapType type);
380
  const Http::HeaderMap* getConstMap(WasmHeaderMapType type);
381

            
382
137
  void onHeadersModified(WasmHeaderMapType type) {
383
137
    if (type != WasmHeaderMapType::RequestHeaders ||
384
137
        abi_version_ > proxy_wasm::AbiVersion::ProxyWasm_0_2_1) {
385
53
      return;
386
53
    }
387
84
    clearRouteCache();
388
84
  }
389

            
390
  const LocalInfo::LocalInfo* root_local_info_{nullptr}; // set only for root_context.
391
  PluginHandleSharedPtr plugin_handle_{nullptr};
392

            
393
  uint32_t next_http_call_token_ = 1;
394
  uint32_t next_grpc_token_ = 1; // Odd tokens are for Calls even for Streams.
395

            
396
  // Network callbacks.
397
  Network::ReadFilterCallbacks* network_read_filter_callbacks_{};
398
  Network::WriteFilterCallbacks* network_write_filter_callbacks_{};
399

            
400
  // HTTP callbacks.
401
  Envoy::Http::StreamDecoderFilterCallbacks* decoder_callbacks_{};
402
  Envoy::Http::StreamEncoderFilterCallbacks* encoder_callbacks_{};
403

            
404
  // Status.
405
  uint32_t status_code_{0};
406
  absl::string_view status_message_;
407

            
408
  // Network filter state.
409
  ::Envoy::Buffer::Instance* network_downstream_data_buffer_{};
410
  ::Envoy::Buffer::Instance* network_upstream_data_buffer_{};
411

            
412
  // HTTP filter state.
413
  Http::RequestHeaderMap* request_headers_{};
414
  Http::ResponseHeaderMap* response_headers_{};
415
  ::Envoy::Buffer::Instance* request_body_buffer_{};
416
  ::Envoy::Buffer::Instance* response_body_buffer_{};
417
  Http::RequestTrailerMap* request_trailers_{};
418
  Http::ResponseTrailerMap* response_trailers_{};
419
  Http::MetadataMap* request_metadata_{};
420
  Http::MetadataMap* response_metadata_{};
421

            
422
  // Only available during onHttpCallResponse.
423
  Envoy::Http::ResponseMessagePtr* http_call_response_{};
424

            
425
  Http::HeaderMapPtr grpc_receive_initial_metadata_;
426
  Http::HeaderMapPtr grpc_receive_trailing_metadata_;
427

            
428
  // Only available (non-nullptr) during onGrpcReceive.
429
  ::Envoy::Buffer::InstancePtr grpc_receive_buffer_;
430

            
431
  // Only available (non-nullptr) during grpcCall and grpcStream.
432
  Http::RequestHeaderMapPtr grpc_initial_metadata_;
433

            
434
  // Access log state.
435
  bool access_log_phase_ = false;
436
  const StreamInfo::StreamInfo* access_log_stream_info_{};
437
  const Http::RequestHeaderMap* access_log_request_headers_{};
438
  const Http::ResponseHeaderMap* access_log_response_headers_{};
439
  const Http::ResponseTrailerMap* access_log_response_trailers_{};
440

            
441
  // Temporary state.
442
  Buffer buffer_;
443
  bool buffering_request_body_ = false;
444
  bool buffering_response_body_ = false;
445
  bool end_of_stream_ = false;
446
  bool failure_local_reply_sent_ = false;
447

            
448
  // MB: must be a node-type map as we take persistent references to the entries.
449
  std::map<uint32_t, AsyncClientHandler> http_request_;
450
  std::map<uint32_t, GrpcCallClientHandler> grpc_call_request_;
451
  std::map<uint32_t, GrpcStreamClientHandler> grpc_stream_;
452

            
453
  // Opaque state.
454
  absl::flat_hash_map<std::string, std::unique_ptr<StorageObject>> data_storage_;
455

            
456
  // TCP State.
457
  bool upstream_closed_ = false;
458
  bool downstream_closed_ = false;
459
  bool tcp_connection_closed_ = false;
460

            
461
  // Filter state prototype declaration.
462
  absl::flat_hash_map<std::string, Filters::Common::Expr::CelStatePrototypeConstPtr>
463
      state_prototypes_;
464

            
465
  proxy_wasm::AbiVersion abi_version_{proxy_wasm::AbiVersion::Unknown};
466
};
467
using ContextSharedPtr = std::shared_ptr<Context>;
468

            
469
WasmResult serializeValue(Filters::Common::Expr::CelValue value, std::string* result);
470

            
471
} // namespace Wasm
472
} // namespace Common
473
} // namespace Extensions
474
} // namespace Envoy