Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/extensions/common/wasm/context.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <atomic>
4
#include <map>
5
#include <memory>
6
7
#include "envoy/access_log/access_log.h"
8
#include "envoy/buffer/buffer.h"
9
#include "envoy/extensions/wasm/v3/wasm.pb.validate.h"
10
#include "envoy/http/filter.h"
11
#include "envoy/stats/sink.h"
12
#include "envoy/upstream/cluster_manager.h"
13
14
#include "source/common/common/assert.h"
15
#include "source/common/common/logger.h"
16
#include "source/extensions/common/wasm/plugin.h"
17
#include "source/extensions/filters/common/expr/cel_state.h"
18
#include "source/extensions/filters/common/expr/evaluator.h"
19
20
#include "eval/public/activation.h"
21
#include "include/proxy-wasm/wasm.h"
22
23
namespace Envoy {
24
namespace Extensions {
25
namespace Common {
26
namespace Wasm {
27
28
using proxy_wasm::BufferInterface;
29
using proxy_wasm::CloseType;
30
using proxy_wasm::ContextBase;
31
using proxy_wasm::Pairs;
32
using proxy_wasm::PairsWithStringValues;
33
using proxy_wasm::PluginBase;
34
using proxy_wasm::PluginHandleBase;
35
using proxy_wasm::SharedQueueDequeueToken;
36
using proxy_wasm::SharedQueueEnqueueToken;
37
using proxy_wasm::WasmBase;
38
using proxy_wasm::WasmBufferType;
39
using proxy_wasm::WasmHandleBase;
40
using proxy_wasm::WasmHeaderMapType;
41
using proxy_wasm::WasmResult;
42
using proxy_wasm::WasmStreamType;
43
44
using VmConfig = envoy::extensions::wasm::v3::VmConfig;
45
using CapabilityRestrictionConfig = envoy::extensions::wasm::v3::CapabilityRestrictionConfig;
46
using SanitizationConfig = envoy::extensions::wasm::v3::SanitizationConfig;
47
using GrpcService = envoy::config::core::v3::GrpcService;
48
49
class PluginHandle;
50
class Wasm;
51
52
using PluginBaseSharedPtr = std::shared_ptr<PluginBase>;
53
using PluginHandleBaseSharedPtr = std::shared_ptr<PluginHandleBase>;
54
using PluginHandleSharedPtr = std::shared_ptr<PluginHandle>;
55
using WasmHandleBaseSharedPtr = std::shared_ptr<WasmHandleBase>;
56
57
// Opaque context object.
58
class StorageObject {
59
public:
60
0
  virtual ~StorageObject() = default;
61
};
62
63
class Buffer : public proxy_wasm::BufferBase {
64
public:
65
0
  Buffer() = default;
66
67
  // proxy_wasm::BufferInterface
68
  size_t size() const override;
69
  WasmResult copyTo(WasmBase* wasm, size_t start, size_t length, uint64_t ptr_ptr,
70
                    uint64_t size_ptr) const override;
71
  WasmResult copyFrom(size_t start, size_t length, std::string_view data) override;
72
73
  // proxy_wasm::BufferBase
74
0
  void clear() override {
75
0
    proxy_wasm::BufferBase::clear();
76
0
    const_buffer_instance_ = nullptr;
77
0
    buffer_instance_ = nullptr;
78
0
  }
79
0
  Buffer* set(std::string_view data) {
80
0
    return static_cast<Buffer*>(proxy_wasm::BufferBase::set(data));
81
0
  }
82
0
  Buffer* set(std::unique_ptr<char[]> owned_data, uint32_t owned_data_size) {
83
0
    return static_cast<Buffer*>(
84
0
        proxy_wasm::BufferBase::set(std::move(owned_data), owned_data_size));
85
0
  }
86
87
0
  Buffer* set(::Envoy::Buffer::Instance* buffer_instance) {
88
0
    clear();
89
0
    buffer_instance_ = buffer_instance;
90
0
    const_buffer_instance_ = buffer_instance;
91
0
    return this;
92
0
  }
93
0
  Buffer* set(const ::Envoy::Buffer::Instance* buffer_instance) {
94
0
    clear();
95
0
    const_buffer_instance_ = buffer_instance;
96
0
    return this;
97
0
  }
98
99
private:
100
  const ::Envoy::Buffer::Instance* const_buffer_instance_{};
101
  ::Envoy::Buffer::Instance* buffer_instance_{};
102
};
103
104
// A context which will be the target of callbacks for a particular session
105
// e.g. a handler of a stream.
106
class Context : public proxy_wasm::ContextBase,
107
                public Logger::Loggable<Logger::Id::wasm>,
108
                public AccessLog::Instance,
109
                public Http::StreamFilter,
110
                public Network::ConnectionCallbacks,
111
                public Network::Filter,
112
                public Filters::Common::Expr::StreamActivation,
113
                public std::enable_shared_from_this<Context> {
114
public:
115
  Context();                                          // Testing.
116
  Context(Wasm* wasm);                                // Vm Context.
117
  Context(Wasm* wasm, const PluginSharedPtr& plugin); // Root Context.
118
  Context(Wasm* wasm, uint32_t root_context_id,
119
          PluginHandleSharedPtr plugin_handle); // Stream context.
120
  ~Context() override;
121
122
  Wasm* wasm() const;
123
  Plugin* plugin() const;
124
  Context* rootContext() const;
125
  Upstream::ClusterManager& clusterManager() const;
126
127
  // proxy_wasm::ContextBase
128
  void error(std::string_view message) override;
129
130
  // Retrieves the stream info associated with the request (a.k.a active stream).
131
  // It selects a value based on the following order: encoder callback, decoder
132
  // callback, log callback, network read filter callback, network write filter
133
  // callback. As long as any one of the callbacks is invoked, the value should be
134
  // available.
135
  const StreamInfo::StreamInfo* getConstRequestStreamInfo() const;
136
  StreamInfo::StreamInfo* getRequestStreamInfo() const;
137
138
  // Retrieves the connection object associated with the request (a.k.a active stream).
139
  // It selects a value based on the following order: encoder callback, decoder
140
  // callback. As long as any one of the callbacks is invoked, the value should be
141
  // available.
142
  const Network::Connection* getConnection() const;
143
144
  //
145
  // VM level down-calls into the Wasm code on Context(id == 0).
146
  //
147
  virtual bool validateConfiguration(std::string_view configuration,
148
                                     const std::shared_ptr<PluginBase>& plugin); // deprecated
149
150
  // AccessLog::Instance
151
  void log(const Formatter::HttpFormatterContext& log_context,
152
           const StreamInfo::StreamInfo& info) override;
153
154
  uint32_t getLogLevel() override;
155
156
  // Network::ConnectionCallbacks
157
  void onEvent(Network::ConnectionEvent event) override;
158
0
  void onAboveWriteBufferHighWatermark() override {}
159
0
  void onBelowWriteBufferLowWatermark() override {}
160
161
  // Network::ReadFilter
162
  Network::FilterStatus onNewConnection() override;
163
  Network::FilterStatus onData(::Envoy::Buffer::Instance& data, bool end_stream) override;
164
  void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;
165
166
  // Network::WriteFilter
167
  Network::FilterStatus onWrite(::Envoy::Buffer::Instance& data, bool end_stream) override;
168
  void initializeWriteFilterCallbacks(Network::WriteFilterCallbacks& callbacks) override;
169
170
  // proxy_wasm::ContextBase
171
  void onDownstreamConnectionClose(CloseType) override;
172
  void onUpstreamConnectionClose(CloseType) override;
173
174
  // Http::StreamFilterBase. Note: This calls onDone() in Wasm.
175
  void onDestroy() override;
176
177
  // Http::StreamDecoderFilter
178
  Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
179
                                          bool end_stream) override;
180
  Http::FilterDataStatus decodeData(::Envoy::Buffer::Instance& data, bool end_stream) override;
181
  Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override;
182
  Http::FilterMetadataStatus decodeMetadata(Http::MetadataMap& metadata_map) override;
183
  void setDecoderFilterCallbacks(Envoy::Http::StreamDecoderFilterCallbacks& callbacks) override;
184
185
  // Http::StreamEncoderFilter
186
  Http::Filter1xxHeadersStatus encode1xxHeaders(Http::ResponseHeaderMap&) override;
187
  Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers,
188
                                          bool end_stream) override;
189
  Http::FilterDataStatus encodeData(::Envoy::Buffer::Instance& data, bool end_stream) override;
190
  Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap& trailers) override;
191
  Http::FilterMetadataStatus encodeMetadata(Http::MetadataMap& metadata_map) override;
192
  void setEncoderFilterCallbacks(Envoy::Http::StreamEncoderFilterCallbacks& callbacks) override;
193
194
  // VM calls out to host.
195
  // proxy_wasm::ContextBase
196
197
  // General
198
  WasmResult log(uint32_t level, std::string_view message) override;
199
  uint64_t getCurrentTimeNanoseconds() override;
200
  uint64_t getMonotonicTimeNanoseconds() override;
201
  std::string_view getConfiguration() override;
202
  std::pair<uint32_t, std::string_view> getStatus() override;
203
204
  // State accessors
205
  WasmResult getProperty(std::string_view path, std::string* result) override;
206
  WasmResult setProperty(std::string_view path, std::string_view value) override;
207
  WasmResult setEnvoyFilterState(std::string_view path, std::string_view value,
208
                                 StreamInfo::FilterState::LifeSpan life_span);
209
  WasmResult declareProperty(std::string_view path,
210
                             Filters::Common::Expr::CelStatePrototypeConstPtr state_prototype);
211
212
  // Continue
213
  WasmResult continueStream(WasmStreamType stream_type) override;
214
  WasmResult closeStream(WasmStreamType stream_type) override;
215
  void failStream(WasmStreamType stream_type) override;
216
  WasmResult sendLocalResponse(uint32_t response_code, std::string_view body_text,
217
                               Pairs additional_headers, uint32_t grpc_status,
218
                               std::string_view details) override;
219
0
  void clearRouteCache() override {
220
0
    if (decoder_callbacks_ && decoder_callbacks_->downstreamCallbacks()) {
221
0
      decoder_callbacks_->downstreamCallbacks()->clearRouteCache();
222
0
    }
223
0
  }
224
225
  // Header/Trailer/Metadata Maps
226
  WasmResult addHeaderMapValue(WasmHeaderMapType type, std::string_view key,
227
                               std::string_view value) override;
228
  WasmResult getHeaderMapValue(WasmHeaderMapType type, std::string_view key,
229
                               std::string_view* value) override;
230
  WasmResult getHeaderMapPairs(WasmHeaderMapType type, Pairs* result) override;
231
  WasmResult setHeaderMapPairs(WasmHeaderMapType type, const Pairs& pairs) override;
232
233
  WasmResult removeHeaderMapValue(WasmHeaderMapType type, std::string_view key) override;
234
  WasmResult replaceHeaderMapValue(WasmHeaderMapType type, std::string_view key,
235
                                   std::string_view value) override;
236
237
  WasmResult getHeaderMapSize(WasmHeaderMapType type, uint32_t* size) override;
238
239
  // Buffer
240
  BufferInterface* getBuffer(WasmBufferType type) override;
241
  // TODO: use stream_type.
242
0
  bool endOfStream(WasmStreamType /* stream_type */) override { return end_of_stream_; }
243
244
  // HTTP
245
  WasmResult httpCall(std::string_view cluster, const Pairs& request_headers,
246
                      std::string_view request_body, const Pairs& request_trailers,
247
                      int timeout_milliseconds, uint32_t* token_ptr) override;
248
249
  // Stats/Metrics
250
  WasmResult defineMetric(uint32_t type, std::string_view name, uint32_t* metric_id_ptr) override;
251
  WasmResult incrementMetric(uint32_t metric_id, int64_t offset) override;
252
  WasmResult recordMetric(uint32_t metric_id, uint64_t value) override;
253
  WasmResult getMetric(uint32_t metric_id, uint64_t* value_ptr) override;
254
255
  // gRPC
256
  WasmResult grpcCall(std::string_view grpc_service, std::string_view service_name,
257
                      std::string_view method_name, const Pairs& initial_metadata,
258
                      std::string_view request, std::chrono::milliseconds timeout,
259
                      uint32_t* token_ptr) override;
260
  WasmResult grpcStream(std::string_view grpc_service, std::string_view service_name,
261
                        std::string_view method_name, const Pairs& initial_metadat,
262
                        uint32_t* token_ptr) override;
263
264
  WasmResult grpcClose(uint32_t token) override;
265
  WasmResult grpcCancel(uint32_t token) override;
266
  WasmResult grpcSend(uint32_t token, std::string_view message, bool end_stream) override;
267
268
  // Envoy specific ABI
269
  void onResolveDns(uint32_t token, Envoy::Network::DnsResolver::ResolutionStatus status,
270
                    std::list<Envoy::Network::DnsResponse>&& response);
271
272
  void onStatsUpdate(Envoy::Stats::MetricSnapshot& snapshot);
273
274
  // CEL evaluation
275
  absl::optional<google::api::expr::runtime::CelValue>
276
  findValue(absl::string_view name, Protobuf::Arena* arena, bool last) const;
277
  absl::optional<google::api::expr::runtime::CelValue>
278
  FindValue(absl::string_view name, Protobuf::Arena* arena) const override;
279
280
  // Foreign function state
281
0
  virtual void setForeignData(absl::string_view data_name, std::unique_ptr<StorageObject> data) {
282
0
    data_storage_[data_name] = std::move(data);
283
0
  }
284
0
  template <typename T> T* getForeignData(absl::string_view data_name) {
285
0
    const auto& it = data_storage_.find(data_name);
286
0
    if (it == data_storage_.end()) {
287
0
      return nullptr;
288
0
    }
289
0
    return dynamic_cast<T*>(it->second.get());
290
0
  }
291
292
protected:
293
  friend class Wasm;
294
295
  void addAfterVmCallAction(std::function<void()> f);
296
  void onCloseTCP();
297
298
  struct AsyncClientHandler : public Http::AsyncClient::Callbacks {
299
    // Http::AsyncClient::Callbacks
300
    void onSuccess(const Http::AsyncClient::Request&,
301
0
                   Envoy::Http::ResponseMessagePtr&& response) override {
302
0
      context_->onHttpCallSuccess(token_, std::move(response));
303
0
    }
304
    void onFailure(const Http::AsyncClient::Request&,
305
0
                   Http::AsyncClient::FailureReason reason) override {
306
0
      context_->onHttpCallFailure(token_, reason);
307
0
    }
308
    void
309
    onBeforeFinalizeUpstreamSpan(Envoy::Tracing::Span& /* span */,
310
0
                                 const Http::ResponseHeaderMap* /* response_headers */) override {}
311
312
    Context* context_;
313
    uint32_t token_;
314
    Http::AsyncClient::Request* request_;
315
  };
316
317
  struct GrpcCallClientHandler : public Grpc::RawAsyncRequestCallbacks {
318
    // Grpc::AsyncRequestCallbacks
319
0
    void onCreateInitialMetadata(Http::RequestHeaderMap& initial_metadata) override {
320
0
      context_->onGrpcCreateInitialMetadata(token_, initial_metadata);
321
0
    }
322
0
    void onSuccessRaw(::Envoy::Buffer::InstancePtr&& response, Tracing::Span& /* span */) override {
323
0
      context_->onGrpcReceiveWrapper(token_, std::move(response));
324
0
    }
325
    void onFailure(Grpc::Status::GrpcStatus status, const std::string& message,
326
0
                   Tracing::Span& /* span */) override {
327
0
      context_->onGrpcCloseWrapper(token_, status, message);
328
0
    }
329
330
    Context* context_;
331
    uint32_t token_;
332
    Grpc::RawAsyncClientSharedPtr client_;
333
    Grpc::AsyncRequest* request_;
334
  };
335
336
  struct GrpcStreamClientHandler : public Grpc::RawAsyncStreamCallbacks {
337
    // Grpc::AsyncStreamCallbacks
338
0
    void onCreateInitialMetadata(Http::RequestHeaderMap& initial_metadata) override {
339
0
      context_->onGrpcCreateInitialMetadata(token_, initial_metadata);
340
0
    }
341
0
    void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override {
342
0
      context_->onGrpcReceiveInitialMetadataWrapper(token_, std::move(metadata));
343
0
    }
344
0
    bool onReceiveMessageRaw(::Envoy::Buffer::InstancePtr&& response) override {
345
0
      context_->onGrpcReceiveWrapper(token_, std::move(response));
346
0
      return true;
347
0
    }
348
0
    void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override {
349
0
      context_->onGrpcReceiveTrailingMetadataWrapper(token_, std::move(metadata));
350
0
    }
351
0
    void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override {
352
0
      remote_closed_ = true;
353
0
      context_->onGrpcCloseWrapper(token_, status, message);
354
0
    }
355
356
    Context* context_;
357
    uint32_t token_;
358
    Grpc::RawAsyncClientSharedPtr client_;
359
    Grpc::RawAsyncStream* stream_;
360
    bool local_closed_ = false;
361
    bool remote_closed_ = false;
362
  };
363
364
  void onHttpCallSuccess(uint32_t token, Envoy::Http::ResponseMessagePtr&& response);
365
  void onHttpCallFailure(uint32_t token, Http::AsyncClient::FailureReason reason);
366
367
  void onGrpcCreateInitialMetadata(uint32_t token, Http::RequestHeaderMap& metadata);
368
  void onGrpcReceiveInitialMetadataWrapper(uint32_t token, Http::HeaderMapPtr&& metadata);
369
  void onGrpcReceiveWrapper(uint32_t token, ::Envoy::Buffer::InstancePtr response);
370
  void onGrpcReceiveTrailingMetadataWrapper(uint32_t token, Http::HeaderMapPtr&& metadata);
371
  void onGrpcCloseWrapper(uint32_t token, const Grpc::Status::GrpcStatus& status,
372
                          const std::string_view message);
373
374
  Http::HeaderMap* getMap(WasmHeaderMapType type);
375
  const Http::HeaderMap* getConstMap(WasmHeaderMapType type);
376
377
  const LocalInfo::LocalInfo* root_local_info_{nullptr}; // set only for root_context.
378
  PluginHandleSharedPtr plugin_handle_{nullptr};
379
380
  uint32_t next_http_call_token_ = 1;
381
  uint32_t next_grpc_token_ = 1; // Odd tokens are for Calls even for Streams.
382
383
  // Network callbacks.
384
  Network::ReadFilterCallbacks* network_read_filter_callbacks_{};
385
  Network::WriteFilterCallbacks* network_write_filter_callbacks_{};
386
387
  // HTTP callbacks.
388
  Envoy::Http::StreamDecoderFilterCallbacks* decoder_callbacks_{};
389
  Envoy::Http::StreamEncoderFilterCallbacks* encoder_callbacks_{};
390
391
  // Status.
392
  uint32_t status_code_{0};
393
  absl::string_view status_message_;
394
395
  // Network filter state.
396
  ::Envoy::Buffer::Instance* network_downstream_data_buffer_{};
397
  ::Envoy::Buffer::Instance* network_upstream_data_buffer_{};
398
399
  // HTTP filter state.
400
  Http::RequestHeaderMap* request_headers_{};
401
  Http::ResponseHeaderMap* response_headers_{};
402
  ::Envoy::Buffer::Instance* request_body_buffer_{};
403
  ::Envoy::Buffer::Instance* response_body_buffer_{};
404
  Http::RequestTrailerMap* request_trailers_{};
405
  Http::ResponseTrailerMap* response_trailers_{};
406
  Http::MetadataMap* request_metadata_{};
407
  Http::MetadataMap* response_metadata_{};
408
409
  // Only available during onHttpCallResponse.
410
  Envoy::Http::ResponseMessagePtr* http_call_response_{};
411
412
  Http::HeaderMapPtr grpc_receive_initial_metadata_{};
413
  Http::HeaderMapPtr grpc_receive_trailing_metadata_{};
414
415
  // Only available (non-nullptr) during onGrpcReceive.
416
  ::Envoy::Buffer::InstancePtr grpc_receive_buffer_;
417
418
  // Only available (non-nullptr) during grpcCall and grpcStream.
419
  Http::RequestHeaderMapPtr grpc_initial_metadata_;
420
421
  // Access log state.
422
  bool access_log_phase_ = false;
423
  const StreamInfo::StreamInfo* access_log_stream_info_{};
424
  const Http::RequestHeaderMap* access_log_request_headers_{};
425
  const Http::ResponseHeaderMap* access_log_response_headers_{};
426
  const Http::ResponseTrailerMap* access_log_response_trailers_{};
427
428
  // Temporary state.
429
  Buffer buffer_;
430
  bool buffering_request_body_ = false;
431
  bool buffering_response_body_ = false;
432
  bool end_of_stream_ = false;
433
  bool local_reply_sent_ = false;
434
  bool local_reply_hold_ = false;
435
  ProtobufWkt::Struct temporary_metadata_;
436
437
  // MB: must be a node-type map as we take persistent references to the entries.
438
  std::map<uint32_t, AsyncClientHandler> http_request_;
439
  std::map<uint32_t, GrpcCallClientHandler> grpc_call_request_;
440
  std::map<uint32_t, GrpcStreamClientHandler> grpc_stream_;
441
442
  // Opaque state.
443
  absl::flat_hash_map<std::string, std::unique_ptr<StorageObject>> data_storage_;
444
445
  // TCP State.
446
  bool upstream_closed_ = false;
447
  bool downstream_closed_ = false;
448
  bool tcp_connection_closed_ = false;
449
450
  // Filter state prototype declaration.
451
  absl::flat_hash_map<std::string, Filters::Common::Expr::CelStatePrototypeConstPtr>
452
      state_prototypes_;
453
};
454
using ContextSharedPtr = std::shared_ptr<Context>;
455
456
WasmResult serializeValue(Filters::Common::Expr::CelValue value, std::string* result);
457
458
} // namespace Wasm
459
} // namespace Common
460
} // namespace Extensions
461
} // namespace Envoy