Line data Source code
1 : #pragma once
2 :
3 : #include <atomic>
4 : #include <map>
5 : #include <memory>
6 :
7 : #include "envoy/access_log/access_log.h"
8 : #include "envoy/buffer/buffer.h"
9 : #include "envoy/extensions/wasm/v3/wasm.pb.validate.h"
10 : #include "envoy/http/filter.h"
11 : #include "envoy/stats/sink.h"
12 : #include "envoy/upstream/cluster_manager.h"
13 :
14 : #include "source/common/common/assert.h"
15 : #include "source/common/common/logger.h"
16 : #include "source/extensions/common/wasm/plugin.h"
17 : #include "source/extensions/filters/common/expr/cel_state.h"
18 : #include "source/extensions/filters/common/expr/evaluator.h"
19 :
20 : #include "eval/public/activation.h"
21 : #include "include/proxy-wasm/wasm.h"
22 :
23 : namespace Envoy {
24 : namespace Extensions {
25 : namespace Common {
26 : namespace Wasm {
27 :
28 : using proxy_wasm::BufferInterface;
29 : using proxy_wasm::CloseType;
30 : using proxy_wasm::ContextBase;
31 : using proxy_wasm::Pairs;
32 : using proxy_wasm::PairsWithStringValues;
33 : using proxy_wasm::PluginBase;
34 : using proxy_wasm::PluginHandleBase;
35 : using proxy_wasm::SharedQueueDequeueToken;
36 : using proxy_wasm::SharedQueueEnqueueToken;
37 : using proxy_wasm::WasmBase;
38 : using proxy_wasm::WasmBufferType;
39 : using proxy_wasm::WasmHandleBase;
40 : using proxy_wasm::WasmHeaderMapType;
41 : using proxy_wasm::WasmResult;
42 : using proxy_wasm::WasmStreamType;
43 :
44 : using VmConfig = envoy::extensions::wasm::v3::VmConfig;
45 : using CapabilityRestrictionConfig = envoy::extensions::wasm::v3::CapabilityRestrictionConfig;
46 : using SanitizationConfig = envoy::extensions::wasm::v3::SanitizationConfig;
47 : using GrpcService = envoy::config::core::v3::GrpcService;
48 :
49 : class PluginHandle;
50 : class Wasm;
51 :
52 : using PluginBaseSharedPtr = std::shared_ptr<PluginBase>;
53 : using PluginHandleBaseSharedPtr = std::shared_ptr<PluginHandleBase>;
54 : using PluginHandleSharedPtr = std::shared_ptr<PluginHandle>;
55 : using WasmHandleBaseSharedPtr = std::shared_ptr<WasmHandleBase>;
56 :
57 : // Opaque context object.
58 : class StorageObject {
59 : public:
60 0 : virtual ~StorageObject() = default;
61 : };
62 :
63 : class Buffer : public proxy_wasm::BufferBase {
64 : public:
65 0 : Buffer() = default;
66 :
67 : // proxy_wasm::BufferInterface
68 : size_t size() const override;
69 : WasmResult copyTo(WasmBase* wasm, size_t start, size_t length, uint64_t ptr_ptr,
70 : uint64_t size_ptr) const override;
71 : WasmResult copyFrom(size_t start, size_t length, std::string_view data) override;
72 :
73 : // proxy_wasm::BufferBase
74 0 : void clear() override {
75 0 : proxy_wasm::BufferBase::clear();
76 0 : const_buffer_instance_ = nullptr;
77 0 : buffer_instance_ = nullptr;
78 0 : }
79 0 : Buffer* set(std::string_view data) {
80 0 : return static_cast<Buffer*>(proxy_wasm::BufferBase::set(data));
81 0 : }
82 0 : Buffer* set(std::unique_ptr<char[]> owned_data, uint32_t owned_data_size) {
83 0 : return static_cast<Buffer*>(
84 0 : proxy_wasm::BufferBase::set(std::move(owned_data), owned_data_size));
85 0 : }
86 :
87 0 : Buffer* set(::Envoy::Buffer::Instance* buffer_instance) {
88 0 : clear();
89 0 : buffer_instance_ = buffer_instance;
90 0 : const_buffer_instance_ = buffer_instance;
91 0 : return this;
92 0 : }
93 0 : Buffer* set(const ::Envoy::Buffer::Instance* buffer_instance) {
94 0 : clear();
95 0 : const_buffer_instance_ = buffer_instance;
96 0 : return this;
97 0 : }
98 :
99 : private:
100 : const ::Envoy::Buffer::Instance* const_buffer_instance_{};
101 : ::Envoy::Buffer::Instance* buffer_instance_{};
102 : };
103 :
104 : // A context which will be the target of callbacks for a particular session
105 : // e.g. a handler of a stream.
106 : class Context : public proxy_wasm::ContextBase,
107 : public Logger::Loggable<Logger::Id::wasm>,
108 : public AccessLog::Instance,
109 : public Http::StreamFilter,
110 : public Network::ConnectionCallbacks,
111 : public Network::Filter,
112 : public Filters::Common::Expr::StreamActivation,
113 : public std::enable_shared_from_this<Context> {
114 : public:
115 : Context(); // Testing.
116 : Context(Wasm* wasm); // Vm Context.
117 : Context(Wasm* wasm, const PluginSharedPtr& plugin); // Root Context.
118 : Context(Wasm* wasm, uint32_t root_context_id,
119 : PluginHandleSharedPtr plugin_handle); // Stream context.
120 : ~Context() override;
121 :
122 : Wasm* wasm() const;
123 : Plugin* plugin() const;
124 : Context* rootContext() const;
125 : Upstream::ClusterManager& clusterManager() const;
126 :
127 : // proxy_wasm::ContextBase
128 : void error(std::string_view message) override;
129 :
130 : // Retrieves the stream info associated with the request (a.k.a active stream).
131 : // It selects a value based on the following order: encoder callback, decoder
132 : // callback, log callback, network read filter callback, network write filter
133 : // callback. As long as any one of the callbacks is invoked, the value should be
134 : // available.
135 : const StreamInfo::StreamInfo* getConstRequestStreamInfo() const;
136 : StreamInfo::StreamInfo* getRequestStreamInfo() const;
137 :
138 : // Retrieves the connection object associated with the request (a.k.a active stream).
139 : // It selects a value based on the following order: encoder callback, decoder
140 : // callback. As long as any one of the callbacks is invoked, the value should be
141 : // available.
142 : const Network::Connection* getConnection() const;
143 :
144 : //
145 : // VM level down-calls into the Wasm code on Context(id == 0).
146 : //
147 : virtual bool validateConfiguration(std::string_view configuration,
148 : const std::shared_ptr<PluginBase>& plugin); // deprecated
149 :
150 : // AccessLog::Instance
151 : void log(const Formatter::HttpFormatterContext& log_context,
152 : const StreamInfo::StreamInfo& info) override;
153 :
154 : uint32_t getLogLevel() override;
155 :
156 : // Network::ConnectionCallbacks
157 : void onEvent(Network::ConnectionEvent event) override;
158 0 : void onAboveWriteBufferHighWatermark() override {}
159 0 : void onBelowWriteBufferLowWatermark() override {}
160 :
161 : // Network::ReadFilter
162 : Network::FilterStatus onNewConnection() override;
163 : Network::FilterStatus onData(::Envoy::Buffer::Instance& data, bool end_stream) override;
164 : void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;
165 :
166 : // Network::WriteFilter
167 : Network::FilterStatus onWrite(::Envoy::Buffer::Instance& data, bool end_stream) override;
168 : void initializeWriteFilterCallbacks(Network::WriteFilterCallbacks& callbacks) override;
169 :
170 : // proxy_wasm::ContextBase
171 : void onDownstreamConnectionClose(CloseType) override;
172 : void onUpstreamConnectionClose(CloseType) override;
173 :
174 : // Http::StreamFilterBase. Note: This calls onDone() in Wasm.
175 : void onDestroy() override;
176 :
177 : // Http::StreamDecoderFilter
178 : Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
179 : bool end_stream) override;
180 : Http::FilterDataStatus decodeData(::Envoy::Buffer::Instance& data, bool end_stream) override;
181 : Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override;
182 : Http::FilterMetadataStatus decodeMetadata(Http::MetadataMap& metadata_map) override;
183 : void setDecoderFilterCallbacks(Envoy::Http::StreamDecoderFilterCallbacks& callbacks) override;
184 :
185 : // Http::StreamEncoderFilter
186 : Http::Filter1xxHeadersStatus encode1xxHeaders(Http::ResponseHeaderMap&) override;
187 : Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers,
188 : bool end_stream) override;
189 : Http::FilterDataStatus encodeData(::Envoy::Buffer::Instance& data, bool end_stream) override;
190 : Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap& trailers) override;
191 : Http::FilterMetadataStatus encodeMetadata(Http::MetadataMap& metadata_map) override;
192 : void setEncoderFilterCallbacks(Envoy::Http::StreamEncoderFilterCallbacks& callbacks) override;
193 :
194 : // VM calls out to host.
195 : // proxy_wasm::ContextBase
196 :
197 : // General
198 : WasmResult log(uint32_t level, std::string_view message) override;
199 : uint64_t getCurrentTimeNanoseconds() override;
200 : uint64_t getMonotonicTimeNanoseconds() override;
201 : std::string_view getConfiguration() override;
202 : std::pair<uint32_t, std::string_view> getStatus() override;
203 :
204 : // State accessors
205 : WasmResult getProperty(std::string_view path, std::string* result) override;
206 : WasmResult setProperty(std::string_view path, std::string_view value) override;
207 : WasmResult setEnvoyFilterState(std::string_view path, std::string_view value,
208 : StreamInfo::FilterState::LifeSpan life_span);
209 : WasmResult declareProperty(std::string_view path,
210 : Filters::Common::Expr::CelStatePrototypeConstPtr state_prototype);
211 :
212 : // Continue
213 : WasmResult continueStream(WasmStreamType stream_type) override;
214 : WasmResult closeStream(WasmStreamType stream_type) override;
215 : void failStream(WasmStreamType stream_type) override;
216 : WasmResult sendLocalResponse(uint32_t response_code, std::string_view body_text,
217 : Pairs additional_headers, uint32_t grpc_status,
218 : std::string_view details) override;
219 0 : void clearRouteCache() override {
220 0 : if (decoder_callbacks_) {
221 0 : decoder_callbacks_->downstreamCallbacks()->clearRouteCache();
222 0 : }
223 0 : }
224 :
225 : // Header/Trailer/Metadata Maps
226 : WasmResult addHeaderMapValue(WasmHeaderMapType type, std::string_view key,
227 : std::string_view value) override;
228 : WasmResult getHeaderMapValue(WasmHeaderMapType type, std::string_view key,
229 : std::string_view* value) override;
230 : WasmResult getHeaderMapPairs(WasmHeaderMapType type, Pairs* result) override;
231 : WasmResult setHeaderMapPairs(WasmHeaderMapType type, const Pairs& pairs) override;
232 :
233 : WasmResult removeHeaderMapValue(WasmHeaderMapType type, std::string_view key) override;
234 : WasmResult replaceHeaderMapValue(WasmHeaderMapType type, std::string_view key,
235 : std::string_view value) override;
236 :
237 : WasmResult getHeaderMapSize(WasmHeaderMapType type, uint32_t* size) override;
238 :
239 : // Buffer
240 : BufferInterface* getBuffer(WasmBufferType type) override;
241 : // TODO: use stream_type.
242 0 : bool endOfStream(WasmStreamType /* stream_type */) override { return end_of_stream_; }
243 :
244 : // HTTP
245 : WasmResult httpCall(std::string_view cluster, const Pairs& request_headers,
246 : std::string_view request_body, const Pairs& request_trailers,
247 : int timeout_milliseconds, uint32_t* token_ptr) override;
248 :
249 : // Stats/Metrics
250 : WasmResult defineMetric(uint32_t type, std::string_view name, uint32_t* metric_id_ptr) override;
251 : WasmResult incrementMetric(uint32_t metric_id, int64_t offset) override;
252 : WasmResult recordMetric(uint32_t metric_id, uint64_t value) override;
253 : WasmResult getMetric(uint32_t metric_id, uint64_t* value_ptr) override;
254 :
255 : // gRPC
256 : WasmResult grpcCall(std::string_view grpc_service, std::string_view service_name,
257 : std::string_view method_name, const Pairs& initial_metadata,
258 : std::string_view request, std::chrono::milliseconds timeout,
259 : uint32_t* token_ptr) override;
260 : WasmResult grpcStream(std::string_view grpc_service, std::string_view service_name,
261 : std::string_view method_name, const Pairs& initial_metadat,
262 : uint32_t* token_ptr) override;
263 :
264 : WasmResult grpcClose(uint32_t token) override;
265 : WasmResult grpcCancel(uint32_t token) override;
266 : WasmResult grpcSend(uint32_t token, std::string_view message, bool end_stream) override;
267 :
268 : // Envoy specific ABI
269 : void onResolveDns(uint32_t token, Envoy::Network::DnsResolver::ResolutionStatus status,
270 : std::list<Envoy::Network::DnsResponse>&& response);
271 :
272 : void onStatsUpdate(Envoy::Stats::MetricSnapshot& snapshot);
273 :
274 : // CEL evaluation
275 : absl::optional<google::api::expr::runtime::CelValue>
276 : findValue(absl::string_view name, Protobuf::Arena* arena, bool last) const;
277 : absl::optional<google::api::expr::runtime::CelValue>
278 : FindValue(absl::string_view name, Protobuf::Arena* arena) const override;
279 :
280 : // Foreign function state
281 0 : virtual void setForeignData(absl::string_view data_name, std::unique_ptr<StorageObject> data) {
282 0 : data_storage_[data_name] = std::move(data);
283 0 : }
284 0 : template <typename T> T* getForeignData(absl::string_view data_name) {
285 0 : const auto& it = data_storage_.find(data_name);
286 0 : if (it == data_storage_.end()) {
287 0 : return nullptr;
288 0 : }
289 0 : return dynamic_cast<T*>(it->second.get());
290 0 : }
291 :
292 : protected:
293 : friend class Wasm;
294 :
295 : void addAfterVmCallAction(std::function<void()> f);
296 : void onCloseTCP();
297 :
298 : struct AsyncClientHandler : public Http::AsyncClient::Callbacks {
299 : // Http::AsyncClient::Callbacks
300 : void onSuccess(const Http::AsyncClient::Request&,
301 0 : Envoy::Http::ResponseMessagePtr&& response) override {
302 0 : context_->onHttpCallSuccess(token_, std::move(response));
303 0 : }
304 : void onFailure(const Http::AsyncClient::Request&,
305 0 : Http::AsyncClient::FailureReason reason) override {
306 0 : context_->onHttpCallFailure(token_, reason);
307 0 : }
308 : void
309 : onBeforeFinalizeUpstreamSpan(Envoy::Tracing::Span& /* span */,
310 0 : const Http::ResponseHeaderMap* /* response_headers */) override {}
311 :
312 : Context* context_;
313 : uint32_t token_;
314 : Http::AsyncClient::Request* request_;
315 : };
316 :
317 : struct GrpcCallClientHandler : public Grpc::RawAsyncRequestCallbacks {
318 : // Grpc::AsyncRequestCallbacks
319 0 : void onCreateInitialMetadata(Http::RequestHeaderMap& initial_metadata) override {
320 0 : context_->onGrpcCreateInitialMetadata(token_, initial_metadata);
321 0 : }
322 0 : void onSuccessRaw(::Envoy::Buffer::InstancePtr&& response, Tracing::Span& /* span */) override {
323 0 : context_->onGrpcReceiveWrapper(token_, std::move(response));
324 0 : }
325 : void onFailure(Grpc::Status::GrpcStatus status, const std::string& message,
326 0 : Tracing::Span& /* span */) override {
327 0 : context_->onGrpcCloseWrapper(token_, status, message);
328 0 : }
329 :
330 : Context* context_;
331 : uint32_t token_;
332 : Grpc::RawAsyncClientSharedPtr client_;
333 : Grpc::AsyncRequest* request_;
334 : };
335 :
336 : struct GrpcStreamClientHandler : public Grpc::RawAsyncStreamCallbacks {
337 : // Grpc::AsyncStreamCallbacks
338 0 : void onCreateInitialMetadata(Http::RequestHeaderMap& initial_metadata) override {
339 0 : context_->onGrpcCreateInitialMetadata(token_, initial_metadata);
340 0 : }
341 0 : void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override {
342 0 : context_->onGrpcReceiveInitialMetadataWrapper(token_, std::move(metadata));
343 0 : }
344 0 : bool onReceiveMessageRaw(::Envoy::Buffer::InstancePtr&& response) override {
345 0 : context_->onGrpcReceiveWrapper(token_, std::move(response));
346 0 : return true;
347 0 : }
348 0 : void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override {
349 0 : context_->onGrpcReceiveTrailingMetadataWrapper(token_, std::move(metadata));
350 0 : }
351 0 : void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override {
352 0 : remote_closed_ = true;
353 0 : context_->onGrpcCloseWrapper(token_, status, message);
354 0 : }
355 :
356 : Context* context_;
357 : uint32_t token_;
358 : Grpc::RawAsyncClientSharedPtr client_;
359 : Grpc::RawAsyncStream* stream_;
360 : bool local_closed_ = false;
361 : bool remote_closed_ = false;
362 : };
363 :
364 : void onHttpCallSuccess(uint32_t token, Envoy::Http::ResponseMessagePtr&& response);
365 : void onHttpCallFailure(uint32_t token, Http::AsyncClient::FailureReason reason);
366 :
367 : void onGrpcCreateInitialMetadata(uint32_t token, Http::RequestHeaderMap& metadata);
368 : void onGrpcReceiveInitialMetadataWrapper(uint32_t token, Http::HeaderMapPtr&& metadata);
369 : void onGrpcReceiveWrapper(uint32_t token, ::Envoy::Buffer::InstancePtr response);
370 : void onGrpcReceiveTrailingMetadataWrapper(uint32_t token, Http::HeaderMapPtr&& metadata);
371 : void onGrpcCloseWrapper(uint32_t token, const Grpc::Status::GrpcStatus& status,
372 : const std::string_view message);
373 :
374 : Http::HeaderMap* getMap(WasmHeaderMapType type);
375 : const Http::HeaderMap* getConstMap(WasmHeaderMapType type);
376 :
377 : const LocalInfo::LocalInfo* root_local_info_{nullptr}; // set only for root_context.
378 : PluginHandleSharedPtr plugin_handle_{nullptr};
379 :
380 : uint32_t next_http_call_token_ = 1;
381 : uint32_t next_grpc_token_ = 1; // Odd tokens are for Calls even for Streams.
382 :
383 : // Network callbacks.
384 : Network::ReadFilterCallbacks* network_read_filter_callbacks_{};
385 : Network::WriteFilterCallbacks* network_write_filter_callbacks_{};
386 :
387 : // HTTP callbacks.
388 : Envoy::Http::StreamDecoderFilterCallbacks* decoder_callbacks_{};
389 : Envoy::Http::StreamEncoderFilterCallbacks* encoder_callbacks_{};
390 :
391 : // Status.
392 : uint32_t status_code_{0};
393 : absl::string_view status_message_;
394 :
395 : // Network filter state.
396 : ::Envoy::Buffer::Instance* network_downstream_data_buffer_{};
397 : ::Envoy::Buffer::Instance* network_upstream_data_buffer_{};
398 :
399 : // HTTP filter state.
400 : Http::RequestHeaderMap* request_headers_{};
401 : Http::ResponseHeaderMap* response_headers_{};
402 : ::Envoy::Buffer::Instance* request_body_buffer_{};
403 : ::Envoy::Buffer::Instance* response_body_buffer_{};
404 : Http::RequestTrailerMap* request_trailers_{};
405 : Http::ResponseTrailerMap* response_trailers_{};
406 : Http::MetadataMap* request_metadata_{};
407 : Http::MetadataMap* response_metadata_{};
408 :
409 : // Only available during onHttpCallResponse.
410 : Envoy::Http::ResponseMessagePtr* http_call_response_{};
411 :
412 : Http::HeaderMapPtr grpc_receive_initial_metadata_{};
413 : Http::HeaderMapPtr grpc_receive_trailing_metadata_{};
414 :
415 : // Only available (non-nullptr) during onGrpcReceive.
416 : ::Envoy::Buffer::InstancePtr grpc_receive_buffer_;
417 :
418 : // Only available (non-nullptr) during grpcCall and grpcStream.
419 : Http::RequestHeaderMapPtr grpc_initial_metadata_;
420 :
421 : // Access log state.
422 : bool access_log_phase_ = false;
423 : const StreamInfo::StreamInfo* access_log_stream_info_{};
424 : const Http::RequestHeaderMap* access_log_request_headers_{};
425 : const Http::ResponseHeaderMap* access_log_response_headers_{};
426 : const Http::ResponseTrailerMap* access_log_response_trailers_{};
427 :
428 : // Temporary state.
429 : Buffer buffer_;
430 : bool buffering_request_body_ = false;
431 : bool buffering_response_body_ = false;
432 : bool end_of_stream_ = false;
433 : bool local_reply_sent_ = false;
434 : bool local_reply_hold_ = false;
435 : ProtobufWkt::Struct temporary_metadata_;
436 :
437 : // MB: must be a node-type map as we take persistent references to the entries.
438 : std::map<uint32_t, AsyncClientHandler> http_request_;
439 : std::map<uint32_t, GrpcCallClientHandler> grpc_call_request_;
440 : std::map<uint32_t, GrpcStreamClientHandler> grpc_stream_;
441 :
442 : // Opaque state.
443 : absl::flat_hash_map<std::string, std::unique_ptr<StorageObject>> data_storage_;
444 :
445 : // TCP State.
446 : bool upstream_closed_ = false;
447 : bool downstream_closed_ = false;
448 : bool tcp_connection_closed_ = false;
449 :
450 : // Filter state prototype declaration.
451 : absl::flat_hash_map<std::string, Filters::Common::Expr::CelStatePrototypeConstPtr>
452 : state_prototypes_;
453 : };
454 : using ContextSharedPtr = std::shared_ptr<Context>;
455 :
456 : WasmResult serializeValue(Filters::Common::Expr::CelValue value, std::string* result);
457 :
458 : } // namespace Wasm
459 : } // namespace Common
460 : } // namespace Extensions
461 : } // namespace Envoy
|