Line data Source code
1 : #include "source/extensions/common/wasm/wasm.h"
2 :
3 : #include <algorithm>
4 : #include <chrono>
5 :
6 : #include "envoy/event/deferred_deletable.h"
7 :
8 : #include "source/common/common/logger.h"
9 : #include "source/common/network/dns_resolver/dns_factory_util.h"
10 : #include "source/extensions/common/wasm/plugin.h"
11 : #include "source/extensions/common/wasm/stats_handler.h"
12 :
13 : #include "absl/strings/str_cat.h"
14 :
15 : using proxy_wasm::FailState;
16 : using proxy_wasm::Word;
17 :
18 : namespace Envoy {
19 :
20 : using ScopeWeakPtr = std::weak_ptr<Stats::Scope>;
21 :
22 : namespace Extensions {
23 : namespace Common {
24 : namespace Wasm {
25 : namespace {
26 :
27 : struct CodeCacheEntry {
28 : std::string code;
29 : bool in_progress;
30 : MonotonicTime use_time;
31 : MonotonicTime fetch_time;
32 : };
33 :
34 : class RemoteDataFetcherAdapter : public Config::DataFetcher::RemoteDataFetcherCallback,
35 : public Event::DeferredDeletable {
36 : public:
37 0 : RemoteDataFetcherAdapter(std::function<void(std::string cb)> cb) : cb_(cb) {}
38 0 : ~RemoteDataFetcherAdapter() override = default;
39 0 : void onSuccess(const std::string& data) override { cb_(data); }
40 0 : void onFailure(Config::DataFetcher::FailureReason) override { cb_(""); }
41 0 : void setFetcher(std::unique_ptr<Config::DataFetcher::RemoteDataFetcher>&& fetcher) {
42 0 : fetcher_ = std::move(fetcher);
43 0 : }
44 :
45 : private:
46 : std::function<void(std::string)> cb_;
47 : std::unique_ptr<Config::DataFetcher::RemoteDataFetcher> fetcher_;
48 : };
49 :
50 : const std::string INLINE_STRING = "<inline>";
51 : const int CODE_CACHE_SECONDS_NEGATIVE_CACHING = 10;
52 : const int CODE_CACHE_SECONDS_CACHING_TTL = 24 * 3600; // 24 hours.
53 : MonotonicTime::duration cache_time_offset_for_testing{};
54 :
55 : std::mutex code_cache_mutex;
56 : absl::flat_hash_map<std::string, CodeCacheEntry>* code_cache = nullptr;
57 :
58 : // Downcast WasmBase to the actual Wasm.
59 0 : inline Wasm* getWasm(WasmHandleSharedPtr& base_wasm_handle) {
60 0 : return static_cast<Wasm*>(base_wasm_handle->wasm().get());
61 0 : }
62 :
63 : } // namespace
64 :
65 0 : void Wasm::initializeLifecycle(Server::ServerLifecycleNotifier& lifecycle_notifier) {
66 0 : auto weak = std::weak_ptr<Wasm>(std::static_pointer_cast<Wasm>(shared_from_this()));
67 0 : lifecycle_notifier.registerCallback(Server::ServerLifecycleNotifier::Stage::ShutdownExit,
68 0 : [this, weak](Event::PostCb post_cb) {
69 0 : auto lock = weak.lock();
70 0 : if (lock) { // See if we are still alive.
71 0 : server_shutdown_post_cb_ = std::move(post_cb);
72 0 : }
73 0 : });
74 0 : }
75 :
76 : Wasm::Wasm(WasmConfig& config, absl::string_view vm_key, const Stats::ScopeSharedPtr& scope,
77 : Api::Api& api, Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher)
78 : : WasmBase(
79 : createWasmVm(config.config().vm_config().runtime()), config.config().vm_config().vm_id(),
80 : MessageUtil::anyToBytes(config.config().vm_config().configuration()),
81 : toStdStringView(vm_key), config.environmentVariables(), config.allowedCapabilities()),
82 : scope_(scope), api_(api), stat_name_pool_(scope_->symbolTable()),
83 : custom_stat_namespace_(stat_name_pool_.add(CustomStatNamespace)),
84 : cluster_manager_(cluster_manager), dispatcher_(dispatcher),
85 : time_source_(dispatcher.timeSource()), lifecycle_stats_handler_(LifecycleStatsHandler(
86 0 : scope, config.config().vm_config().runtime())) {
87 0 : lifecycle_stats_handler_.onEvent(WasmEvent::VmCreated);
88 0 : ENVOY_LOG(debug, "Base Wasm created {} now active", lifecycle_stats_handler_.getActiveVmCount());
89 0 : }
90 :
91 : Wasm::Wasm(WasmHandleSharedPtr base_wasm_handle, Event::Dispatcher& dispatcher)
92 : : WasmBase(base_wasm_handle,
93 0 : [&base_wasm_handle]() {
94 0 : return createWasmVm(absl::StrCat(
95 0 : "envoy.wasm.runtime.",
96 0 : toAbslStringView(base_wasm_handle->wasm()->wasm_vm()->getEngineName())));
97 0 : }),
98 : scope_(getWasm(base_wasm_handle)->scope_), api_(getWasm(base_wasm_handle)->api_),
99 : stat_name_pool_(scope_->symbolTable()),
100 : custom_stat_namespace_(stat_name_pool_.add(CustomStatNamespace)),
101 : cluster_manager_(getWasm(base_wasm_handle)->clusterManager()), dispatcher_(dispatcher),
102 : time_source_(dispatcher.timeSource()),
103 0 : lifecycle_stats_handler_(getWasm(base_wasm_handle)->lifecycle_stats_handler_) {
104 0 : lifecycle_stats_handler_.onEvent(WasmEvent::VmCreated);
105 0 : ENVOY_LOG(debug, "Thread-Local Wasm created {} now active",
106 0 : lifecycle_stats_handler_.getActiveVmCount());
107 0 : }
108 :
109 0 : void Wasm::error(std::string_view message) { ENVOY_LOG(error, "Wasm VM failed {}", message); }
110 :
111 0 : void Wasm::setTimerPeriod(uint32_t context_id, std::chrono::milliseconds new_period) {
112 0 : auto& period = timer_period_[context_id];
113 0 : auto& timer = timer_[context_id];
114 0 : bool was_running = timer && period.count() > 0;
115 0 : period = new_period;
116 0 : if (was_running) {
117 0 : timer->disableTimer();
118 0 : }
119 0 : if (period.count() > 0) {
120 0 : timer = dispatcher_.createTimer(
121 0 : [weak = std::weak_ptr<Wasm>(std::static_pointer_cast<Wasm>(shared_from_this())),
122 0 : context_id]() {
123 0 : auto shared = weak.lock();
124 0 : if (shared) {
125 0 : shared->tickHandler(context_id);
126 0 : }
127 0 : });
128 0 : timer->enableTimer(period);
129 0 : }
130 0 : }
131 :
132 0 : void Wasm::tickHandler(uint32_t root_context_id) {
133 0 : auto period = timer_period_.find(root_context_id);
134 0 : auto timer = timer_.find(root_context_id);
135 0 : if (period == timer_period_.end() || timer == timer_.end() || !on_tick_) {
136 0 : return;
137 0 : }
138 0 : auto context = getContext(root_context_id);
139 0 : if (context) {
140 0 : context->onTick(0);
141 0 : }
142 0 : if (timer->second && period->second.count() > 0) {
143 0 : timer->second->enableTimer(period->second);
144 0 : }
145 0 : }
146 :
147 0 : Wasm::~Wasm() {
148 0 : lifecycle_stats_handler_.onEvent(WasmEvent::VmShutDown);
149 0 : ENVOY_LOG(debug, "~Wasm {} remaining active", lifecycle_stats_handler_.getActiveVmCount());
150 0 : if (server_shutdown_post_cb_) {
151 0 : dispatcher_.post(std::move(server_shutdown_post_cb_));
152 0 : }
153 0 : }
154 :
155 : // NOLINTNEXTLINE(readability-identifier-naming)
156 0 : Word resolve_dns(Word dns_address_ptr, Word dns_address_size, Word token_ptr) {
157 0 : auto context = static_cast<Context*>(proxy_wasm::contextOrEffectiveContext());
158 0 : auto root_context = context->isRootContext() ? context : context->rootContext();
159 0 : auto address = context->wasmVm()->getMemory(dns_address_ptr, dns_address_size);
160 0 : if (!address) {
161 0 : return WasmResult::InvalidMemoryAccess;
162 0 : }
163 : // Verify set and verify token_ptr before initiating the async resolve.
164 0 : uint32_t token = context->wasm()->nextDnsToken();
165 0 : if (!context->wasm()->setDatatype(token_ptr, token)) {
166 0 : return WasmResult::InvalidMemoryAccess;
167 0 : }
168 0 : auto callback = [weak_wasm = std::weak_ptr<Wasm>(context->wasm()->sharedThis()), root_context,
169 0 : context_id = context->id(),
170 0 : token](Envoy::Network::DnsResolver::ResolutionStatus status,
171 0 : std::list<Envoy::Network::DnsResponse>&& response) {
172 0 : auto wasm = weak_wasm.lock();
173 0 : if (!wasm) {
174 0 : return;
175 0 : }
176 0 : root_context->onResolveDns(token, status, std::move(response));
177 0 : };
178 0 : if (!context->wasm()->dnsResolver()) {
179 0 : envoy::config::core::v3::TypedExtensionConfig typed_dns_resolver_config;
180 0 : Network::DnsResolverFactory& dns_resolver_factory =
181 0 : Network::createDefaultDnsResolverFactory(typed_dns_resolver_config);
182 0 : context->wasm()->dnsResolver() = dns_resolver_factory.createDnsResolver(
183 0 : context->wasm()->dispatcher(), context->wasm()->api(), typed_dns_resolver_config);
184 0 : }
185 0 : context->wasm()->dnsResolver()->resolve(std::string(address.value()),
186 0 : Network::DnsLookupFamily::Auto, callback);
187 0 : return WasmResult::Ok;
188 0 : }
189 :
190 0 : void Wasm::registerCallbacks() {
191 0 : WasmBase::registerCallbacks();
192 0 : #define _REGISTER(_fn) \
193 0 : wasm_vm_->registerCallback( \
194 0 : "env", "envoy_" #_fn, &_fn, \
195 0 : &proxy_wasm::ConvertFunctionWordToUint32<decltype(_fn), _fn>::convertFunctionWordToUint32)
196 0 : _REGISTER(resolve_dns);
197 0 : #undef _REGISTER
198 0 : }
199 :
200 0 : void Wasm::getFunctions() {
201 0 : WasmBase::getFunctions();
202 0 : #define _GET(_fn) wasm_vm_->getFunction("envoy_" #_fn, &_fn##_);
203 0 : _GET(on_resolve_dns)
204 0 : _GET(on_stats_update)
205 0 : #undef _GET
206 0 : }
207 :
208 0 : proxy_wasm::CallOnThreadFunction Wasm::callOnThreadFunction() {
209 0 : auto& dispatcher = dispatcher_;
210 0 : return [&dispatcher](const std::function<void()>& f) { return dispatcher.post(f); };
211 0 : }
212 :
213 0 : ContextBase* Wasm::createContext(const std::shared_ptr<PluginBase>& plugin) {
214 0 : if (create_context_for_testing_) {
215 0 : return create_context_for_testing_(this, std::static_pointer_cast<Plugin>(plugin));
216 0 : }
217 0 : return new Context(this, std::static_pointer_cast<Plugin>(plugin));
218 0 : }
219 :
220 0 : ContextBase* Wasm::createRootContext(const std::shared_ptr<PluginBase>& plugin) {
221 0 : if (create_root_context_for_testing_) {
222 0 : return create_root_context_for_testing_(this, std::static_pointer_cast<Plugin>(plugin));
223 0 : }
224 0 : return new Context(this, std::static_pointer_cast<Plugin>(plugin));
225 0 : }
226 :
227 0 : ContextBase* Wasm::createVmContext() { return new Context(this); }
228 :
229 : void Wasm::log(const PluginSharedPtr& plugin, const Formatter::HttpFormatterContext& log_context,
230 0 : const StreamInfo::StreamInfo& info) {
231 0 : auto context = getRootContext(plugin, true);
232 0 : context->log(log_context, info);
233 0 : }
234 :
235 0 : void Wasm::onStatsUpdate(const PluginSharedPtr& plugin, Envoy::Stats::MetricSnapshot& snapshot) {
236 0 : auto context = getRootContext(plugin, true);
237 0 : context->onStatsUpdate(snapshot);
238 0 : }
239 :
240 0 : void clearCodeCacheForTesting() {
241 0 : std::lock_guard<std::mutex> guard(code_cache_mutex);
242 0 : if (code_cache) {
243 0 : delete code_cache;
244 0 : code_cache = nullptr;
245 0 : }
246 0 : getCreateStatsHandler().resetStatsForTesting();
247 0 : }
248 :
249 : // TODO: remove this post #4160: Switch default to SimulatedTimeSystem.
250 0 : void setTimeOffsetForCodeCacheForTesting(MonotonicTime::duration d) {
251 0 : cache_time_offset_for_testing = d;
252 0 : }
253 :
254 : static proxy_wasm::WasmHandleFactory
255 : getWasmHandleFactory(WasmConfig& wasm_config, const Stats::ScopeSharedPtr& scope, Api::Api& api,
256 : Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher,
257 0 : Server::ServerLifecycleNotifier& lifecycle_notifier) {
258 0 : return [&wasm_config, &scope, &api, &cluster_manager, &dispatcher,
259 0 : &lifecycle_notifier](std::string_view vm_key) -> WasmHandleBaseSharedPtr {
260 0 : auto wasm = std::make_shared<Wasm>(wasm_config, toAbslStringView(vm_key), scope, api,
261 0 : cluster_manager, dispatcher);
262 0 : wasm->initializeLifecycle(lifecycle_notifier);
263 0 : return std::static_pointer_cast<WasmHandleBase>(std::make_shared<WasmHandle>(std::move(wasm)));
264 0 : };
265 0 : }
266 :
267 : static proxy_wasm::WasmHandleCloneFactory
268 : getWasmHandleCloneFactory(Event::Dispatcher& dispatcher,
269 0 : CreateContextFn create_root_context_for_testing) {
270 0 : return [&dispatcher, create_root_context_for_testing](
271 0 : WasmHandleBaseSharedPtr base_wasm) -> std::shared_ptr<WasmHandleBase> {
272 0 : auto wasm = std::make_shared<Wasm>(std::static_pointer_cast<WasmHandle>(base_wasm), dispatcher);
273 0 : wasm->setCreateContextForTesting(nullptr, create_root_context_for_testing);
274 0 : return std::static_pointer_cast<WasmHandleBase>(std::make_shared<WasmHandle>(std::move(wasm)));
275 0 : };
276 0 : }
277 :
278 0 : static proxy_wasm::PluginHandleFactory getPluginHandleFactory() {
279 0 : return [](WasmHandleBaseSharedPtr base_wasm,
280 0 : PluginBaseSharedPtr base_plugin) -> std::shared_ptr<PluginHandleBase> {
281 0 : return std::static_pointer_cast<PluginHandleBase>(
282 0 : std::make_shared<PluginHandle>(std::static_pointer_cast<WasmHandle>(base_wasm),
283 0 : std::static_pointer_cast<Plugin>(base_plugin)));
284 0 : };
285 0 : }
286 :
287 0 : WasmEvent toWasmEvent(const std::shared_ptr<WasmHandleBase>& wasm) {
288 0 : if (!wasm) {
289 0 : return WasmEvent::UnableToCreateVm;
290 0 : }
291 0 : switch (wasm->wasm()->fail_state()) {
292 0 : case FailState::Ok:
293 0 : return WasmEvent::Ok;
294 0 : case FailState::UnableToCreateVm:
295 0 : return WasmEvent::UnableToCreateVm;
296 0 : case FailState::UnableToCloneVm:
297 0 : return WasmEvent::UnableToCloneVm;
298 0 : case FailState::MissingFunction:
299 0 : return WasmEvent::MissingFunction;
300 0 : case FailState::UnableToInitializeCode:
301 0 : return WasmEvent::UnableToInitializeCode;
302 0 : case FailState::StartFailed:
303 0 : return WasmEvent::StartFailed;
304 0 : case FailState::ConfigureFailed:
305 0 : return WasmEvent::ConfigureFailed;
306 0 : case FailState::RuntimeError:
307 0 : return WasmEvent::RuntimeError;
308 0 : }
309 0 : PANIC("corrupt enum");
310 0 : }
311 :
312 : bool createWasm(const PluginSharedPtr& plugin, const Stats::ScopeSharedPtr& scope,
313 : Upstream::ClusterManager& cluster_manager, Init::Manager& init_manager,
314 : Event::Dispatcher& dispatcher, Api::Api& api,
315 : Server::ServerLifecycleNotifier& lifecycle_notifier,
316 : Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider,
317 4 : CreateWasmCallback&& cb, CreateContextFn create_root_context_for_testing) {
318 4 : auto& stats_handler = getCreateStatsHandler();
319 4 : std::string source, code;
320 4 : auto config = plugin->wasmConfig();
321 4 : auto vm_config = config.config().vm_config();
322 4 : bool fetch = false;
323 4 : if (vm_config.code().has_remote()) {
324 : // TODO(https://github.com/envoyproxy/envoy/issues/25052) Stabilize this feature.
325 0 : ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
326 0 : "Wasm remote code fetch is unstable and may cause a crash");
327 0 : auto now = dispatcher.timeSource().monotonicTime() + cache_time_offset_for_testing;
328 0 : source = vm_config.code().remote().http_uri().uri();
329 0 : std::lock_guard<std::mutex> guard(code_cache_mutex);
330 0 : if (!code_cache) {
331 0 : code_cache = new std::remove_reference<decltype(*code_cache)>::type;
332 0 : }
333 0 : Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);
334 : // Remove entries older than CODE_CACHE_SECONDS_CACHING_TTL except for our target.
335 0 : for (auto it = code_cache->begin(); it != code_cache->end();) {
336 0 : if (now - it->second.use_time > std::chrono::seconds(CODE_CACHE_SECONDS_CACHING_TTL) &&
337 0 : it->first != vm_config.code().remote().sha256()) {
338 0 : code_cache->erase(it++);
339 0 : } else {
340 0 : ++it;
341 0 : }
342 0 : }
343 0 : stats_handler.onRemoteCacheEntriesChanged(code_cache->size());
344 0 : auto it = code_cache->find(vm_config.code().remote().sha256());
345 0 : if (it != code_cache->end()) {
346 0 : it->second.use_time = now;
347 0 : if (it->second.in_progress) {
348 0 : stats_handler.onEvent(WasmEvent::RemoteLoadCacheMiss);
349 0 : ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
350 0 : "createWasm: failed to load (in progress) from {}", source);
351 0 : cb(nullptr);
352 0 : }
353 0 : code = it->second.code;
354 0 : if (code.empty()) {
355 0 : if (now - it->second.fetch_time <
356 0 : std::chrono::seconds(CODE_CACHE_SECONDS_NEGATIVE_CACHING)) {
357 0 : stats_handler.onEvent(WasmEvent::RemoteLoadCacheNegativeHit);
358 0 : ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
359 0 : "createWasm: failed to load (cached) from {}", source);
360 0 : cb(nullptr);
361 0 : }
362 0 : fetch = true; // Fetch failed, retry.
363 0 : it->second.in_progress = true;
364 0 : it->second.fetch_time = now;
365 0 : } else {
366 0 : stats_handler.onEvent(WasmEvent::RemoteLoadCacheHit);
367 0 : }
368 0 : } else {
369 0 : fetch = true; // Not in cache, fetch.
370 0 : auto& e = (*code_cache)[vm_config.code().remote().sha256()];
371 0 : e.in_progress = true;
372 0 : e.use_time = e.fetch_time = now;
373 0 : stats_handler.onRemoteCacheEntriesChanged(code_cache->size());
374 0 : stats_handler.onEvent(WasmEvent::RemoteLoadCacheMiss);
375 0 : }
376 4 : } else if (vm_config.code().has_local()) {
377 0 : code = Config::DataSource::read(vm_config.code().local(), true, api);
378 0 : source = Config::DataSource::getPath(vm_config.code().local())
379 0 : .value_or(code.empty() ? EMPTY_STRING : INLINE_STRING);
380 0 : }
381 :
382 4 : auto vm_key = proxy_wasm::makeVmKey(vm_config.vm_id(),
383 4 : MessageUtil::anyToBytes(vm_config.configuration()), code);
384 4 : auto complete_cb = [cb, vm_key, plugin, scope, &api, &cluster_manager, &dispatcher,
385 4 : &lifecycle_notifier, create_root_context_for_testing,
386 4 : &stats_handler](std::string code) -> bool {
387 4 : if (code.empty()) {
388 4 : cb(nullptr);
389 4 : return false;
390 4 : }
391 :
392 0 : auto config = plugin->wasmConfig();
393 0 : auto wasm = proxy_wasm::createWasm(
394 0 : vm_key, code, plugin,
395 0 : getWasmHandleFactory(config, scope, api, cluster_manager, dispatcher, lifecycle_notifier),
396 0 : getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing),
397 0 : config.config().vm_config().allow_precompiled());
398 0 : Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);
399 0 : stats_handler.onEvent(toWasmEvent(wasm));
400 0 : if (!wasm || wasm->wasm()->isFailed()) {
401 0 : ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
402 0 : "Unable to create Wasm");
403 0 : cb(nullptr);
404 0 : return false;
405 0 : }
406 0 : cb(std::static_pointer_cast<WasmHandle>(wasm));
407 0 : return true;
408 0 : };
409 :
410 4 : if (fetch) {
411 0 : auto holder = std::make_shared<std::unique_ptr<Event::DeferredDeletable>>();
412 0 : auto fetch_callback = [vm_config, complete_cb, source, &dispatcher, scope, holder, plugin,
413 0 : &stats_handler](const std::string& code) {
414 0 : {
415 0 : std::lock_guard<std::mutex> guard(code_cache_mutex);
416 0 : auto& e = (*code_cache)[vm_config.code().remote().sha256()];
417 0 : e.in_progress = false;
418 0 : e.code = code;
419 0 : Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);
420 0 : if (code.empty()) {
421 0 : stats_handler.onEvent(WasmEvent::RemoteLoadCacheFetchFailure);
422 0 : } else {
423 0 : stats_handler.onEvent(WasmEvent::RemoteLoadCacheFetchSuccess);
424 0 : }
425 0 : stats_handler.onRemoteCacheEntriesChanged(code_cache->size());
426 0 : }
427 : // NB: xDS currently does not support failing asynchronously, so we fail immediately
428 : // if remote Wasm code is not cached and do a background fill.
429 0 : if (!vm_config.nack_on_code_cache_miss()) {
430 0 : if (code.empty()) {
431 0 : ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
432 0 : "Failed to load Wasm code (fetch failed) from {}", source);
433 0 : }
434 0 : complete_cb(code);
435 0 : }
436 : // NB: must be deleted explicitly.
437 0 : if (*holder) {
438 0 : dispatcher.deferredDelete(Envoy::Event::DeferredDeletablePtr{holder->release()});
439 0 : }
440 0 : };
441 0 : if (vm_config.nack_on_code_cache_miss()) {
442 0 : auto adapter = std::make_unique<RemoteDataFetcherAdapter>(fetch_callback);
443 0 : auto fetcher = std::make_unique<Config::DataFetcher::RemoteDataFetcher>(
444 0 : cluster_manager, vm_config.code().remote().http_uri(), vm_config.code().remote().sha256(),
445 0 : *adapter);
446 0 : auto fetcher_ptr = fetcher.get();
447 0 : adapter->setFetcher(std::move(fetcher));
448 0 : *holder = std::move(adapter);
449 0 : fetcher_ptr->fetch();
450 0 : ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
451 0 : fmt::format("Failed to load Wasm code (fetching) from {}", source));
452 0 : cb(nullptr);
453 0 : return false;
454 0 : } else {
455 0 : remote_data_provider = std::make_unique<Config::DataSource::RemoteAsyncDataProvider>(
456 0 : cluster_manager, init_manager, vm_config.code().remote(), dispatcher,
457 0 : api.randomGenerator(), true, fetch_callback);
458 0 : }
459 4 : } else {
460 4 : return complete_cb(code);
461 4 : }
462 0 : return true;
463 4 : }
464 :
465 : PluginHandleSharedPtr
466 : getOrCreateThreadLocalPlugin(const WasmHandleSharedPtr& base_wasm, const PluginSharedPtr& plugin,
467 : Event::Dispatcher& dispatcher,
468 4 : CreateContextFn create_root_context_for_testing) {
469 4 : if (!base_wasm) {
470 4 : if (!plugin->fail_open_) {
471 4 : ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), critical,
472 4 : "Plugin configured to fail closed failed to load");
473 4 : }
474 : // To handle the case when failed to create VMs and fail-open/close properly,
475 : // we still create PluginHandle with null WasmBase.
476 4 : return std::make_shared<PluginHandle>(nullptr, plugin);
477 4 : }
478 0 : return std::static_pointer_cast<PluginHandle>(proxy_wasm::getOrCreateThreadLocalPlugin(
479 0 : std::static_pointer_cast<WasmHandle>(base_wasm), plugin,
480 0 : getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing),
481 0 : getPluginHandleFactory()));
482 4 : }
483 :
484 : } // namespace Wasm
485 : } // namespace Common
486 : } // namespace Extensions
487 : } // namespace Envoy
|