Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/common/wasm/wasm.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/common/wasm/wasm.h"
2
3
#include <algorithm>
4
#include <chrono>
5
6
#include "envoy/event/deferred_deletable.h"
7
8
#include "source/common/common/logger.h"
9
#include "source/common/network/dns_resolver/dns_factory_util.h"
10
#include "source/extensions/common/wasm/plugin.h"
11
#include "source/extensions/common/wasm/stats_handler.h"
12
13
#include "absl/strings/str_cat.h"
14
15
using proxy_wasm::FailState;
16
using proxy_wasm::Word;
17
18
namespace Envoy {
19
20
using ScopeWeakPtr = std::weak_ptr<Stats::Scope>;
21
22
namespace Extensions {
23
namespace Common {
24
namespace Wasm {
25
namespace {
26
27
struct CodeCacheEntry {
28
  std::string code;
29
  bool in_progress;
30
  MonotonicTime use_time;
31
  MonotonicTime fetch_time;
32
};
33
34
class RemoteDataFetcherAdapter : public Config::DataFetcher::RemoteDataFetcherCallback,
35
                                 public Event::DeferredDeletable {
36
public:
37
1
  RemoteDataFetcherAdapter(std::function<void(std::string cb)> cb) : cb_(cb) {}
38
1
  ~RemoteDataFetcherAdapter() override = default;
39
0
  void onSuccess(const std::string& data) override { cb_(data); }
40
1
  void onFailure(Config::DataFetcher::FailureReason) override { cb_(""); }
41
1
  void setFetcher(std::unique_ptr<Config::DataFetcher::RemoteDataFetcher>&& fetcher) {
42
1
    fetcher_ = std::move(fetcher);
43
1
  }
44
45
private:
46
  std::function<void(std::string)> cb_;
47
  std::unique_ptr<Config::DataFetcher::RemoteDataFetcher> fetcher_;
48
};
49
50
const std::string INLINE_STRING = "<inline>";
51
const int CODE_CACHE_SECONDS_NEGATIVE_CACHING = 10;
52
const int CODE_CACHE_SECONDS_CACHING_TTL = 24 * 3600; // 24 hours.
53
MonotonicTime::duration cache_time_offset_for_testing{};
54
55
std::mutex code_cache_mutex;
56
absl::flat_hash_map<std::string, CodeCacheEntry>* code_cache = nullptr;
57
58
// Downcast WasmBase to the actual Wasm.
59
0
inline Wasm* getWasm(WasmHandleSharedPtr& base_wasm_handle) {
60
0
  return static_cast<Wasm*>(base_wasm_handle->wasm().get());
61
0
}
62
63
} // namespace
64
65
43
void Wasm::initializeLifecycle(Server::ServerLifecycleNotifier& lifecycle_notifier) {
66
43
  auto weak = std::weak_ptr<Wasm>(std::static_pointer_cast<Wasm>(shared_from_this()));
67
43
  lifecycle_notifier.registerCallback(Server::ServerLifecycleNotifier::Stage::ShutdownExit,
68
43
                                      [this, weak](Event::PostCb post_cb) {
69
0
                                        auto lock = weak.lock();
70
0
                                        if (lock) { // See if we are still alive.
71
0
                                          server_shutdown_post_cb_ = std::move(post_cb);
72
0
                                        }
73
0
                                      });
74
43
}
75
76
Wasm::Wasm(WasmConfig& config, absl::string_view vm_key, const Stats::ScopeSharedPtr& scope,
77
           Api::Api& api, Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher)
78
    : WasmBase(
79
          createWasmVm(config.config().vm_config().runtime()), config.config().vm_config().vm_id(),
80
          MessageUtil::anyToBytes(config.config().vm_config().configuration()),
81
          toStdStringView(vm_key), config.environmentVariables(), config.allowedCapabilities()),
82
      scope_(scope), api_(api), stat_name_pool_(scope_->symbolTable()),
83
      custom_stat_namespace_(stat_name_pool_.add(CustomStatNamespace)),
84
      cluster_manager_(cluster_manager), dispatcher_(dispatcher),
85
      time_source_(dispatcher.timeSource()), lifecycle_stats_handler_(LifecycleStatsHandler(
86
43
                                                 scope, config.config().vm_config().runtime())) {
87
43
  lifecycle_stats_handler_.onEvent(WasmEvent::VmCreated);
88
43
  ENVOY_LOG(debug, "Base Wasm created {} now active", lifecycle_stats_handler_.getActiveVmCount());
89
43
}
90
91
Wasm::Wasm(WasmHandleSharedPtr base_wasm_handle, Event::Dispatcher& dispatcher)
92
    : WasmBase(base_wasm_handle,
93
0
               [&base_wasm_handle]() {
94
0
                 return createWasmVm(absl::StrCat(
95
0
                     "envoy.wasm.runtime.",
96
0
                     toAbslStringView(base_wasm_handle->wasm()->wasm_vm()->getEngineName())));
97
0
               }),
98
      scope_(getWasm(base_wasm_handle)->scope_), api_(getWasm(base_wasm_handle)->api_),
99
      stat_name_pool_(scope_->symbolTable()),
100
      custom_stat_namespace_(stat_name_pool_.add(CustomStatNamespace)),
101
      cluster_manager_(getWasm(base_wasm_handle)->clusterManager()), dispatcher_(dispatcher),
102
      time_source_(dispatcher.timeSource()),
103
0
      lifecycle_stats_handler_(getWasm(base_wasm_handle)->lifecycle_stats_handler_) {
104
0
  lifecycle_stats_handler_.onEvent(WasmEvent::VmCreated);
105
0
  ENVOY_LOG(debug, "Thread-Local Wasm created {} now active",
106
0
            lifecycle_stats_handler_.getActiveVmCount());
107
0
}
108
109
43
void Wasm::error(std::string_view message) { ENVOY_LOG(error, "Wasm VM failed {}", message); }
110
111
0
void Wasm::setTimerPeriod(uint32_t context_id, std::chrono::milliseconds new_period) {
112
0
  auto& period = timer_period_[context_id];
113
0
  auto& timer = timer_[context_id];
114
0
  bool was_running = timer && period.count() > 0;
115
0
  period = new_period;
116
0
  if (was_running) {
117
0
    timer->disableTimer();
118
0
  }
119
0
  if (period.count() > 0) {
120
0
    timer = dispatcher_.createTimer(
121
0
        [weak = std::weak_ptr<Wasm>(std::static_pointer_cast<Wasm>(shared_from_this())),
122
0
         context_id]() {
123
0
          auto shared = weak.lock();
124
0
          if (shared) {
125
0
            shared->tickHandler(context_id);
126
0
          }
127
0
        });
128
0
    timer->enableTimer(period);
129
0
  }
130
0
}
131
132
0
void Wasm::tickHandler(uint32_t root_context_id) {
133
0
  auto period = timer_period_.find(root_context_id);
134
0
  auto timer = timer_.find(root_context_id);
135
0
  if (period == timer_period_.end() || timer == timer_.end() || !on_tick_) {
136
0
    return;
137
0
  }
138
0
  auto context = getContext(root_context_id);
139
0
  if (context) {
140
0
    context->onTick(0);
141
0
  }
142
0
  if (timer->second && period->second.count() > 0) {
143
0
    timer->second->enableTimer(period->second);
144
0
  }
145
0
}
146
147
43
Wasm::~Wasm() {
148
43
  lifecycle_stats_handler_.onEvent(WasmEvent::VmShutDown);
149
43
  ENVOY_LOG(debug, "~Wasm {} remaining active", lifecycle_stats_handler_.getActiveVmCount());
150
43
  if (server_shutdown_post_cb_) {
151
0
    dispatcher_.post(std::move(server_shutdown_post_cb_));
152
0
  }
153
43
}
154
155
// NOLINTNEXTLINE(readability-identifier-naming)
156
0
Word resolve_dns(Word dns_address_ptr, Word dns_address_size, Word token_ptr) {
157
0
  auto context = static_cast<Context*>(proxy_wasm::contextOrEffectiveContext());
158
0
  auto root_context = context->isRootContext() ? context : context->rootContext();
159
0
  auto address = context->wasmVm()->getMemory(dns_address_ptr, dns_address_size);
160
0
  if (!address) {
161
0
    return WasmResult::InvalidMemoryAccess;
162
0
  }
163
  // Verify set and verify token_ptr before initiating the async resolve.
164
0
  uint32_t token = context->wasm()->nextDnsToken();
165
0
  if (!context->wasm()->setDatatype(token_ptr, token)) {
166
0
    return WasmResult::InvalidMemoryAccess;
167
0
  }
168
0
  auto callback = [weak_wasm = std::weak_ptr<Wasm>(context->wasm()->sharedThis()), root_context,
169
0
                   context_id = context->id(),
170
0
                   token](Envoy::Network::DnsResolver::ResolutionStatus status,
171
0
                          std::list<Envoy::Network::DnsResponse>&& response) {
172
0
    auto wasm = weak_wasm.lock();
173
0
    if (!wasm) {
174
0
      return;
175
0
    }
176
0
    root_context->onResolveDns(token, status, std::move(response));
177
0
  };
178
0
  if (!context->wasm()->dnsResolver()) {
179
0
    envoy::config::core::v3::TypedExtensionConfig typed_dns_resolver_config;
180
0
    Network::DnsResolverFactory& dns_resolver_factory =
181
0
        Network::createDefaultDnsResolverFactory(typed_dns_resolver_config);
182
0
    context->wasm()->dnsResolver() = dns_resolver_factory.createDnsResolver(
183
0
        context->wasm()->dispatcher(), context->wasm()->api(), typed_dns_resolver_config);
184
0
  }
185
0
  context->wasm()->dnsResolver()->resolve(std::string(address.value()),
186
0
                                          Network::DnsLookupFamily::Auto, callback);
187
0
  return WasmResult::Ok;
188
0
}
189
190
0
void Wasm::registerCallbacks() {
191
0
  WasmBase::registerCallbacks();
192
0
#define _REGISTER(_fn)                                                                             \
193
0
  wasm_vm_->registerCallback(                                                                      \
194
0
      "env", "envoy_" #_fn, &_fn,                                                                  \
195
0
      &proxy_wasm::ConvertFunctionWordToUint32<decltype(_fn), _fn>::convertFunctionWordToUint32)
196
0
  _REGISTER(resolve_dns);
197
0
#undef _REGISTER
198
0
}
199
200
0
void Wasm::getFunctions() {
201
0
  WasmBase::getFunctions();
202
0
#define _GET(_fn) wasm_vm_->getFunction("envoy_" #_fn, &_fn##_);
203
0
  _GET(on_resolve_dns)
204
0
  _GET(on_stats_update)
205
0
#undef _GET
206
0
}
207
208
0
proxy_wasm::CallOnThreadFunction Wasm::callOnThreadFunction() {
209
0
  auto& dispatcher = dispatcher_;
210
0
  return [&dispatcher](const std::function<void()>& f) { return dispatcher.post(f); };
211
0
}
212
213
0
ContextBase* Wasm::createContext(const std::shared_ptr<PluginBase>& plugin) {
214
0
  if (create_context_for_testing_) {
215
0
    return create_context_for_testing_(this, std::static_pointer_cast<Plugin>(plugin));
216
0
  }
217
0
  return new Context(this, std::static_pointer_cast<Plugin>(plugin));
218
0
}
219
220
0
ContextBase* Wasm::createRootContext(const std::shared_ptr<PluginBase>& plugin) {
221
0
  if (create_root_context_for_testing_) {
222
0
    return create_root_context_for_testing_(this, std::static_pointer_cast<Plugin>(plugin));
223
0
  }
224
0
  return new Context(this, std::static_pointer_cast<Plugin>(plugin));
225
0
}
226
227
0
ContextBase* Wasm::createVmContext() { return new Context(this); }
228
229
void Wasm::log(const PluginSharedPtr& plugin, const Formatter::HttpFormatterContext& log_context,
230
0
               const StreamInfo::StreamInfo& info) {
231
0
  auto context = getRootContext(plugin, true);
232
0
  context->log(log_context, info);
233
0
}
234
235
0
void Wasm::onStatsUpdate(const PluginSharedPtr& plugin, Envoy::Stats::MetricSnapshot& snapshot) {
236
0
  auto context = getRootContext(plugin, true);
237
0
  context->onStatsUpdate(snapshot);
238
0
}
239
240
0
void clearCodeCacheForTesting() {
241
0
  std::lock_guard<std::mutex> guard(code_cache_mutex);
242
0
  if (code_cache) {
243
0
    delete code_cache;
244
0
    code_cache = nullptr;
245
0
  }
246
0
  getCreateStatsHandler().resetStatsForTesting();
247
0
}
248
249
// TODO: remove this post #4160: Switch default to SimulatedTimeSystem.
250
0
void setTimeOffsetForCodeCacheForTesting(MonotonicTime::duration d) {
251
0
  cache_time_offset_for_testing = d;
252
0
}
253
254
static proxy_wasm::WasmHandleFactory
255
getWasmHandleFactory(WasmConfig& wasm_config, const Stats::ScopeSharedPtr& scope, Api::Api& api,
256
                     Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher,
257
43
                     Server::ServerLifecycleNotifier& lifecycle_notifier) {
258
43
  return [&wasm_config, &scope, &api, &cluster_manager, &dispatcher,
259
43
          &lifecycle_notifier](std::string_view vm_key) -> WasmHandleBaseSharedPtr {
260
43
    auto wasm = std::make_shared<Wasm>(wasm_config, toAbslStringView(vm_key), scope, api,
261
43
                                       cluster_manager, dispatcher);
262
43
    wasm->initializeLifecycle(lifecycle_notifier);
263
43
    return std::static_pointer_cast<WasmHandleBase>(std::make_shared<WasmHandle>(std::move(wasm)));
264
43
  };
265
43
}
266
267
static proxy_wasm::WasmHandleCloneFactory
268
getWasmHandleCloneFactory(Event::Dispatcher& dispatcher,
269
43
                          CreateContextFn create_root_context_for_testing) {
270
43
  return [&dispatcher, create_root_context_for_testing](
271
43
             WasmHandleBaseSharedPtr base_wasm) -> std::shared_ptr<WasmHandleBase> {
272
0
    auto wasm = std::make_shared<Wasm>(std::static_pointer_cast<WasmHandle>(base_wasm), dispatcher);
273
0
    wasm->setCreateContextForTesting(nullptr, create_root_context_for_testing);
274
0
    return std::static_pointer_cast<WasmHandleBase>(std::make_shared<WasmHandle>(std::move(wasm)));
275
0
  };
276
43
}
277
278
0
static proxy_wasm::PluginHandleFactory getPluginHandleFactory() {
279
0
  return [](WasmHandleBaseSharedPtr base_wasm,
280
0
            PluginBaseSharedPtr base_plugin) -> std::shared_ptr<PluginHandleBase> {
281
0
    return std::static_pointer_cast<PluginHandleBase>(
282
0
        std::make_shared<PluginHandle>(std::static_pointer_cast<WasmHandle>(base_wasm),
283
0
                                       std::static_pointer_cast<Plugin>(base_plugin)));
284
0
  };
285
0
}
286
287
43
WasmEvent toWasmEvent(const std::shared_ptr<WasmHandleBase>& wasm) {
288
43
  if (!wasm) {
289
43
    return WasmEvent::UnableToCreateVm;
290
43
  }
291
0
  switch (wasm->wasm()->fail_state()) {
292
0
  case FailState::Ok:
293
0
    return WasmEvent::Ok;
294
0
  case FailState::UnableToCreateVm:
295
0
    return WasmEvent::UnableToCreateVm;
296
0
  case FailState::UnableToCloneVm:
297
0
    return WasmEvent::UnableToCloneVm;
298
0
  case FailState::MissingFunction:
299
0
    return WasmEvent::MissingFunction;
300
0
  case FailState::UnableToInitializeCode:
301
0
    return WasmEvent::UnableToInitializeCode;
302
0
  case FailState::StartFailed:
303
0
    return WasmEvent::StartFailed;
304
0
  case FailState::ConfigureFailed:
305
0
    return WasmEvent::ConfigureFailed;
306
0
  case FailState::RuntimeError:
307
0
    return WasmEvent::RuntimeError;
308
0
  }
309
0
  PANIC("corrupt enum");
310
0
}
311
312
bool createWasm(const PluginSharedPtr& plugin, const Stats::ScopeSharedPtr& scope,
313
                Upstream::ClusterManager& cluster_manager, Init::Manager& init_manager,
314
                Event::Dispatcher& dispatcher, Api::Api& api,
315
                Server::ServerLifecycleNotifier& lifecycle_notifier,
316
                Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider,
317
101
                CreateWasmCallback&& cb, CreateContextFn create_root_context_for_testing) {
318
101
  auto& stats_handler = getCreateStatsHandler();
319
101
  std::string source, code;
320
101
  auto config = plugin->wasmConfig();
321
101
  auto vm_config = config.config().vm_config();
322
101
  bool fetch = false;
323
101
  if (vm_config.code().has_remote()) {
324
    // TODO(https://github.com/envoyproxy/envoy/issues/25052) Stabilize this feature.
325
1
    ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
326
1
                        "Wasm remote code fetch is unstable and may cause a crash");
327
1
    auto now = dispatcher.timeSource().monotonicTime() + cache_time_offset_for_testing;
328
1
    source = vm_config.code().remote().http_uri().uri();
329
1
    std::lock_guard<std::mutex> guard(code_cache_mutex);
330
1
    if (!code_cache) {
331
1
      code_cache = new std::remove_reference<decltype(*code_cache)>::type;
332
1
    }
333
1
    Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);
334
    // Remove entries older than CODE_CACHE_SECONDS_CACHING_TTL except for our target.
335
1
    for (auto it = code_cache->begin(); it != code_cache->end();) {
336
0
      if (now - it->second.use_time > std::chrono::seconds(CODE_CACHE_SECONDS_CACHING_TTL) &&
337
0
          it->first != vm_config.code().remote().sha256()) {
338
0
        code_cache->erase(it++);
339
0
      } else {
340
0
        ++it;
341
0
      }
342
0
    }
343
1
    stats_handler.onRemoteCacheEntriesChanged(code_cache->size());
344
1
    auto it = code_cache->find(vm_config.code().remote().sha256());
345
1
    if (it != code_cache->end()) {
346
0
      it->second.use_time = now;
347
0
      if (it->second.in_progress) {
348
0
        stats_handler.onEvent(WasmEvent::RemoteLoadCacheMiss);
349
0
        ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
350
0
                            "createWasm: failed to load (in progress) from {}", source);
351
0
        cb(nullptr);
352
0
      }
353
0
      code = it->second.code;
354
0
      if (code.empty()) {
355
0
        if (now - it->second.fetch_time <
356
0
            std::chrono::seconds(CODE_CACHE_SECONDS_NEGATIVE_CACHING)) {
357
0
          stats_handler.onEvent(WasmEvent::RemoteLoadCacheNegativeHit);
358
0
          ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
359
0
                              "createWasm: failed to load (cached) from {}", source);
360
0
          cb(nullptr);
361
0
        }
362
0
        fetch = true; // Fetch failed, retry.
363
0
        it->second.in_progress = true;
364
0
        it->second.fetch_time = now;
365
0
      } else {
366
0
        stats_handler.onEvent(WasmEvent::RemoteLoadCacheHit);
367
0
      }
368
1
    } else {
369
1
      fetch = true; // Not in cache, fetch.
370
1
      auto& e = (*code_cache)[vm_config.code().remote().sha256()];
371
1
      e.in_progress = true;
372
1
      e.use_time = e.fetch_time = now;
373
1
      stats_handler.onRemoteCacheEntriesChanged(code_cache->size());
374
1
      stats_handler.onEvent(WasmEvent::RemoteLoadCacheMiss);
375
1
    }
376
100
  } else if (vm_config.code().has_local()) {
377
45
    code = Config::DataSource::read(vm_config.code().local(), true, api);
378
45
    source = Config::DataSource::getPath(vm_config.code().local())
379
45
                 .value_or(code.empty() ? EMPTY_STRING : INLINE_STRING);
380
45
  }
381
382
101
  auto vm_key = proxy_wasm::makeVmKey(vm_config.vm_id(),
383
101
                                      MessageUtil::anyToBytes(vm_config.configuration()), code);
384
101
  auto complete_cb = [cb, vm_key, plugin, scope, &api, &cluster_manager, &dispatcher,
385
101
                      &lifecycle_notifier, create_root_context_for_testing,
386
101
                      &stats_handler](std::string code) -> bool {
387
98
    if (code.empty()) {
388
55
      cb(nullptr);
389
55
      return false;
390
55
    }
391
392
43
    auto config = plugin->wasmConfig();
393
43
    auto wasm = proxy_wasm::createWasm(
394
43
        vm_key, code, plugin,
395
43
        getWasmHandleFactory(config, scope, api, cluster_manager, dispatcher, lifecycle_notifier),
396
43
        getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing),
397
43
        config.config().vm_config().allow_precompiled());
398
43
    Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);
399
43
    stats_handler.onEvent(toWasmEvent(wasm));
400
43
    if (!wasm || wasm->wasm()->isFailed()) {
401
43
      ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
402
43
                          "Unable to create Wasm");
403
43
      cb(nullptr);
404
43
      return false;
405
43
    }
406
0
    cb(std::static_pointer_cast<WasmHandle>(wasm));
407
0
    return true;
408
43
  };
409
410
101
  if (fetch) {
411
1
    auto holder = std::make_shared<std::unique_ptr<Event::DeferredDeletable>>();
412
1
    auto fetch_callback = [vm_config, complete_cb, source, &dispatcher, scope, holder, plugin,
413
1
                           &stats_handler](const std::string& code) {
414
1
      {
415
1
        std::lock_guard<std::mutex> guard(code_cache_mutex);
416
1
        auto& e = (*code_cache)[vm_config.code().remote().sha256()];
417
1
        e.in_progress = false;
418
1
        e.code = code;
419
1
        Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);
420
1
        if (code.empty()) {
421
1
          stats_handler.onEvent(WasmEvent::RemoteLoadCacheFetchFailure);
422
1
        } else {
423
0
          stats_handler.onEvent(WasmEvent::RemoteLoadCacheFetchSuccess);
424
0
        }
425
1
        stats_handler.onRemoteCacheEntriesChanged(code_cache->size());
426
1
      }
427
      // NB: xDS currently does not support failing asynchronously, so we fail immediately
428
      // if remote Wasm code is not cached and do a background fill.
429
1
      if (!vm_config.nack_on_code_cache_miss()) {
430
0
        if (code.empty()) {
431
0
          ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
432
0
                              "Failed to load Wasm code (fetch failed) from {}", source);
433
0
        }
434
0
        complete_cb(code);
435
0
      }
436
      // NB: must be deleted explicitly.
437
1
      if (*holder) {
438
1
        dispatcher.deferredDelete(Envoy::Event::DeferredDeletablePtr{holder->release()});
439
1
      }
440
1
    };
441
1
    if (vm_config.nack_on_code_cache_miss()) {
442
1
      auto adapter = std::make_unique<RemoteDataFetcherAdapter>(fetch_callback);
443
1
      auto fetcher = std::make_unique<Config::DataFetcher::RemoteDataFetcher>(
444
1
          cluster_manager, vm_config.code().remote().http_uri(), vm_config.code().remote().sha256(),
445
1
          *adapter);
446
1
      auto fetcher_ptr = fetcher.get();
447
1
      adapter->setFetcher(std::move(fetcher));
448
1
      *holder = std::move(adapter);
449
1
      fetcher_ptr->fetch();
450
1
      ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
451
1
                          fmt::format("Failed to load Wasm code (fetching) from {}", source));
452
1
      cb(nullptr);
453
1
      return false;
454
1
    } else {
455
0
      remote_data_provider = std::make_unique<Config::DataSource::RemoteAsyncDataProvider>(
456
0
          cluster_manager, init_manager, vm_config.code().remote(), dispatcher,
457
0
          api.randomGenerator(), true, fetch_callback);
458
0
    }
459
100
  } else {
460
100
    return complete_cb(code);
461
100
  }
462
0
  return true;
463
101
}
464
465
PluginHandleSharedPtr
466
getOrCreateThreadLocalPlugin(const WasmHandleSharedPtr& base_wasm, const PluginSharedPtr& plugin,
467
                             Event::Dispatcher& dispatcher,
468
99
                             CreateContextFn create_root_context_for_testing) {
469
99
  if (!base_wasm) {
470
99
    if (!plugin->fail_open_) {
471
92
      ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), critical,
472
92
                          "Plugin configured to fail closed failed to load");
473
92
    }
474
    // To handle the case when failed to create VMs and fail-open/close properly,
475
    // we still create PluginHandle with null WasmBase.
476
99
    return std::make_shared<PluginHandle>(nullptr, plugin);
477
99
  }
478
0
  return std::static_pointer_cast<PluginHandle>(proxy_wasm::getOrCreateThreadLocalPlugin(
479
0
      std::static_pointer_cast<WasmHandle>(base_wasm), plugin,
480
0
      getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing),
481
0
      getPluginHandleFactory()));
482
99
}
483
484
} // namespace Wasm
485
} // namespace Common
486
} // namespace Extensions
487
} // namespace Envoy