1
#pragma once
2

            
3
#include <atomic>
#include <chrono>
#include <map>
#include <memory>
#include <utility>
7

            
8
#include "envoy/common/exception.h"
9
#include "envoy/extensions/wasm/v3/wasm.pb.h"
10
#include "envoy/extensions/wasm/v3/wasm.pb.validate.h"
11
#include "envoy/http/filter.h"
12
#include "envoy/server/lifecycle_notifier.h"
13
#include "envoy/stats/scope.h"
14
#include "envoy/stats/stats.h"
15
#include "envoy/thread_local/thread_local_object.h"
16
#include "envoy/upstream/cluster_manager.h"
17

            
18
#include "source/common/common/assert.h"
19
#include "source/common/common/logger.h"
20
#include "source/common/config/datasource.h"
21
#include "source/common/stats/symbol_table.h"
22
#include "source/common/version/version.h"
23
#include "source/extensions/common/wasm/context.h"
24
#include "source/extensions/common/wasm/plugin.h"
25
#include "source/extensions/common/wasm/remote_async_datasource.h"
26
#include "source/extensions/common/wasm/stats_handler.h"
27
#include "source/extensions/common/wasm/wasm_vm.h"
28

            
29
#include "include/proxy-wasm/exports.h"
30
#include "include/proxy-wasm/wasm.h"
31

            
32
namespace Envoy {
33
namespace Extensions {
34
namespace Common {
35
namespace Wasm {
36

            
37
using CreateContextFn =
38
    std::function<ContextBase*(Wasm* wasm, const std::shared_ptr<Plugin>& plugin)>;
39
using FailurePolicy = envoy::extensions::wasm::v3::FailurePolicy;
40

            
41
class WasmHandle;
42

            
43
// Wasm execution instance. Manages the Envoy side of the Wasm interface.
44
class Wasm : public WasmBase, Logger::Loggable<Logger::Id::wasm> {
45
public:
46
  Wasm(WasmConfig& config, absl::string_view vm_key, const Stats::ScopeSharedPtr& scope,
47
       Api::Api& api, Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher);
48
  Wasm(std::shared_ptr<WasmHandle> other, Event::Dispatcher& dispatcher);
49
  ~Wasm() override;
50

            
51
917
  Upstream::ClusterManager& clusterManager() const { return cluster_manager_; }
52
10
  Event::Dispatcher& dispatcher() { return dispatcher_; }
53
4
  Api::Api& api() { return api_; }
54
321
  Context* getRootContext(const std::shared_ptr<PluginBase>& plugin, bool allow_closed) override {
55
321
    return static_cast<Context*>(WasmBase::getRootContext(plugin, allow_closed));
56
321
  }
57
  void setTimerPeriod(uint32_t root_context_id, std::chrono::milliseconds period) override;
58
  virtual void tickHandler(uint32_t root_context_id);
59
4
  std::shared_ptr<Wasm> sharedThis() { return std::static_pointer_cast<Wasm>(shared_from_this()); }
60
12
  Network::DnsResolverSharedPtr& dnsResolver() { return dns_resolver_; }
61

            
62
  // WasmBase
63
  void error(std::string_view message) override;
64
  proxy_wasm::CallOnThreadFunction callOnThreadFunction() override;
65
  ContextBase* createContext(const std::shared_ptr<PluginBase>& plugin) override;
66
  ContextBase* createRootContext(const std::shared_ptr<PluginBase>& plugin) override;
67
  ContextBase* createVmContext() override;
68
  void registerCallbacks() override;
69
  void getFunctions() override;
70

            
71
  // AccessLog::Instance
72
  void log(const PluginSharedPtr& plugin, const Formatter::Context& log_context,
73
           const StreamInfo::StreamInfo& info);
74

            
75
  void onStatsUpdate(const PluginSharedPtr& plugin, Envoy::Stats::MetricSnapshot& snapshot);
76

            
77
2
  virtual std::string buildVersion() { return BUILD_VERSION_NUMBER; }
78

            
79
6
  uint32_t nextDnsToken() {
80
6
    do {
81
6
      dns_token_++;
82
6
    } while (!dns_token_);
83
6
    return dns_token_;
84
6
  }
85

            
86
  void setCreateContextForTesting(CreateContextFn create_context,
87
720
                                  CreateContextFn create_root_context) {
88
720
    create_context_for_testing_ = create_context;
89
720
    create_root_context_for_testing_ = create_root_context;
90
720
  }
91
14
  void setFailStateForTesting(proxy_wasm::FailState fail_state) { failed_ = fail_state; }
92

            
93
protected:
94
  friend class Context;
95

            
96
  void initializeStats();
97
  // Calls into the VM.
98
  proxy_wasm::WasmCallVoid<3> on_resolve_dns_;
99
  proxy_wasm::WasmCallVoid<2> on_stats_update_;
100

            
101
  Stats::ScopeSharedPtr scope_;
102
  Api::Api& api_;
103
  Stats::StatNamePool stat_name_pool_;
104
  const Stats::StatName custom_stat_namespace_;
105
  Upstream::ClusterManager& cluster_manager_;
106
  Event::Dispatcher& dispatcher_;
107
  absl::flat_hash_map<uint32_t, Event::TimerPtr> timer_; // per root_id.
108
  TimeSource& time_source_;
109

            
110
  // Lifecycle stats
111
  LifecycleStatsHandler lifecycle_stats_handler_;
112

            
113
  // Plugin stats
114
  absl::flat_hash_map<uint32_t, Stats::Counter*> counters_;
115
  absl::flat_hash_map<uint32_t, Stats::Gauge*> gauges_;
116
  absl::flat_hash_map<uint32_t, Stats::Histogram*> histograms_;
117

            
118
  CreateContextFn create_context_for_testing_;
119
  CreateContextFn create_root_context_for_testing_;
120
  Network::DnsResolverSharedPtr dns_resolver_;
121
  uint32_t dns_token_ = 1;
122
};
123
using WasmSharedPtr = std::shared_ptr<Wasm>;
124

            
125
class WasmHandle : public WasmHandleBase, public ThreadLocal::ThreadLocalObject {
126
public:
127
  explicit WasmHandle(const WasmSharedPtr& wasm)
128
1040
      : WasmHandleBase(std::static_pointer_cast<WasmBase>(wasm)), wasm_(wasm) {}
129

            
130
3235
  WasmSharedPtr& wasm() { return wasm_; }
131

            
132
private:
133
  WasmSharedPtr wasm_;
134
};
135

            
136
using WasmHandleSharedPtr = std::shared_ptr<WasmHandle>;
137

            
138
class PluginHandle : public PluginHandleBase {
139
public:
140
  explicit PluginHandle(const WasmHandleSharedPtr& wasm_handle, const PluginSharedPtr& plugin)
141
423
      : PluginHandleBase(std::static_pointer_cast<WasmHandleBase>(wasm_handle),
142
423
                         std::static_pointer_cast<PluginBase>(plugin)),
143
423
        plugin_(plugin), wasm_handle_(wasm_handle) {}
144

            
145
381
  WasmHandleSharedPtr& wasmHandle() { return wasm_handle_; }
146
73
  uint32_t rootContextId() { return wasm_handle_->wasm()->getRootContext(plugin_, false)->id(); }
147

            
148
private:
149
  PluginSharedPtr plugin_;
150
  WasmHandleSharedPtr wasm_handle_;
151
};
152

            
153
using PluginHandleSharedPtr = std::shared_ptr<PluginHandle>;
154

            
155
class PluginHandleSharedPtrThreadLocal : public ThreadLocal::ThreadLocalObject {
156
public:
157
  PluginHandleSharedPtr handle{};
158
  MonotonicTime last_load{};
159

            
160
  PluginHandleSharedPtrThreadLocal(PluginHandleSharedPtr h, MonotonicTime t = {})
161
173
      : handle(std::move(h)), last_load(t) {}
162
  PluginHandleSharedPtrThreadLocal() = default;
163
};
164

            
165
using CreateWasmCallback = std::function<void(WasmHandleSharedPtr)>;
166

            
167
// Returns false if createWasm failed synchronously. This is necessary because xDS *MUST* report
168
// all failures synchronously as it has no facility to report configuration update failures
169
// asynchronously. Callers should throw an exception if they are part of a synchronous xDS update
170
// because that is the mechanism for reporting configuration errors.
171
bool createWasm(const PluginSharedPtr& plugin, const Stats::ScopeSharedPtr& scope,
172
                Upstream::ClusterManager& cluster_manager, Init::Manager& init_manager,
173
                Event::Dispatcher& dispatcher, Api::Api& api,
174
                Server::ServerLifecycleNotifier& lifecycle_notifier,
175
                RemoteAsyncDataProviderPtr& remote_data_provider, CreateWasmCallback&& callback,
176
                CreateContextFn create_root_context_for_testing = nullptr);
177

            
178
// Returns a plugin handle for `plugin` derived from `base_wasm` and associated with `dispatcher`.
// NOTE(review): the name indicates that an existing thread-local handle is reused when one is
// available - confirm against the definition. `create_root_context_for_testing` is optional and
// defaults to an empty callable.
PluginHandleSharedPtr
getOrCreateThreadLocalPlugin(const WasmHandleSharedPtr& base_wasm, const PluginSharedPtr& plugin,
                             Event::Dispatcher& dispatcher,
                             CreateContextFn create_root_context_for_testing = nullptr);
182

            
183
void clearCodeCacheForTesting();
184
void setTimeOffsetForCodeCacheForTesting(MonotonicTime::duration d);
185
WasmEvent toWasmEvent(const std::shared_ptr<WasmHandleBase>& wasm);
186

            
187
// Configuration object for a Wasm plugin. It owns the plugin, its stats handler and the plugin
// handle(s) from which contexts are created.
class PluginConfig : Logger::Loggable<Logger::Id::wasm> {
public:
  // TODO(wbpcode): the code of PluginConfig will be shared across all Wasm extensions (loggers,
  // http filters, etc.), we may extend the constructor to take a static string view to tell
  // the type of the plugin if needed.
  PluginConfig(const envoy::extensions::wasm::v3::PluginConfig& config,
               Server::Configuration::ServerFactoryContext& context, Stats::Scope& scope,
               Init::Manager& init_manager, envoy::config::core::v3::TrafficDirection direction,
               const envoy::config::core::v3::Metadata* metadata, bool singleton);

  std::shared_ptr<Context> createContext();
  Wasm* wasm();
  const PluginSharedPtr& plugin() { return plugin_; }
  // Forwards to the stats handler; requires stats_handler_ to be non-null.
  WasmStats& wasmStats() { return stats_handler_->wasmStats(); }

  using SinglePluginHandle = PluginHandleSharedPtrThreadLocal;
  using ThreadLocalPluginHandle = ThreadLocal::TypedSlotPtr<SinglePluginHandle>;

private:
  /**
   * Get the latest wasm and plugin handle wrapper. The plugin handle may be reloaded if
   * the wasm has failed and the policy allows it.
   */
  std::pair<OptRef<SinglePluginHandle>, Wasm*> getPluginHandleAndWasm();

  /**
   * May reload the handle if the wasm has failed. The input handle will be updated if the
   * handle is reloaded.
   * @param handle_wrapper the handle wrapper to inspect and, if reloaded, update in place.
   * @return the wasm pointer of the latest handle.
   */
  Wasm* maybeReloadHandleIfNeeded(SinglePluginHandle& handle_wrapper);

  StatsHandlerSharedPtr stats_handler_;
  FailurePolicy failure_policy_;
  // This backoff strategy implementation is thread-safe and could be shared across multiple
  // workers.
  std::unique_ptr<JitteredLowerBoundBackOffStrategy> reload_backoff_;
  PluginSharedPtr plugin_;
  RemoteAsyncDataProviderPtr remote_data_provider_;
  const bool is_singleton_handle_{};
  WasmHandleSharedPtr base_wasm_{};
  // Holds nothing (monostate), a single handle, or a thread-local slot of handles.
  // NOTE(review): presumably selected by is_singleton_handle_ - confirm against the constructor.
  absl::variant<absl::monostate, SinglePluginHandle, ThreadLocalPluginHandle> plugin_handle_;
};

using PluginConfigPtr = std::unique_ptr<PluginConfig>;
using PluginConfigSharedPtr = std::shared_ptr<PluginConfig>;
233

            
234
} // namespace Wasm
235
} // namespace Common
236
} // namespace Extensions
237
} // namespace Envoy