1
#include "source/extensions/common/wasm/wasm.h"
2

            
3
#include <algorithm>
4
#include <chrono>
5

            
6
#include "envoy/event/deferred_deletable.h"
7
#include "envoy/extensions/wasm/v3/wasm.pb.h"
8

            
9
#include "source/common/common/backoff_strategy.h"
10
#include "source/common/common/logger.h"
11
#include "source/common/network/dns_resolver/dns_factory_util.h"
12
#include "source/common/runtime/runtime_features.h"
13
#include "source/extensions/common/wasm/plugin.h"
14
#include "source/extensions/common/wasm/remote_async_datasource.h"
15
#include "source/extensions/common/wasm/stats_handler.h"
16

            
17
#include "absl/strings/str_cat.h"
18

            
19
using proxy_wasm::FailState;
20
using proxy_wasm::Word;
21

            
22
namespace Envoy {
23

            
24
using ScopeWeakPtr = std::weak_ptr<Stats::Scope>;
25

            
26
namespace Extensions {
27
namespace Common {
28
namespace Wasm {
29
namespace {
30

            
31
struct CodeCacheEntry {
32
  std::string code;
33
  bool in_progress;
34
  MonotonicTime use_time;
35
  MonotonicTime fetch_time;
36
};
37

            
38
class RemoteDataFetcherAdapter : public Config::DataFetcher::RemoteDataFetcherCallback,
39
                                 public Event::DeferredDeletable {
40
public:
41
12
  RemoteDataFetcherAdapter(std::function<void(std::string cb)> cb) : cb_(cb) {}
42
12
  ~RemoteDataFetcherAdapter() override = default;
43
4
  void onSuccess(const std::string& data) override { cb_(data); }
44
8
  void onFailure(Config::DataFetcher::FailureReason) override { cb_(""); }
45
12
  void setFetcher(std::unique_ptr<Config::DataFetcher::RemoteDataFetcher>&& fetcher) {
46
12
    fetcher_ = std::move(fetcher);
47
12
  }
48

            
49
private:
50
  std::function<void(std::string)> cb_;
51
  std::unique_ptr<Config::DataFetcher::RemoteDataFetcher> fetcher_;
52
};
53

            
54
const std::string INLINE_STRING = "<inline>";
55
const int CODE_CACHE_SECONDS_NEGATIVE_CACHING = 10;
56
const int CODE_CACHE_SECONDS_CACHING_TTL = 24 * 3600; // 24 hours.
57
MonotonicTime::duration cache_time_offset_for_testing{};
58

            
59
std::mutex code_cache_mutex;
60
absl::flat_hash_map<std::string, CodeCacheEntry>* code_cache = nullptr;
61

            
62
// Downcast WasmBase to the actual Wasm.
63
2748
inline Wasm* getWasm(WasmHandleSharedPtr& base_wasm_handle) {
64
2748
  return static_cast<Wasm*>(base_wasm_handle->wasm().get());
65
2748
}
66

            
67
} // namespace
68

            
69
// Base (main-thread) Wasm: the VM is created for the configured runtime, and the VM id,
// configuration bytes, environment variables and capability allow-list come from `config`.
// Records a VmCreated lifecycle event.
Wasm::Wasm(WasmConfig& config, absl::string_view vm_key, const Stats::ScopeSharedPtr& scope,
           Api::Api& api, Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher)
    : WasmBase(createWasmVm(config.config().vm_config().runtime()),
               config.config().vm_config().vm_id(),
               THROW_OR_RETURN_VALUE(
                   MessageUtil::anyToBytes(config.config().vm_config().configuration()),
                   std::string),
               toStdStringView(vm_key), config.environmentVariables(),
               config.allowedCapabilities()),
      scope_(scope), api_(api), stat_name_pool_(scope_->symbolTable()),
      custom_stat_namespace_(stat_name_pool_.add(CustomStatNamespace)),
      cluster_manager_(cluster_manager), dispatcher_(dispatcher),
      time_source_(dispatcher.timeSource()),
      lifecycle_stats_handler_(scope, config.config().vm_config().runtime()) {
  lifecycle_stats_handler_.onEvent(WasmEvent::VmCreated);
  ENVOY_LOG(debug, "Base Wasm created {} now active", lifecycle_stats_handler_.getActiveVmCount());
}
84

            
85
// Thread-local clone of a base Wasm. The new VM is created for the same engine as the base VM
// ("envoy.wasm.runtime.<engine>"). Stats scope, API, cluster manager and the lifecycle stats
// handler are taken from the base Wasm; the dispatcher is the caller's. Records VmCreated.
Wasm::Wasm(WasmHandleSharedPtr base_wasm_handle, Event::Dispatcher& dispatcher)
    : WasmBase(base_wasm_handle,
               [&base_wasm_handle]() {
                 return createWasmVm(absl::StrCat(
                     "envoy.wasm.runtime.",
                     toAbslStringView(base_wasm_handle->wasm()->wasm_vm()->getEngineName())));
               }),
      scope_(getWasm(base_wasm_handle)->scope_),
      api_(getWasm(base_wasm_handle)->api_),
      stat_name_pool_(scope_->symbolTable()),
      custom_stat_namespace_(stat_name_pool_.add(CustomStatNamespace)),
      cluster_manager_(getWasm(base_wasm_handle)->clusterManager()),
      dispatcher_(dispatcher),
      time_source_(dispatcher.timeSource()),
      lifecycle_stats_handler_(getWasm(base_wasm_handle)->lifecycle_stats_handler_) {
  lifecycle_stats_handler_.onEvent(WasmEvent::VmCreated);
  ENVOY_LOG(debug, "Thread-Local Wasm created {} now active",
            lifecycle_stats_handler_.getActiveVmCount());
}
102

            
103
62
void Wasm::error(std::string_view message) { ENVOY_LOG(error, "Wasm VM failed {}", message); }
104

            
105
8
void Wasm::setTimerPeriod(uint32_t context_id, std::chrono::milliseconds new_period) {
106
8
  auto& period = timer_period_[context_id];
107
8
  auto& timer = timer_[context_id];
108
8
  bool was_running = timer && period.count() > 0;
109
8
  period = new_period;
110
8
  if (was_running) {
111
1
    timer->disableTimer();
112
1
  }
113
8
  if (period.count() > 0) {
114
7
    timer = dispatcher_.createTimer(
115
7
        [weak = std::weak_ptr<Wasm>(std::static_pointer_cast<Wasm>(shared_from_this())),
116
7
         context_id]() {
117
1
          auto shared = weak.lock();
118
1
          if (shared) {
119
1
            shared->tickHandler(context_id);
120
1
          }
121
1
        });
122
7
    timer->enableTimer(period);
123
7
  }
124
8
}
125

            
126
5
void Wasm::tickHandler(uint32_t root_context_id) {
127
5
  auto period = timer_period_.find(root_context_id);
128
5
  auto timer = timer_.find(root_context_id);
129
5
  if (period == timer_period_.end() || timer == timer_.end() || !on_tick_) {
130
    return;
131
  }
132
5
  auto context = getContext(root_context_id);
133
5
  if (context) {
134
5
    context->onTick(0);
135
5
  }
136
5
  if (timer->second && period->second.count() > 0) {
137
4
    timer->second->enableTimer(period->second);
138
4
  }
139
5
}
140

            
141
1075
// Records VmShutDown in the lifecycle stats and logs how many VMs remain active.
Wasm::~Wasm() {
  lifecycle_stats_handler_.onEvent(WasmEvent::VmShutDown);
  ENVOY_LOG(debug, "~Wasm {} remaining active",
            lifecycle_stats_handler_.getActiveVmCount());
}
145

            
146
// NOLINTNEXTLINE(readability-identifier-naming)
147
8
Word resolve_dns(Word dns_address_ptr, Word dns_address_size, Word token_ptr) {
148
8
  auto context = static_cast<Context*>(proxy_wasm::contextOrEffectiveContext());
149
8
  auto root_context = context->isRootContext() ? context : context->rootContext();
150
8
  auto address = context->wasmVm()->getMemory(dns_address_ptr, dns_address_size);
151
8
  if (!address) {
152
2
    return WasmResult::InvalidMemoryAccess;
153
2
  }
154
  // Verify set and verify token_ptr before initiating the async resolve.
155
6
  uint32_t token = context->envoyWasm()->nextDnsToken();
156
6
  if (!context->envoyWasm()->setDatatype(token_ptr, token)) {
157
2
    return WasmResult::InvalidMemoryAccess;
158
2
  }
159
4
  auto callback = [weak_wasm = std::weak_ptr<Wasm>(context->envoyWasm()->sharedThis()),
160
4
                   root_context, context_id = context->id(),
161
4
                   token](Envoy::Network::DnsResolver::ResolutionStatus status, absl::string_view,
162
4
                          std::list<Envoy::Network::DnsResponse>&& response) {
163
4
    auto wasm = weak_wasm.lock();
164
4
    if (!wasm) {
165
2
      return;
166
2
    }
167
2
    root_context->onResolveDns(token, status, std::move(response));
168
2
  };
169
4
  if (!context->envoyWasm()->dnsResolver()) {
170
4
    envoy::config::core::v3::TypedExtensionConfig typed_dns_resolver_config;
171
4
    Network::DnsResolverFactory& dns_resolver_factory =
172
4
        Network::createDefaultDnsResolverFactory(typed_dns_resolver_config);
173
4
    context->envoyWasm()->dnsResolver() =
174
4
        THROW_OR_RETURN_VALUE(dns_resolver_factory.createDnsResolver(
175
4
                                  context->envoyWasm()->dispatcher(), context->envoyWasm()->api(),
176
4
                                  typed_dns_resolver_config),
177
4
                              Network::DnsResolverSharedPtr);
178
4
  }
179
4
  context->envoyWasm()->dnsResolver()->resolve(std::string(address.value()),
180
4
                                               Network::DnsLookupFamily::Auto, callback);
181
4
  return WasmResult::Ok;
182
6
}
183

            
184
872
void Wasm::registerCallbacks() {
185
872
  WasmBase::registerCallbacks();
186
872
#define _REGISTER(_fn)                                                                             \
187
872
  wasm_vm_->registerCallback(                                                                      \
188
872
      "env", "envoy_" #_fn, &_fn,                                                                  \
189
872
      &proxy_wasm::ConvertFunctionWordToUint32<decltype(_fn), _fn>::convertFunctionWordToUint32)
190
872
  _REGISTER(resolve_dns);
191
872
#undef _REGISTER
192
872
}
193

            
194
1043
void Wasm::getFunctions() {
195
1043
  WasmBase::getFunctions();
196
2086
#define _GET(_fn) wasm_vm_->getFunction("envoy_" #_fn, &_fn##_);
197
1043
  _GET(on_resolve_dns)
198
1043
  _GET(on_stats_update)
199
1043
#undef _GET
200
1043
}
201

            
202
6
// Returns a function that posts `f` onto this Wasm's dispatcher. The dispatcher is referenced,
// not owned, so the returned function must not outlive it.
proxy_wasm::CallOnThreadFunction Wasm::callOnThreadFunction() {
  return [&target = dispatcher_](const std::function<void()>& f) { return target.post(f); };
}
206

            
207
4
// Creates a non-root context for `plugin`, delegating to the test hook when one is installed.
ContextBase* Wasm::createContext(const std::shared_ptr<PluginBase>& plugin) {
  auto envoy_plugin = std::static_pointer_cast<Plugin>(plugin);
  if (!create_context_for_testing_) {
    return new Context(this, std::move(envoy_plugin));
  }
  return create_context_for_testing_(this, std::move(envoy_plugin));
}
213

            
214
739
// Creates the root context for `plugin`, delegating to the test hook when one is installed.
ContextBase* Wasm::createRootContext(const std::shared_ptr<PluginBase>& plugin) {
  auto envoy_plugin = std::static_pointer_cast<Plugin>(plugin);
  if (!create_root_context_for_testing_) {
    return new Context(this, std::move(envoy_plugin));
  }
  return create_root_context_for_testing_(this, std::move(envoy_plugin));
}
220

            
221
1043
// Creates the VM-level context, constructed without a plugin.
ContextBase* Wasm::createVmContext() {
  return new Context(this);
}
222

            
223
// Forwards an access-log event to the root context of `plugin`.
// getRootContext() is called with `true` (presumably allowing a closed-but-pending root context;
// confirm against WasmBase::getRootContext). If no root context is found, the event is dropped
// instead of dereferencing a null pointer.
void Wasm::log(const PluginSharedPtr& plugin, const Formatter::Context& log_context,
               const StreamInfo::StreamInfo& info) {
  auto context = getRootContext(plugin, true);
  if (context == nullptr) {
    return;
  }
  context->log(log_context, info);
}
228

            
229
2
void Wasm::onStatsUpdate(const PluginSharedPtr& plugin, Envoy::Stats::MetricSnapshot& snapshot) {
230
2
  auto context = getRootContext(plugin, true);
231
2
  context->onStatsUpdate(snapshot);
232
2
}
233

            
234
391
void clearCodeCacheForTesting() {
235
391
  std::lock_guard<std::mutex> guard(code_cache_mutex);
236
391
  if (code_cache) {
237
14
    delete code_cache;
238
14
    code_cache = nullptr;
239
14
  }
240
391
  getCreateStatsHandler().resetStatsForTesting();
241
391
}
242

            
243
// TODO: remove this post #4160: Switch default to SimulatedTimeSystem.
244
4
void setTimeOffsetForCodeCacheForTesting(MonotonicTime::duration d) {
245
4
  cache_time_offset_for_testing = d;
246
4
}
247

            
248
// Returns a factory that builds a base Wasm for a VM key and wraps it in a WasmHandle.
// wasm_config, scope, api, cluster_manager and dispatcher are captured by reference, so the
// factory must only be invoked while they are alive. The lifecycle notifier is unused.
static proxy_wasm::WasmHandleFactory
getWasmHandleFactory(WasmConfig& wasm_config, const Stats::ScopeSharedPtr& scope, Api::Api& api,
                     Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher,
                     Server::ServerLifecycleNotifier&) {
  return [&wasm_config, &scope, &api, &cluster_manager,
          &dispatcher](std::string_view vm_key) -> WasmHandleBaseSharedPtr {
    auto handle = std::make_shared<WasmHandle>(std::make_shared<Wasm>(
        wasm_config, toAbslStringView(vm_key), scope, api, cluster_manager, dispatcher));
    return std::static_pointer_cast<WasmHandleBase>(handle);
  };
}
259

            
260
// Returns a factory that clones a base Wasm onto `dispatcher` (captured by reference), installs
// the optional root-context test hook on the clone, and wraps it in a WasmHandle.
static proxy_wasm::WasmHandleCloneFactory
getWasmHandleCloneFactory(Event::Dispatcher& dispatcher,
                          CreateContextFn create_root_context_for_testing) {
  return [&dispatcher, create_root_context_for_testing](
             WasmHandleBaseSharedPtr base_wasm) -> std::shared_ptr<WasmHandleBase> {
    auto clone =
        std::make_shared<Wasm>(std::static_pointer_cast<WasmHandle>(base_wasm), dispatcher);
    clone->setCreateContextForTesting(nullptr, create_root_context_for_testing);
    auto handle = std::make_shared<WasmHandle>(std::move(clone));
    return std::static_pointer_cast<WasmHandleBase>(handle);
  };
}
270

            
271
364
// Returns a factory pairing a (thread-local) Wasm handle with a plugin in an Envoy PluginHandle.
static proxy_wasm::PluginHandleFactory getPluginHandleFactory() {
  return [](WasmHandleBaseSharedPtr base_wasm,
            PluginBaseSharedPtr base_plugin) -> std::shared_ptr<PluginHandleBase> {
    auto wasm_handle = std::static_pointer_cast<WasmHandle>(base_wasm);
    auto envoy_plugin = std::static_pointer_cast<Plugin>(base_plugin);
    auto plugin_handle =
        std::make_shared<PluginHandle>(std::move(wasm_handle), std::move(envoy_plugin));
    return std::static_pointer_cast<PluginHandleBase>(plugin_handle);
  };
}
279

            
280
392
// Maps the outcome of a Wasm creation attempt to the WasmEvent recorded in stats.
// A null handle is reported as UnableToCreateVm; otherwise the VM's fail_state() is translated.
WasmEvent toWasmEvent(const std::shared_ptr<WasmHandleBase>& wasm) {
  if (wasm == nullptr) {
    return WasmEvent::UnableToCreateVm;
  }
  const FailState state = wasm->wasm()->fail_state();
  switch (state) {
  case FailState::Ok:
    return WasmEvent::Ok;
  case FailState::UnableToCreateVm:
    return WasmEvent::UnableToCreateVm;
  case FailState::UnableToCloneVm:
    return WasmEvent::UnableToCloneVm;
  case FailState::MissingFunction:
    return WasmEvent::MissingFunction;
  case FailState::UnableToInitializeCode:
    return WasmEvent::UnableToInitializeCode;
  case FailState::StartFailed:
    return WasmEvent::StartFailed;
  case FailState::ConfigureFailed:
    return WasmEvent::ConfigureFailed;
  case FailState::RuntimeError:
    return WasmEvent::RuntimeError;
  }
  PANIC("corrupt enum");
}
304

            
305
// Creates the base Wasm VM for `plugin`, possibly asynchronously, and reports it through `cb`.
//
// Where the module bytes come from:
//  * Local code is read synchronously and the VM is created before returning. The return value
//    is whether that creation succeeded.
//  * Remote code is looked up by sha256 in the process-wide `code_cache` (under
//    code_cache_mutex). Entries unused for longer than CODE_CACHE_SECONDS_CACHING_TTL are
//    evicted first, except the requested one. With cached code the VM is created before
//    returning. Otherwise a fetch is started:
//      - nack_on_code_cache_miss: a RemoteDataFetcher fills the cache in the background,
//        cb(nullptr) is invoked and false is returned;
//      - otherwise: a RemoteAsyncDataProvider is stored in `remote_data_provider` and the VM is
//        created from its result; true is returned.
//
// NOTE(review): for an in-progress or negatively cached remote entry, cb(nullptr) is invoked
// below without returning. A new fetch is still started afterwards, so `cb` can run more than
// once for a single call. This behavior is preserved as-is; callers must tolerate it.
bool createWasm(const PluginSharedPtr& plugin, const Stats::ScopeSharedPtr& scope,
                Upstream::ClusterManager& cluster_manager, Init::Manager& init_manager,
                Event::Dispatcher& dispatcher, Api::Api& api,
                Server::ServerLifecycleNotifier& lifecycle_notifier,
                RemoteAsyncDataProviderPtr& remote_data_provider, CreateWasmCallback&& cb,
                CreateContextFn create_root_context_for_testing) {
  auto& stats_handler = getCreateStatsHandler();
  std::string source;
  std::string code;
  auto config = plugin->wasmConfig();
  auto vm_config = config.config().vm_config();
  bool needs_fetch = false;

  if (vm_config.code().has_remote()) {
    // TODO(https://github.com/envoyproxy/envoy/issues/25052) Stabilize this feature.
    ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
                        "Wasm remote code fetch is unstable and may cause a crash");
    const auto now = dispatcher.timeSource().monotonicTime() + cache_time_offset_for_testing;
    const auto& remote = vm_config.code().remote();
    const std::string& sha256 = remote.sha256();
    source = remote.http_uri().uri();

    std::lock_guard<std::mutex> guard(code_cache_mutex);
    if (code_cache == nullptr) {
      code_cache = new std::remove_reference<decltype(*code_cache)>::type;
    }
    Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);

    // Evict entries idle for longer than the caching TTL, but never the requested one.
    const auto ttl = std::chrono::seconds(CODE_CACHE_SECONDS_CACHING_TTL);
    for (auto entry_it = code_cache->begin(); entry_it != code_cache->end();) {
      const bool stale = now - entry_it->second.use_time > ttl;
      if (stale && entry_it->first != sha256) {
        // flat_hash_map::erase(iterator) returns void, so advance before erasing.
        code_cache->erase(entry_it++);
      } else {
        ++entry_it;
      }
    }
    stats_handler.onRemoteCacheEntriesChanged(code_cache->size());

    auto cached = code_cache->find(sha256);
    if (cached == code_cache->end()) {
      // Not cached yet: insert an in-progress placeholder and fetch.
      needs_fetch = true;
      auto& entry = (*code_cache)[sha256];
      entry.in_progress = true;
      entry.use_time = entry.fetch_time = now;
      stats_handler.onRemoteCacheEntriesChanged(code_cache->size());
      stats_handler.onEvent(WasmEvent::RemoteLoadCacheMiss);
    } else {
      auto& entry = cached->second;
      entry.use_time = now;
      if (entry.in_progress) {
        stats_handler.onEvent(WasmEvent::RemoteLoadCacheMiss);
        ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
                            "createWasm: failed to load (in progress) from {}", source);
        cb(nullptr);
      }
      code = entry.code;
      if (!code.empty()) {
        stats_handler.onEvent(WasmEvent::RemoteLoadCacheHit);
      } else {
        const bool recently_failed =
            now - entry.fetch_time < std::chrono::seconds(CODE_CACHE_SECONDS_NEGATIVE_CACHING);
        if (recently_failed) {
          stats_handler.onEvent(WasmEvent::RemoteLoadCacheNegativeHit);
          ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
                              "createWasm: failed to load (cached) from {}", source);
          cb(nullptr);
        }
        // No usable code for this entry: (re)start a fetch.
        needs_fetch = true;
        entry.in_progress = true;
        entry.fetch_time = now;
      }
    }
  } else if (vm_config.code().has_local()) {
    code = THROW_OR_RETURN_VALUE(Config::DataSource::read(vm_config.code().local(), true, api),
                                 std::string);
    source = Config::DataSource::getPath(vm_config.code().local())
                 .value_or(code.empty() ? EMPTY_STRING : INLINE_STRING);
  }

  auto vm_key = proxy_wasm::makeVmKey(
      vm_config.vm_id(),
      THROW_OR_RETURN_VALUE(MessageUtil::anyToBytes(vm_config.configuration()), std::string), code);

  // Builds the base VM from `wasm_code`, records the outcome in stats and reports it via `cb`.
  // Returns whether a healthy VM was created.
  auto create_from_code = [cb, vm_key, plugin, scope, &api, &cluster_manager, &dispatcher,
                           &lifecycle_notifier, create_root_context_for_testing,
                           &stats_handler](std::string wasm_code) -> bool {
    if (wasm_code.empty()) {
      cb(nullptr);
      return false;
    }

    auto config = plugin->wasmConfig();
    auto wasm = proxy_wasm::createWasm(
        vm_key, wasm_code, plugin,
        getWasmHandleFactory(config, scope, api, cluster_manager, dispatcher, lifecycle_notifier),
        getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing),
        config.config().vm_config().allow_precompiled());
    Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);
    stats_handler.onEvent(toWasmEvent(wasm));
    const bool failed = !wasm || wasm->wasm()->isFailed();
    if (failed) {
      ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
                          "Unable to create Wasm");
      cb(nullptr);
      return false;
    }
    cb(std::static_pointer_cast<WasmHandle>(wasm));
    return true;
  };

  if (!needs_fetch) {
    return create_from_code(code);
  }

  // Owns the background fetcher (nack mode) until the fetch callback hands it to deferredDelete.
  auto holder = std::make_shared<std::unique_ptr<Event::DeferredDeletable>>();
  auto fetch_callback = [vm_config, create_from_code, source, &dispatcher, scope, holder, plugin,
                         &stats_handler](const std::string& fetched) {
    {
      std::lock_guard<std::mutex> guard(code_cache_mutex);
      auto& entry = (*code_cache)[vm_config.code().remote().sha256()];
      entry.in_progress = false;
      entry.code = fetched;
      Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);
      stats_handler.onEvent(fetched.empty() ? WasmEvent::RemoteLoadCacheFetchFailure
                                            : WasmEvent::RemoteLoadCacheFetchSuccess);
      stats_handler.onRemoteCacheEntriesChanged(code_cache->size());
    }
    // NB: xDS currently does not support failing asynchronously, so we fail immediately
    // if remote Wasm code is not cached and do a background fill.
    if (!vm_config.nack_on_code_cache_miss()) {
      if (fetched.empty()) {
        ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
                            "Failed to load Wasm code (fetch failed) from {}", source);
      }
      create_from_code(fetched);
    }
    // NB: must be deleted explicitly.
    if (*holder) {
      dispatcher.deferredDelete(Envoy::Event::DeferredDeletablePtr{holder->release()});
    }
  };

  if (!vm_config.nack_on_code_cache_miss()) {
    remote_data_provider = std::make_unique<RemoteAsyncDataProvider>(
        cluster_manager, init_manager, vm_config.code().remote(), dispatcher,
        api.randomGenerator(), true, fetch_callback);
    return true;
  }

  // Nack mode: fail this request now and fill the cache in the background.
  auto adapter = std::make_unique<RemoteDataFetcherAdapter>(fetch_callback);
  auto fetcher = std::make_unique<Config::DataFetcher::RemoteDataFetcher>(
      cluster_manager, vm_config.code().remote().http_uri(), vm_config.code().remote().sha256(),
      *adapter);
  auto* fetcher_ptr = fetcher.get();
  adapter->setFetcher(std::move(fetcher));
  *holder = std::move(adapter);
  fetcher_ptr->fetch();
  ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
                      fmt::format("Failed to load Wasm code (fetching) from {}", source));
  cb(nullptr);
  return false;
}
459

            
460
// Returns the PluginHandle for `plugin` on the thread served by `dispatcher`.
// Delegates to proxy_wasm::getOrCreateThreadLocalPlugin, which uses the clone factory to create
// the thread-local VM from `base_wasm`. When the base VM failed to load (`base_wasm` is null),
// a PluginHandle with a null Wasm is returned so fail-open/fail-closed can still be applied; a
// critical log is emitted if the plugin is configured to fail closed.
PluginHandleSharedPtr
getOrCreateThreadLocalPlugin(const WasmHandleSharedPtr& base_wasm, const PluginSharedPtr& plugin,
                             Event::Dispatcher& dispatcher,
                             CreateContextFn create_root_context_for_testing) {
  if (base_wasm != nullptr) {
    return std::static_pointer_cast<PluginHandle>(proxy_wasm::getOrCreateThreadLocalPlugin(
        std::static_pointer_cast<WasmHandle>(base_wasm), plugin,
        getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing),
        getPluginHandleFactory()));
  }
  if (!plugin->fail_open_) {
    ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), critical,
                        "Plugin configured to fail closed failed to load");
  }
  // To handle the case when failed to create VMs and fail-open/close properly,
  // we still create PluginHandle with null WasmBase.
  return std::make_shared<PluginHandle>(nullptr, plugin);
}
478

            
479
// Simple helper function to get the Wasm* from a WasmHandle.
480
126
Wasm* getWasmOrNull(WasmHandleSharedPtr& h) { return h != nullptr ? h->wasm().get() : nullptr; }
481

            
482
130
// Returns the Wasm to use for `handle_wrapper`. For a FAIL_RELOAD plugin whose VM hit a runtime
// error, it first tries to replace the handle.
// Reloads are throttled. An attempt is made only when at least one `reload_backoff_` interval
// has elapsed since `handle_wrapper.last_load`; otherwise VmReloadBackoff is recorded. Each
// attempt updates `last_load`, and a healthy replacement is stored in `handle_wrapper.handle`.
Wasm* PluginConfig::maybeReloadHandleIfNeeded(SinglePluginHandle& handle_wrapper) {
  // A null base_wasm_ means the plugin never loaded; a null handle is kept as-is for backward
  // compatibility. Neither is reloaded.
  if (base_wasm_ == nullptr || handle_wrapper.handle == nullptr) {
    return nullptr;
  }

  Wasm* current = getWasmOrNull(handle_wrapper.handle->wasmHandle());
  // Only a runtime error on a plugin configured with FAIL_RELOAD is eligible for reloading.
  const bool reloadable = current != nullptr &&
                          current->fail_state() == proxy_wasm::FailState::RuntimeError &&
                          failure_policy_ == FailurePolicy::FAIL_RELOAD;
  if (!reloadable) {
    return current;
  }

  ASSERT(reload_backoff_ != nullptr);
  const uint64_t reload_interval = reload_backoff_->nextBackOffMs();
  Event::Dispatcher& dispatcher = current->dispatcher();
  const MonotonicTime now = dispatcher.timeSource().monotonicTime();
  const auto since_last_load =
      std::chrono::duration_cast<std::chrono::milliseconds>(now - handle_wrapper.last_load);
  if (since_last_load.count() < static_cast<int64_t>(reload_interval)) {
    stats_handler_->onEvent(WasmEvent::VmReloadBackoff);
    return current;
  }

  handle_wrapper.last_load = now;
  PluginHandleSharedPtr reloaded = getOrCreateThreadLocalPlugin(base_wasm_, plugin_, dispatcher);
  Wasm* reloaded_wasm = reloaded == nullptr ? nullptr : getWasmOrNull(reloaded->wasmHandle());
  if (reloaded_wasm == nullptr || reloaded_wasm->isFailed()) {
    stats_handler_->onEvent(WasmEvent::VmReloadFailure);
  } else {
    stats_handler_->onEvent(WasmEvent::VmReloadSuccess);
    handle_wrapper.handle = reloaded;
  }

  ASSERT(handle_wrapper.handle != nullptr);
  return getWasmOrNull(handle_wrapper.handle->wasmHandle());
}
537

            
538
134
// Returns the plugin handle for this call and the Wasm to use with it, after
// maybeReloadHandleIfNeeded(). Uses the singleton handle or the calling thread's slot value.
// Returns {empty, nullptr} when no handle is installed yet, the thread is not registered with
// the slot, or the slot holds no value.
std::pair<OptRef<PluginConfig::SinglePluginHandle>, Wasm*> PluginConfig::getPluginHandleAndWasm() {
  if (absl::holds_alternative<absl::monostate>(plugin_handle_)) {
    return {OptRef<SinglePluginHandle>{}, nullptr};
  }

  if (is_singleton_handle_) {
    ASSERT(absl::holds_alternative<SinglePluginHandle>(plugin_handle_));
    auto& singleton = absl::get<SinglePluginHandle>(plugin_handle_);
    return {OptRef<SinglePluginHandle>(singleton), maybeReloadHandleIfNeeded(singleton)};
  }

  ASSERT(absl::holds_alternative<ThreadLocalPluginHandle>(plugin_handle_));
  auto* slot = absl::get<ThreadLocalPluginHandle>(plugin_handle_).get();
  if (!slot->currentThreadRegistered()) {
    return {OptRef<SinglePluginHandle>{}, nullptr};
  }
  auto per_thread = slot->get();
  if (!per_thread.has_value()) {
    return {OptRef<SinglePluginHandle>{}, nullptr};
  }
  return {per_thread, maybeReloadHandleIfNeeded(*per_thread)};
}
561

            
562
// Builds a Wasm plugin configuration: resolves the failure policy, sets up reload backoff for
// FAIL_RELOAD, creates stats and the Plugin, and starts base-VM creation. The per-thread (or
// singleton) plugin handle is installed from the createWasm() callback.
// Throws EnvoyException for an invalid failure-policy combination, and
// Common::Wasm::WasmException when createWasm() reports failure.
PluginConfig::PluginConfig(const envoy::extensions::wasm::v3::PluginConfig& config,
                           Server::Configuration::ServerFactoryContext& context,
                           Stats::Scope& scope, Init::Manager& init_manager,
                           envoy::config::core::v3::TrafficDirection direction,
                           const envoy::config::core::v3::Metadata* metadata, bool singleton)
    : is_singleton_handle_(singleton) {
  // Resolve the effective failure policy. Legacy `fail_open: true` means FAIL_OPEN and may not
  // be combined with an explicit failure_policy. An unspecified policy means FAIL_CLOSED.
  const auto requested_policy = config.failure_policy();
  if (config.fail_open()) {
    if (requested_policy != FailurePolicy::UNSPECIFIED) {
      throw EnvoyException("only one of fail_open or failure_policy can be set");
    }
    failure_policy_ = FailurePolicy::FAIL_OPEN;
  } else if (requested_policy == FailurePolicy::UNSPECIFIED) {
    // TODO(wbpcode): we may could add a runtime key to set the default failure policy.
    failure_policy_ = FailurePolicy::FAIL_CLOSED;
  } else if (requested_policy == FailurePolicy::FAIL_RELOAD ||
             requested_policy == FailurePolicy::FAIL_CLOSED ||
             requested_policy == FailurePolicy::FAIL_OPEN) {
    failure_policy_ = requested_policy;
  } else {
    throw EnvoyException("unknown failure policy");
  }
  ASSERT(failure_policy_ == FailurePolicy::FAIL_CLOSED ||
         failure_policy_ == FailurePolicy::FAIL_OPEN ||
         failure_policy_ == FailurePolicy::FAIL_RELOAD);

  if (failure_policy_ == FailurePolicy::FAIL_RELOAD) {
    const uint64_t base_interval_ms =
        PROTOBUF_GET_MS_OR_DEFAULT(config.reload_config().backoff(), base_interval, 1000);
    reload_backoff_ = std::make_unique<JitteredLowerBoundBackOffStrategy>(
        base_interval_ms, context.api().randomGenerator());
  }

  stats_handler_ = std::make_shared<StatsHandler>(scope, absl::StrCat("wasm.", config.name(), "."));
  plugin_ = std::make_shared<Plugin>(config, direction, context.localInfo(), metadata);

  // Installs the plugin handle once the base VM exists. A null base VM is still installed so
  // the failure policy can be applied per request.
  auto on_base_wasm = [this, &context](WasmHandleSharedPtr base_wasm) {
    base_wasm_ = base_wasm;
    if (base_wasm == nullptr) {
      ENVOY_LOG(critical, "Plugin {} failed to load", plugin_->name_);
    }

    if (is_singleton_handle_) {
      auto& main_dispatcher = context.mainThreadDispatcher();
      plugin_handle_ =
          SinglePluginHandle(getOrCreateThreadLocalPlugin(base_wasm, plugin_, main_dispatcher),
                             main_dispatcher.timeSource().monotonicTime());
      return;
    }

    auto tls_slot = ThreadLocal::TypedSlot<SinglePluginHandle>::makeUnique(context.threadLocal());
    // NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
    tls_slot->set([base_wasm, plugin = this->plugin_](Event::Dispatcher& dispatcher) {
      return std::make_shared<SinglePluginHandle>(
          getOrCreateThreadLocalPlugin(base_wasm, plugin, dispatcher),
          dispatcher.timeSource().monotonicTime());
    });
    plugin_handle_ = std::move(tls_slot);
  };

  const bool created = Common::Wasm::createWasm(
      plugin_, scope.createScope(""), context.clusterManager(), init_manager,
      context.mainThreadDispatcher(), context.api(), context.lifecycleNotifier(),
      remote_data_provider_, std::move(on_base_wasm));
  if (!created) {
    // TODO(wbpcode): use absl::Status to return error rather than throw.
    throw Common::Wasm::WasmException(
        fmt::format("Unable to create Wasm plugin {}", plugin_->name_));
  }
}
646

            
647
98
std::shared_ptr<Context> PluginConfig::createContext() {
648
98
  auto [plugin_handle_holder, wasm] = getPluginHandleAndWasm();
649
98
  if (!plugin_handle_holder.has_value() || plugin_handle_holder->handle == nullptr) {
650
8
    return nullptr;
651
8
  }
652

            
653
  // FAIL_RELOAD is handled by the getPluginHandleAndWasm() call. If the latest
654
  // wasm is still failed, return nullptr or an empty Context.
655
90
  if (!wasm || wasm->isFailed()) {
656
17
    if (failure_policy_ == FailurePolicy::FAIL_OPEN) {
657
      // Fail open skips adding this filter to callbacks.
658
8
      return nullptr;
659
14
    } else {
660
      // Fail closed is handled by an empty Context.
661
9
      return std::make_shared<Context>(nullptr, 0, plugin_handle_holder->handle);
662
9
    }
663
17
  }
664
73
  return std::make_shared<Context>(wasm, plugin_handle_holder->handle->rootContextId(),
665
73
                                   plugin_handle_holder->handle);
666
90
}
667

            
668
36
// Returns the Wasm for the current plugin handle (after any FAIL_RELOAD attempt), or nullptr.
Wasm* PluginConfig::wasm() {
  auto handle_and_wasm = getPluginHandleAndWasm();
  return handle_and_wasm.second;
}
669

            
670
} // namespace Wasm
671
} // namespace Common
672
} // namespace Extensions
673
} // namespace Envoy