LCOV - code coverage report
Current view: top level - source/extensions/common/wasm - wasm.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 31 361 8.6 %
Date: 2024-01-05 06:35:25 Functions: 3 40 7.5 %

          Line data    Source code
       1             : #include "source/extensions/common/wasm/wasm.h"
       2             : 
       3             : #include <algorithm>
       4             : #include <chrono>
       5             : 
       6             : #include "envoy/event/deferred_deletable.h"
       7             : 
       8             : #include "source/common/common/logger.h"
       9             : #include "source/common/network/dns_resolver/dns_factory_util.h"
      10             : #include "source/extensions/common/wasm/plugin.h"
      11             : #include "source/extensions/common/wasm/stats_handler.h"
      12             : 
      13             : #include "absl/strings/str_cat.h"
      14             : 
      15             : using proxy_wasm::FailState;
      16             : using proxy_wasm::Word;
      17             : 
      18             : namespace Envoy {
      19             : 
      20             : using ScopeWeakPtr = std::weak_ptr<Stats::Scope>;
      21             : 
      22             : namespace Extensions {
      23             : namespace Common {
      24             : namespace Wasm {
      25             : namespace {
      26             : 
      27             : struct CodeCacheEntry {
      28             :   std::string code;
      29             :   bool in_progress;
      30             :   MonotonicTime use_time;
      31             :   MonotonicTime fetch_time;
      32             : };
      33             : 
      34             : class RemoteDataFetcherAdapter : public Config::DataFetcher::RemoteDataFetcherCallback,
      35             :                                  public Event::DeferredDeletable {
      36             : public:
      37           0 :   RemoteDataFetcherAdapter(std::function<void(std::string cb)> cb) : cb_(cb) {}
      38           0 :   ~RemoteDataFetcherAdapter() override = default;
      39           0 :   void onSuccess(const std::string& data) override { cb_(data); }
      40           0 :   void onFailure(Config::DataFetcher::FailureReason) override { cb_(""); }
      41           0 :   void setFetcher(std::unique_ptr<Config::DataFetcher::RemoteDataFetcher>&& fetcher) {
      42           0 :     fetcher_ = std::move(fetcher);
      43           0 :   }
      44             : 
      45             : private:
      46             :   std::function<void(std::string)> cb_;
      47             :   std::unique_ptr<Config::DataFetcher::RemoteDataFetcher> fetcher_;
      48             : };
      49             : 
      50             : const std::string INLINE_STRING = "<inline>";
      51             : const int CODE_CACHE_SECONDS_NEGATIVE_CACHING = 10;
      52             : const int CODE_CACHE_SECONDS_CACHING_TTL = 24 * 3600; // 24 hours.
      53             : MonotonicTime::duration cache_time_offset_for_testing{};
      54             : 
      55             : std::mutex code_cache_mutex;
      56             : absl::flat_hash_map<std::string, CodeCacheEntry>* code_cache = nullptr;
      57             : 
      58             : // Downcast WasmBase to the actual Wasm.
      59           0 : inline Wasm* getWasm(WasmHandleSharedPtr& base_wasm_handle) {
      60           0 :   return static_cast<Wasm*>(base_wasm_handle->wasm().get());
      61           0 : }
      62             : 
      63             : } // namespace
      64             : 
      65           0 : void Wasm::initializeLifecycle(Server::ServerLifecycleNotifier& lifecycle_notifier) {
      66           0 :   auto weak = std::weak_ptr<Wasm>(std::static_pointer_cast<Wasm>(shared_from_this()));
      67           0 :   lifecycle_notifier.registerCallback(Server::ServerLifecycleNotifier::Stage::ShutdownExit,
      68           0 :                                       [this, weak](Event::PostCb post_cb) {
      69           0 :                                         auto lock = weak.lock();
      70           0 :                                         if (lock) { // See if we are still alive.
      71           0 :                                           server_shutdown_post_cb_ = std::move(post_cb);
      72           0 :                                         }
      73           0 :                                       });
      74           0 : }
      75             : 
      76             : Wasm::Wasm(WasmConfig& config, absl::string_view vm_key, const Stats::ScopeSharedPtr& scope,
      77             :            Api::Api& api, Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher)
      78             :     : WasmBase(
      79             :           createWasmVm(config.config().vm_config().runtime()), config.config().vm_config().vm_id(),
      80             :           MessageUtil::anyToBytes(config.config().vm_config().configuration()),
      81             :           toStdStringView(vm_key), config.environmentVariables(), config.allowedCapabilities()),
      82             :       scope_(scope), api_(api), stat_name_pool_(scope_->symbolTable()),
      83             :       custom_stat_namespace_(stat_name_pool_.add(CustomStatNamespace)),
      84             :       cluster_manager_(cluster_manager), dispatcher_(dispatcher),
      85             :       time_source_(dispatcher.timeSource()), lifecycle_stats_handler_(LifecycleStatsHandler(
      86           0 :                                                  scope, config.config().vm_config().runtime())) {
      87           0 :   lifecycle_stats_handler_.onEvent(WasmEvent::VmCreated);
      88           0 :   ENVOY_LOG(debug, "Base Wasm created {} now active", lifecycle_stats_handler_.getActiveVmCount());
      89           0 : }
      90             : 
      91             : Wasm::Wasm(WasmHandleSharedPtr base_wasm_handle, Event::Dispatcher& dispatcher)
      92             :     : WasmBase(base_wasm_handle,
      93           0 :                [&base_wasm_handle]() {
      94           0 :                  return createWasmVm(absl::StrCat(
      95           0 :                      "envoy.wasm.runtime.",
      96           0 :                      toAbslStringView(base_wasm_handle->wasm()->wasm_vm()->getEngineName())));
      97           0 :                }),
      98             :       scope_(getWasm(base_wasm_handle)->scope_), api_(getWasm(base_wasm_handle)->api_),
      99             :       stat_name_pool_(scope_->symbolTable()),
     100             :       custom_stat_namespace_(stat_name_pool_.add(CustomStatNamespace)),
     101             :       cluster_manager_(getWasm(base_wasm_handle)->clusterManager()), dispatcher_(dispatcher),
     102             :       time_source_(dispatcher.timeSource()),
     103           0 :       lifecycle_stats_handler_(getWasm(base_wasm_handle)->lifecycle_stats_handler_) {
     104           0 :   lifecycle_stats_handler_.onEvent(WasmEvent::VmCreated);
     105           0 :   ENVOY_LOG(debug, "Thread-Local Wasm created {} now active",
     106           0 :             lifecycle_stats_handler_.getActiveVmCount());
     107           0 : }
     108             : 
     109           0 : void Wasm::error(std::string_view message) { ENVOY_LOG(error, "Wasm VM failed {}", message); }
     110             : 
     111           0 : void Wasm::setTimerPeriod(uint32_t context_id, std::chrono::milliseconds new_period) {
     112           0 :   auto& period = timer_period_[context_id];
     113           0 :   auto& timer = timer_[context_id];
     114           0 :   bool was_running = timer && period.count() > 0;
     115           0 :   period = new_period;
     116           0 :   if (was_running) {
     117           0 :     timer->disableTimer();
     118           0 :   }
     119           0 :   if (period.count() > 0) {
     120           0 :     timer = dispatcher_.createTimer(
     121           0 :         [weak = std::weak_ptr<Wasm>(std::static_pointer_cast<Wasm>(shared_from_this())),
     122           0 :          context_id]() {
     123           0 :           auto shared = weak.lock();
     124           0 :           if (shared) {
     125           0 :             shared->tickHandler(context_id);
     126           0 :           }
     127           0 :         });
     128           0 :     timer->enableTimer(period);
     129           0 :   }
     130           0 : }
     131             : 
     132           0 : void Wasm::tickHandler(uint32_t root_context_id) {
     133           0 :   auto period = timer_period_.find(root_context_id);
     134           0 :   auto timer = timer_.find(root_context_id);
     135           0 :   if (period == timer_period_.end() || timer == timer_.end() || !on_tick_) {
     136           0 :     return;
     137           0 :   }
     138           0 :   auto context = getContext(root_context_id);
     139           0 :   if (context) {
     140           0 :     context->onTick(0);
     141           0 :   }
     142           0 :   if (timer->second && period->second.count() > 0) {
     143           0 :     timer->second->enableTimer(period->second);
     144           0 :   }
     145           0 : }
     146             : 
     147           0 : Wasm::~Wasm() {
     148           0 :   lifecycle_stats_handler_.onEvent(WasmEvent::VmShutDown);
     149           0 :   ENVOY_LOG(debug, "~Wasm {} remaining active", lifecycle_stats_handler_.getActiveVmCount());
     150           0 :   if (server_shutdown_post_cb_) {
     151           0 :     dispatcher_.post(std::move(server_shutdown_post_cb_));
     152           0 :   }
     153           0 : }
     154             : 
     155             : // NOLINTNEXTLINE(readability-identifier-naming)
     156           0 : Word resolve_dns(Word dns_address_ptr, Word dns_address_size, Word token_ptr) {
     157           0 :   auto context = static_cast<Context*>(proxy_wasm::contextOrEffectiveContext());
     158           0 :   auto root_context = context->isRootContext() ? context : context->rootContext();
     159           0 :   auto address = context->wasmVm()->getMemory(dns_address_ptr, dns_address_size);
     160           0 :   if (!address) {
     161           0 :     return WasmResult::InvalidMemoryAccess;
     162           0 :   }
     163             :   // Verify set and verify token_ptr before initiating the async resolve.
     164           0 :   uint32_t token = context->wasm()->nextDnsToken();
     165           0 :   if (!context->wasm()->setDatatype(token_ptr, token)) {
     166           0 :     return WasmResult::InvalidMemoryAccess;
     167           0 :   }
     168           0 :   auto callback = [weak_wasm = std::weak_ptr<Wasm>(context->wasm()->sharedThis()), root_context,
     169           0 :                    context_id = context->id(),
     170           0 :                    token](Envoy::Network::DnsResolver::ResolutionStatus status,
     171           0 :                           std::list<Envoy::Network::DnsResponse>&& response) {
     172           0 :     auto wasm = weak_wasm.lock();
     173           0 :     if (!wasm) {
     174           0 :       return;
     175           0 :     }
     176           0 :     root_context->onResolveDns(token, status, std::move(response));
     177           0 :   };
     178           0 :   if (!context->wasm()->dnsResolver()) {
     179           0 :     envoy::config::core::v3::TypedExtensionConfig typed_dns_resolver_config;
     180           0 :     Network::DnsResolverFactory& dns_resolver_factory =
     181           0 :         Network::createDefaultDnsResolverFactory(typed_dns_resolver_config);
     182           0 :     context->wasm()->dnsResolver() = dns_resolver_factory.createDnsResolver(
     183           0 :         context->wasm()->dispatcher(), context->wasm()->api(), typed_dns_resolver_config);
     184           0 :   }
     185           0 :   context->wasm()->dnsResolver()->resolve(std::string(address.value()),
     186           0 :                                           Network::DnsLookupFamily::Auto, callback);
     187           0 :   return WasmResult::Ok;
     188           0 : }
     189             : 
     190           0 : void Wasm::registerCallbacks() {
     191           0 :   WasmBase::registerCallbacks();
     192           0 : #define _REGISTER(_fn)                                                                             \
     193           0 :   wasm_vm_->registerCallback(                                                                      \
     194           0 :       "env", "envoy_" #_fn, &_fn,                                                                  \
     195           0 :       &proxy_wasm::ConvertFunctionWordToUint32<decltype(_fn), _fn>::convertFunctionWordToUint32)
     196           0 :   _REGISTER(resolve_dns);
     197           0 : #undef _REGISTER
     198           0 : }
     199             : 
     200           0 : void Wasm::getFunctions() {
     201           0 :   WasmBase::getFunctions();
     202           0 : #define _GET(_fn) wasm_vm_->getFunction("envoy_" #_fn, &_fn##_);
     203           0 :   _GET(on_resolve_dns)
     204           0 :   _GET(on_stats_update)
     205           0 : #undef _GET
     206           0 : }
     207             : 
     208           0 : proxy_wasm::CallOnThreadFunction Wasm::callOnThreadFunction() {
     209           0 :   auto& dispatcher = dispatcher_;
     210           0 :   return [&dispatcher](const std::function<void()>& f) { return dispatcher.post(f); };
     211           0 : }
     212             : 
     213           0 : ContextBase* Wasm::createContext(const std::shared_ptr<PluginBase>& plugin) {
     214           0 :   if (create_context_for_testing_) {
     215           0 :     return create_context_for_testing_(this, std::static_pointer_cast<Plugin>(plugin));
     216           0 :   }
     217           0 :   return new Context(this, std::static_pointer_cast<Plugin>(plugin));
     218           0 : }
     219             : 
     220           0 : ContextBase* Wasm::createRootContext(const std::shared_ptr<PluginBase>& plugin) {
     221           0 :   if (create_root_context_for_testing_) {
     222           0 :     return create_root_context_for_testing_(this, std::static_pointer_cast<Plugin>(plugin));
     223           0 :   }
     224           0 :   return new Context(this, std::static_pointer_cast<Plugin>(plugin));
     225           0 : }
     226             : 
     227           0 : ContextBase* Wasm::createVmContext() { return new Context(this); }
     228             : 
     229             : void Wasm::log(const PluginSharedPtr& plugin, const Formatter::HttpFormatterContext& log_context,
     230           0 :                const StreamInfo::StreamInfo& info) {
     231           0 :   auto context = getRootContext(plugin, true);
     232           0 :   context->log(log_context, info);
     233           0 : }
     234             : 
     235           0 : void Wasm::onStatsUpdate(const PluginSharedPtr& plugin, Envoy::Stats::MetricSnapshot& snapshot) {
     236           0 :   auto context = getRootContext(plugin, true);
     237           0 :   context->onStatsUpdate(snapshot);
     238           0 : }
     239             : 
     240           0 : void clearCodeCacheForTesting() {
     241           0 :   std::lock_guard<std::mutex> guard(code_cache_mutex);
     242           0 :   if (code_cache) {
     243           0 :     delete code_cache;
     244           0 :     code_cache = nullptr;
     245           0 :   }
     246           0 :   getCreateStatsHandler().resetStatsForTesting();
     247           0 : }
     248             : 
     249             : // TODO: remove this post #4160: Switch default to SimulatedTimeSystem.
     250           0 : void setTimeOffsetForCodeCacheForTesting(MonotonicTime::duration d) {
     251           0 :   cache_time_offset_for_testing = d;
     252           0 : }
     253             : 
     254             : static proxy_wasm::WasmHandleFactory
     255             : getWasmHandleFactory(WasmConfig& wasm_config, const Stats::ScopeSharedPtr& scope, Api::Api& api,
     256             :                      Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher,
     257           0 :                      Server::ServerLifecycleNotifier& lifecycle_notifier) {
     258           0 :   return [&wasm_config, &scope, &api, &cluster_manager, &dispatcher,
     259           0 :           &lifecycle_notifier](std::string_view vm_key) -> WasmHandleBaseSharedPtr {
     260           0 :     auto wasm = std::make_shared<Wasm>(wasm_config, toAbslStringView(vm_key), scope, api,
     261           0 :                                        cluster_manager, dispatcher);
     262           0 :     wasm->initializeLifecycle(lifecycle_notifier);
     263           0 :     return std::static_pointer_cast<WasmHandleBase>(std::make_shared<WasmHandle>(std::move(wasm)));
     264           0 :   };
     265           0 : }
     266             : 
     267             : static proxy_wasm::WasmHandleCloneFactory
     268             : getWasmHandleCloneFactory(Event::Dispatcher& dispatcher,
     269           0 :                           CreateContextFn create_root_context_for_testing) {
     270           0 :   return [&dispatcher, create_root_context_for_testing](
     271           0 :              WasmHandleBaseSharedPtr base_wasm) -> std::shared_ptr<WasmHandleBase> {
     272           0 :     auto wasm = std::make_shared<Wasm>(std::static_pointer_cast<WasmHandle>(base_wasm), dispatcher);
     273           0 :     wasm->setCreateContextForTesting(nullptr, create_root_context_for_testing);
     274           0 :     return std::static_pointer_cast<WasmHandleBase>(std::make_shared<WasmHandle>(std::move(wasm)));
     275           0 :   };
     276           0 : }
     277             : 
     278           0 : static proxy_wasm::PluginHandleFactory getPluginHandleFactory() {
     279           0 :   return [](WasmHandleBaseSharedPtr base_wasm,
     280           0 :             PluginBaseSharedPtr base_plugin) -> std::shared_ptr<PluginHandleBase> {
     281           0 :     return std::static_pointer_cast<PluginHandleBase>(
     282           0 :         std::make_shared<PluginHandle>(std::static_pointer_cast<WasmHandle>(base_wasm),
     283           0 :                                        std::static_pointer_cast<Plugin>(base_plugin)));
     284           0 :   };
     285           0 : }
     286             : 
     287           0 : WasmEvent toWasmEvent(const std::shared_ptr<WasmHandleBase>& wasm) {
     288           0 :   if (!wasm) {
     289           0 :     return WasmEvent::UnableToCreateVm;
     290           0 :   }
     291           0 :   switch (wasm->wasm()->fail_state()) {
     292           0 :   case FailState::Ok:
     293           0 :     return WasmEvent::Ok;
     294           0 :   case FailState::UnableToCreateVm:
     295           0 :     return WasmEvent::UnableToCreateVm;
     296           0 :   case FailState::UnableToCloneVm:
     297           0 :     return WasmEvent::UnableToCloneVm;
     298           0 :   case FailState::MissingFunction:
     299           0 :     return WasmEvent::MissingFunction;
     300           0 :   case FailState::UnableToInitializeCode:
     301           0 :     return WasmEvent::UnableToInitializeCode;
     302           0 :   case FailState::StartFailed:
     303           0 :     return WasmEvent::StartFailed;
     304           0 :   case FailState::ConfigureFailed:
     305           0 :     return WasmEvent::ConfigureFailed;
     306           0 :   case FailState::RuntimeError:
     307           0 :     return WasmEvent::RuntimeError;
     308           0 :   }
     309           0 :   PANIC("corrupt enum");
     310           0 : }
     311             : 
     312             : bool createWasm(const PluginSharedPtr& plugin, const Stats::ScopeSharedPtr& scope,
     313             :                 Upstream::ClusterManager& cluster_manager, Init::Manager& init_manager,
     314             :                 Event::Dispatcher& dispatcher, Api::Api& api,
     315             :                 Server::ServerLifecycleNotifier& lifecycle_notifier,
     316             :                 Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider,
     317           4 :                 CreateWasmCallback&& cb, CreateContextFn create_root_context_for_testing) {
     318           4 :   auto& stats_handler = getCreateStatsHandler();
     319           4 :   std::string source, code;
     320           4 :   auto config = plugin->wasmConfig();
     321           4 :   auto vm_config = config.config().vm_config();
     322           4 :   bool fetch = false;
     323           4 :   if (vm_config.code().has_remote()) {
     324             :     // TODO(https://github.com/envoyproxy/envoy/issues/25052) Stabilize this feature.
     325           0 :     ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
     326           0 :                         "Wasm remote code fetch is unstable and may cause a crash");
     327           0 :     auto now = dispatcher.timeSource().monotonicTime() + cache_time_offset_for_testing;
     328           0 :     source = vm_config.code().remote().http_uri().uri();
     329           0 :     std::lock_guard<std::mutex> guard(code_cache_mutex);
     330           0 :     if (!code_cache) {
     331           0 :       code_cache = new std::remove_reference<decltype(*code_cache)>::type;
     332           0 :     }
     333           0 :     Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);
     334             :     // Remove entries older than CODE_CACHE_SECONDS_CACHING_TTL except for our target.
     335           0 :     for (auto it = code_cache->begin(); it != code_cache->end();) {
     336           0 :       if (now - it->second.use_time > std::chrono::seconds(CODE_CACHE_SECONDS_CACHING_TTL) &&
     337           0 :           it->first != vm_config.code().remote().sha256()) {
     338           0 :         code_cache->erase(it++);
     339           0 :       } else {
     340           0 :         ++it;
     341           0 :       }
     342           0 :     }
     343           0 :     stats_handler.onRemoteCacheEntriesChanged(code_cache->size());
     344           0 :     auto it = code_cache->find(vm_config.code().remote().sha256());
     345           0 :     if (it != code_cache->end()) {
     346           0 :       it->second.use_time = now;
     347           0 :       if (it->second.in_progress) {
     348           0 :         stats_handler.onEvent(WasmEvent::RemoteLoadCacheMiss);
     349           0 :         ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
     350           0 :                             "createWasm: failed to load (in progress) from {}", source);
     351           0 :         cb(nullptr);
     352           0 :       }
     353           0 :       code = it->second.code;
     354           0 :       if (code.empty()) {
     355           0 :         if (now - it->second.fetch_time <
     356           0 :             std::chrono::seconds(CODE_CACHE_SECONDS_NEGATIVE_CACHING)) {
     357           0 :           stats_handler.onEvent(WasmEvent::RemoteLoadCacheNegativeHit);
     358           0 :           ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
     359           0 :                               "createWasm: failed to load (cached) from {}", source);
     360           0 :           cb(nullptr);
     361           0 :         }
     362           0 :         fetch = true; // Fetch failed, retry.
     363           0 :         it->second.in_progress = true;
     364           0 :         it->second.fetch_time = now;
     365           0 :       } else {
     366           0 :         stats_handler.onEvent(WasmEvent::RemoteLoadCacheHit);
     367           0 :       }
     368           0 :     } else {
     369           0 :       fetch = true; // Not in cache, fetch.
     370           0 :       auto& e = (*code_cache)[vm_config.code().remote().sha256()];
     371           0 :       e.in_progress = true;
     372           0 :       e.use_time = e.fetch_time = now;
     373           0 :       stats_handler.onRemoteCacheEntriesChanged(code_cache->size());
     374           0 :       stats_handler.onEvent(WasmEvent::RemoteLoadCacheMiss);
     375           0 :     }
     376           4 :   } else if (vm_config.code().has_local()) {
     377           0 :     code = Config::DataSource::read(vm_config.code().local(), true, api);
     378           0 :     source = Config::DataSource::getPath(vm_config.code().local())
     379           0 :                  .value_or(code.empty() ? EMPTY_STRING : INLINE_STRING);
     380           0 :   }
     381             : 
     382           4 :   auto vm_key = proxy_wasm::makeVmKey(vm_config.vm_id(),
     383           4 :                                       MessageUtil::anyToBytes(vm_config.configuration()), code);
     384           4 :   auto complete_cb = [cb, vm_key, plugin, scope, &api, &cluster_manager, &dispatcher,
     385           4 :                       &lifecycle_notifier, create_root_context_for_testing,
     386           4 :                       &stats_handler](std::string code) -> bool {
     387           4 :     if (code.empty()) {
     388           4 :       cb(nullptr);
     389           4 :       return false;
     390           4 :     }
     391             : 
     392           0 :     auto config = plugin->wasmConfig();
     393           0 :     auto wasm = proxy_wasm::createWasm(
     394           0 :         vm_key, code, plugin,
     395           0 :         getWasmHandleFactory(config, scope, api, cluster_manager, dispatcher, lifecycle_notifier),
     396           0 :         getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing),
     397           0 :         config.config().vm_config().allow_precompiled());
     398           0 :     Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);
     399           0 :     stats_handler.onEvent(toWasmEvent(wasm));
     400           0 :     if (!wasm || wasm->wasm()->isFailed()) {
     401           0 :       ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
     402           0 :                           "Unable to create Wasm");
     403           0 :       cb(nullptr);
     404           0 :       return false;
     405           0 :     }
     406           0 :     cb(std::static_pointer_cast<WasmHandle>(wasm));
     407           0 :     return true;
     408           0 :   };
     409             : 
     410           4 :   if (fetch) {
     411           0 :     auto holder = std::make_shared<std::unique_ptr<Event::DeferredDeletable>>();
     412           0 :     auto fetch_callback = [vm_config, complete_cb, source, &dispatcher, scope, holder, plugin,
     413           0 :                            &stats_handler](const std::string& code) {
     414           0 :       {
     415           0 :         std::lock_guard<std::mutex> guard(code_cache_mutex);
     416           0 :         auto& e = (*code_cache)[vm_config.code().remote().sha256()];
     417           0 :         e.in_progress = false;
     418           0 :         e.code = code;
     419           0 :         Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);
     420           0 :         if (code.empty()) {
     421           0 :           stats_handler.onEvent(WasmEvent::RemoteLoadCacheFetchFailure);
     422           0 :         } else {
     423           0 :           stats_handler.onEvent(WasmEvent::RemoteLoadCacheFetchSuccess);
     424           0 :         }
     425           0 :         stats_handler.onRemoteCacheEntriesChanged(code_cache->size());
     426           0 :       }
     427             :       // NB: xDS currently does not support failing asynchronously, so we fail immediately
     428             :       // if remote Wasm code is not cached and do a background fill.
     429           0 :       if (!vm_config.nack_on_code_cache_miss()) {
     430           0 :         if (code.empty()) {
     431           0 :           ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
     432           0 :                               "Failed to load Wasm code (fetch failed) from {}", source);
     433           0 :         }
     434           0 :         complete_cb(code);
     435           0 :       }
     436             :       // NB: must be deleted explicitly.
     437           0 :       if (*holder) {
     438           0 :         dispatcher.deferredDelete(Envoy::Event::DeferredDeletablePtr{holder->release()});
     439           0 :       }
     440           0 :     };
     441           0 :     if (vm_config.nack_on_code_cache_miss()) {
     442           0 :       auto adapter = std::make_unique<RemoteDataFetcherAdapter>(fetch_callback);
     443           0 :       auto fetcher = std::make_unique<Config::DataFetcher::RemoteDataFetcher>(
     444           0 :           cluster_manager, vm_config.code().remote().http_uri(), vm_config.code().remote().sha256(),
     445           0 :           *adapter);
     446           0 :       auto fetcher_ptr = fetcher.get();
     447           0 :       adapter->setFetcher(std::move(fetcher));
     448           0 :       *holder = std::move(adapter);
     449           0 :       fetcher_ptr->fetch();
     450           0 :       ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
     451           0 :                           fmt::format("Failed to load Wasm code (fetching) from {}", source));
     452           0 :       cb(nullptr);
     453           0 :       return false;
     454           0 :     } else {
     455           0 :       remote_data_provider = std::make_unique<Config::DataSource::RemoteAsyncDataProvider>(
     456           0 :           cluster_manager, init_manager, vm_config.code().remote(), dispatcher,
     457           0 :           api.randomGenerator(), true, fetch_callback);
     458           0 :     }
     459           4 :   } else {
     460           4 :     return complete_cb(code);
     461           4 :   }
     462           0 :   return true;
     463           4 : }
     464             : 
     465             : PluginHandleSharedPtr
     466             : getOrCreateThreadLocalPlugin(const WasmHandleSharedPtr& base_wasm, const PluginSharedPtr& plugin,
     467             :                              Event::Dispatcher& dispatcher,
     468           4 :                              CreateContextFn create_root_context_for_testing) {
     469           4 :   if (!base_wasm) {
     470           4 :     if (!plugin->fail_open_) {
     471           4 :       ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), critical,
     472           4 :                           "Plugin configured to fail closed failed to load");
     473           4 :     }
     474             :     // To handle the case when failed to create VMs and fail-open/close properly,
     475             :     // we still create PluginHandle with null WasmBase.
     476           4 :     return std::make_shared<PluginHandle>(nullptr, plugin);
     477           4 :   }
     478           0 :   return std::static_pointer_cast<PluginHandle>(proxy_wasm::getOrCreateThreadLocalPlugin(
     479           0 :       std::static_pointer_cast<WasmHandle>(base_wasm), plugin,
     480           0 :       getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing),
     481           0 :       getPluginHandleFactory()));
     482           4 : }
     483             : 
     484             : } // namespace Wasm
     485             : } // namespace Common
     486             : } // namespace Extensions
     487             : } // namespace Envoy

Generated by: LCOV version 1.15