/proc/self/cwd/source/extensions/common/wasm/wasm.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/common/wasm/wasm.h" |
2 | | |
3 | | #include <algorithm> |
4 | | #include <chrono> |
5 | | |
6 | | #include "envoy/event/deferred_deletable.h" |
7 | | |
8 | | #include "source/common/common/logger.h" |
9 | | #include "source/common/network/dns_resolver/dns_factory_util.h" |
10 | | #include "source/extensions/common/wasm/plugin.h" |
11 | | #include "source/extensions/common/wasm/remote_async_datasource.h" |
12 | | #include "source/extensions/common/wasm/stats_handler.h" |
13 | | |
14 | | #include "absl/strings/str_cat.h" |
15 | | |
16 | | using proxy_wasm::FailState; |
17 | | using proxy_wasm::Word; |
18 | | |
19 | | namespace Envoy { |
20 | | |
21 | | using ScopeWeakPtr = std::weak_ptr<Stats::Scope>; |
22 | | |
23 | | namespace Extensions { |
24 | | namespace Common { |
25 | | namespace Wasm { |
26 | | namespace { |
27 | | |
28 | | struct CodeCacheEntry { |
29 | | std::string code; |
30 | | bool in_progress; |
31 | | MonotonicTime use_time; |
32 | | MonotonicTime fetch_time; |
33 | | }; |
34 | | |
35 | | class RemoteDataFetcherAdapter : public Config::DataFetcher::RemoteDataFetcherCallback, |
36 | | public Event::DeferredDeletable { |
37 | | public: |
38 | 0 | RemoteDataFetcherAdapter(std::function<void(std::string cb)> cb) : cb_(cb) {} |
39 | 0 | ~RemoteDataFetcherAdapter() override = default; |
40 | 0 | void onSuccess(const std::string& data) override { cb_(data); } |
41 | 0 | void onFailure(Config::DataFetcher::FailureReason) override { cb_(""); } |
42 | 0 | void setFetcher(std::unique_ptr<Config::DataFetcher::RemoteDataFetcher>&& fetcher) { |
43 | 0 | fetcher_ = std::move(fetcher); |
44 | 0 | } |
45 | | |
46 | | private: |
47 | | std::function<void(std::string)> cb_; |
48 | | std::unique_ptr<Config::DataFetcher::RemoteDataFetcher> fetcher_; |
49 | | }; |
50 | | |
51 | | const std::string INLINE_STRING = "<inline>"; |
52 | | const int CODE_CACHE_SECONDS_NEGATIVE_CACHING = 10; |
53 | | const int CODE_CACHE_SECONDS_CACHING_TTL = 24 * 3600; // 24 hours. |
54 | | MonotonicTime::duration cache_time_offset_for_testing{}; |
55 | | |
56 | | std::mutex code_cache_mutex; |
57 | | absl::flat_hash_map<std::string, CodeCacheEntry>* code_cache = nullptr; |
58 | | |
59 | | // Downcast WasmBase to the actual Wasm. |
60 | 0 | inline Wasm* getWasm(WasmHandleSharedPtr& base_wasm_handle) { |
61 | 0 | return static_cast<Wasm*>(base_wasm_handle->wasm().get()); |
62 | 0 | } |
63 | | |
64 | | } // namespace |
65 | | |
66 | 1 | void Wasm::initializeLifecycle(Server::ServerLifecycleNotifier& lifecycle_notifier) { |
67 | 1 | auto weak = std::weak_ptr<Wasm>(std::static_pointer_cast<Wasm>(shared_from_this())); |
68 | 1 | lifecycle_notifier.registerCallback(Server::ServerLifecycleNotifier::Stage::ShutdownExit, |
69 | 1 | [this, weak](Event::PostCb post_cb) { |
70 | 0 | auto lock = weak.lock(); |
71 | 0 | if (lock) { // See if we are still alive. |
72 | 0 | server_shutdown_post_cb_ = std::move(post_cb); |
73 | 0 | } |
74 | 0 | }); |
75 | 1 | } |
76 | | |
77 | | Wasm::Wasm(WasmConfig& config, absl::string_view vm_key, const Stats::ScopeSharedPtr& scope, |
78 | | Api::Api& api, Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher) |
79 | | : WasmBase( |
80 | | createWasmVm(config.config().vm_config().runtime()), config.config().vm_config().vm_id(), |
81 | | MessageUtil::anyToBytes(config.config().vm_config().configuration()), |
82 | | toStdStringView(vm_key), config.environmentVariables(), config.allowedCapabilities()), |
83 | | scope_(scope), api_(api), stat_name_pool_(scope_->symbolTable()), |
84 | | custom_stat_namespace_(stat_name_pool_.add(CustomStatNamespace)), |
85 | | cluster_manager_(cluster_manager), dispatcher_(dispatcher), |
86 | | time_source_(dispatcher.timeSource()), lifecycle_stats_handler_(LifecycleStatsHandler( |
87 | 1 | scope, config.config().vm_config().runtime())) { |
88 | 1 | lifecycle_stats_handler_.onEvent(WasmEvent::VmCreated); |
89 | 1 | ENVOY_LOG(debug, "Base Wasm created {} now active", lifecycle_stats_handler_.getActiveVmCount()); |
90 | 1 | } |
91 | | |
92 | | Wasm::Wasm(WasmHandleSharedPtr base_wasm_handle, Event::Dispatcher& dispatcher) |
93 | | : WasmBase(base_wasm_handle, |
94 | 0 | [&base_wasm_handle]() { |
95 | 0 | return createWasmVm(absl::StrCat( |
96 | 0 | "envoy.wasm.runtime.", |
97 | 0 | toAbslStringView(base_wasm_handle->wasm()->wasm_vm()->getEngineName()))); |
98 | 0 | }), |
99 | | scope_(getWasm(base_wasm_handle)->scope_), api_(getWasm(base_wasm_handle)->api_), |
100 | | stat_name_pool_(scope_->symbolTable()), |
101 | | custom_stat_namespace_(stat_name_pool_.add(CustomStatNamespace)), |
102 | | cluster_manager_(getWasm(base_wasm_handle)->clusterManager()), dispatcher_(dispatcher), |
103 | | time_source_(dispatcher.timeSource()), |
104 | 0 | lifecycle_stats_handler_(getWasm(base_wasm_handle)->lifecycle_stats_handler_) { |
105 | 0 | lifecycle_stats_handler_.onEvent(WasmEvent::VmCreated); |
106 | 0 | ENVOY_LOG(debug, "Thread-Local Wasm created {} now active", |
107 | 0 | lifecycle_stats_handler_.getActiveVmCount()); |
108 | 0 | } |
109 | | |
110 | 1 | void Wasm::error(std::string_view message) { ENVOY_LOG(error, "Wasm VM failed {}", message); } |
111 | | |
112 | 0 | void Wasm::setTimerPeriod(uint32_t context_id, std::chrono::milliseconds new_period) { |
113 | 0 | auto& period = timer_period_[context_id]; |
114 | 0 | auto& timer = timer_[context_id]; |
115 | 0 | bool was_running = timer && period.count() > 0; |
116 | 0 | period = new_period; |
117 | 0 | if (was_running) { |
118 | 0 | timer->disableTimer(); |
119 | 0 | } |
120 | 0 | if (period.count() > 0) { |
121 | 0 | timer = dispatcher_.createTimer( |
122 | 0 | [weak = std::weak_ptr<Wasm>(std::static_pointer_cast<Wasm>(shared_from_this())), |
123 | 0 | context_id]() { |
124 | 0 | auto shared = weak.lock(); |
125 | 0 | if (shared) { |
126 | 0 | shared->tickHandler(context_id); |
127 | 0 | } |
128 | 0 | }); |
129 | 0 | timer->enableTimer(period); |
130 | 0 | } |
131 | 0 | } |
132 | | |
133 | 0 | void Wasm::tickHandler(uint32_t root_context_id) { |
134 | 0 | auto period = timer_period_.find(root_context_id); |
135 | 0 | auto timer = timer_.find(root_context_id); |
136 | 0 | if (period == timer_period_.end() || timer == timer_.end() || !on_tick_) { |
137 | 0 | return; |
138 | 0 | } |
139 | 0 | auto context = getContext(root_context_id); |
140 | 0 | if (context) { |
141 | 0 | context->onTick(0); |
142 | 0 | } |
143 | 0 | if (timer->second && period->second.count() > 0) { |
144 | 0 | timer->second->enableTimer(period->second); |
145 | 0 | } |
146 | 0 | } |
147 | | |
148 | 1 | Wasm::~Wasm() { |
149 | 1 | lifecycle_stats_handler_.onEvent(WasmEvent::VmShutDown); |
150 | 1 | ENVOY_LOG(debug, "~Wasm {} remaining active", lifecycle_stats_handler_.getActiveVmCount()); |
151 | 1 | if (server_shutdown_post_cb_) { |
152 | 0 | dispatcher_.post(std::move(server_shutdown_post_cb_)); |
153 | 0 | } |
154 | 1 | } |
155 | | |
156 | | // NOLINTNEXTLINE(readability-identifier-naming) |
157 | 0 | Word resolve_dns(Word dns_address_ptr, Word dns_address_size, Word token_ptr) { |
158 | 0 | auto context = static_cast<Context*>(proxy_wasm::contextOrEffectiveContext()); |
159 | 0 | auto root_context = context->isRootContext() ? context : context->rootContext(); |
160 | 0 | auto address = context->wasmVm()->getMemory(dns_address_ptr, dns_address_size); |
161 | 0 | if (!address) { |
162 | 0 | return WasmResult::InvalidMemoryAccess; |
163 | 0 | } |
164 | | // Verify set and verify token_ptr before initiating the async resolve. |
165 | 0 | uint32_t token = context->wasm()->nextDnsToken(); |
166 | 0 | if (!context->wasm()->setDatatype(token_ptr, token)) { |
167 | 0 | return WasmResult::InvalidMemoryAccess; |
168 | 0 | } |
169 | 0 | auto callback = [weak_wasm = std::weak_ptr<Wasm>(context->wasm()->sharedThis()), root_context, |
170 | 0 | context_id = context->id(), |
171 | 0 | token](Envoy::Network::DnsResolver::ResolutionStatus status, absl::string_view, |
172 | 0 | std::list<Envoy::Network::DnsResponse>&& response) { |
173 | 0 | auto wasm = weak_wasm.lock(); |
174 | 0 | if (!wasm) { |
175 | 0 | return; |
176 | 0 | } |
177 | 0 | root_context->onResolveDns(token, status, std::move(response)); |
178 | 0 | }; |
179 | 0 | if (!context->wasm()->dnsResolver()) { |
180 | 0 | envoy::config::core::v3::TypedExtensionConfig typed_dns_resolver_config; |
181 | 0 | Network::DnsResolverFactory& dns_resolver_factory = |
182 | 0 | Network::createDefaultDnsResolverFactory(typed_dns_resolver_config); |
183 | 0 | context->wasm()->dnsResolver() = THROW_OR_RETURN_VALUE( |
184 | 0 | dns_resolver_factory.createDnsResolver(context->wasm()->dispatcher(), |
185 | 0 | context->wasm()->api(), typed_dns_resolver_config), |
186 | 0 | Network::DnsResolverSharedPtr); |
187 | 0 | } |
188 | 0 | context->wasm()->dnsResolver()->resolve(std::string(address.value()), |
189 | 0 | Network::DnsLookupFamily::Auto, callback); |
190 | 0 | return WasmResult::Ok; |
191 | 0 | } |
192 | | |
193 | 0 | void Wasm::registerCallbacks() { |
194 | 0 | WasmBase::registerCallbacks(); |
195 | 0 | #define _REGISTER(_fn) \ |
196 | 0 | wasm_vm_->registerCallback( \ |
197 | 0 | "env", "envoy_" #_fn, &_fn, \ |
198 | 0 | &proxy_wasm::ConvertFunctionWordToUint32<decltype(_fn), _fn>::convertFunctionWordToUint32) |
199 | 0 | _REGISTER(resolve_dns); |
200 | 0 | #undef _REGISTER |
201 | 0 | } |
202 | | |
203 | 0 | void Wasm::getFunctions() { |
204 | 0 | WasmBase::getFunctions(); |
205 | 0 | #define _GET(_fn) wasm_vm_->getFunction("envoy_" #_fn, &_fn##_); |
206 | 0 | _GET(on_resolve_dns) |
207 | 0 | _GET(on_stats_update) |
208 | 0 | #undef _GET |
209 | 0 | } |
210 | | |
211 | 0 | proxy_wasm::CallOnThreadFunction Wasm::callOnThreadFunction() { |
212 | 0 | auto& dispatcher = dispatcher_; |
213 | 0 | return [&dispatcher](const std::function<void()>& f) { return dispatcher.post(f); }; |
214 | 0 | } |
215 | | |
216 | 0 | ContextBase* Wasm::createContext(const std::shared_ptr<PluginBase>& plugin) { |
217 | 0 | if (create_context_for_testing_) { |
218 | 0 | return create_context_for_testing_(this, std::static_pointer_cast<Plugin>(plugin)); |
219 | 0 | } |
220 | 0 | return new Context(this, std::static_pointer_cast<Plugin>(plugin)); |
221 | 0 | } |
222 | | |
223 | 0 | ContextBase* Wasm::createRootContext(const std::shared_ptr<PluginBase>& plugin) { |
224 | 0 | if (create_root_context_for_testing_) { |
225 | 0 | return create_root_context_for_testing_(this, std::static_pointer_cast<Plugin>(plugin)); |
226 | 0 | } |
227 | 0 | return new Context(this, std::static_pointer_cast<Plugin>(plugin)); |
228 | 0 | } |
229 | | |
230 | 0 | ContextBase* Wasm::createVmContext() { return new Context(this); } |
231 | | |
232 | | void Wasm::log(const PluginSharedPtr& plugin, const Formatter::HttpFormatterContext& log_context, |
233 | 0 | const StreamInfo::StreamInfo& info) { |
234 | 0 | auto context = getRootContext(plugin, true); |
235 | 0 | context->log(log_context, info); |
236 | 0 | } |
237 | | |
238 | 0 | void Wasm::onStatsUpdate(const PluginSharedPtr& plugin, Envoy::Stats::MetricSnapshot& snapshot) { |
239 | 0 | auto context = getRootContext(plugin, true); |
240 | 0 | context->onStatsUpdate(snapshot); |
241 | 0 | } |
242 | | |
243 | 0 | void clearCodeCacheForTesting() { |
244 | 0 | std::lock_guard<std::mutex> guard(code_cache_mutex); |
245 | 0 | if (code_cache) { |
246 | 0 | delete code_cache; |
247 | 0 | code_cache = nullptr; |
248 | 0 | } |
249 | 0 | getCreateStatsHandler().resetStatsForTesting(); |
250 | 0 | } |
251 | | |
252 | | // TODO: remove this post #4160: Switch default to SimulatedTimeSystem. |
253 | 0 | void setTimeOffsetForCodeCacheForTesting(MonotonicTime::duration d) { |
254 | 0 | cache_time_offset_for_testing = d; |
255 | 0 | } |
256 | | |
257 | | static proxy_wasm::WasmHandleFactory |
258 | | getWasmHandleFactory(WasmConfig& wasm_config, const Stats::ScopeSharedPtr& scope, Api::Api& api, |
259 | | Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher, |
260 | 1 | Server::ServerLifecycleNotifier& lifecycle_notifier) { |
261 | 1 | return [&wasm_config, &scope, &api, &cluster_manager, &dispatcher, |
262 | 1 | &lifecycle_notifier](std::string_view vm_key) -> WasmHandleBaseSharedPtr { |
263 | 1 | auto wasm = std::make_shared<Wasm>(wasm_config, toAbslStringView(vm_key), scope, api, |
264 | 1 | cluster_manager, dispatcher); |
265 | 1 | wasm->initializeLifecycle(lifecycle_notifier); |
266 | 1 | return std::static_pointer_cast<WasmHandleBase>(std::make_shared<WasmHandle>(std::move(wasm))); |
267 | 1 | }; |
268 | 1 | } |
269 | | |
270 | | static proxy_wasm::WasmHandleCloneFactory |
271 | | getWasmHandleCloneFactory(Event::Dispatcher& dispatcher, |
272 | 1 | CreateContextFn create_root_context_for_testing) { |
273 | 1 | return [&dispatcher, create_root_context_for_testing]( |
274 | 1 | WasmHandleBaseSharedPtr base_wasm) -> std::shared_ptr<WasmHandleBase> { |
275 | 0 | auto wasm = std::make_shared<Wasm>(std::static_pointer_cast<WasmHandle>(base_wasm), dispatcher); |
276 | 0 | wasm->setCreateContextForTesting(nullptr, create_root_context_for_testing); |
277 | 0 | return std::static_pointer_cast<WasmHandleBase>(std::make_shared<WasmHandle>(std::move(wasm))); |
278 | 0 | }; |
279 | 1 | } |
280 | | |
281 | 0 | static proxy_wasm::PluginHandleFactory getPluginHandleFactory() { |
282 | 0 | return [](WasmHandleBaseSharedPtr base_wasm, |
283 | 0 | PluginBaseSharedPtr base_plugin) -> std::shared_ptr<PluginHandleBase> { |
284 | 0 | return std::static_pointer_cast<PluginHandleBase>( |
285 | 0 | std::make_shared<PluginHandle>(std::static_pointer_cast<WasmHandle>(base_wasm), |
286 | 0 | std::static_pointer_cast<Plugin>(base_plugin))); |
287 | 0 | }; |
288 | 0 | } |
289 | | |
290 | 1 | WasmEvent toWasmEvent(const std::shared_ptr<WasmHandleBase>& wasm) { |
291 | 1 | if (!wasm) { |
292 | 1 | return WasmEvent::UnableToCreateVm; |
293 | 1 | } |
294 | 0 | switch (wasm->wasm()->fail_state()) { |
295 | 0 | case FailState::Ok: |
296 | 0 | return WasmEvent::Ok; |
297 | 0 | case FailState::UnableToCreateVm: |
298 | 0 | return WasmEvent::UnableToCreateVm; |
299 | 0 | case FailState::UnableToCloneVm: |
300 | 0 | return WasmEvent::UnableToCloneVm; |
301 | 0 | case FailState::MissingFunction: |
302 | 0 | return WasmEvent::MissingFunction; |
303 | 0 | case FailState::UnableToInitializeCode: |
304 | 0 | return WasmEvent::UnableToInitializeCode; |
305 | 0 | case FailState::StartFailed: |
306 | 0 | return WasmEvent::StartFailed; |
307 | 0 | case FailState::ConfigureFailed: |
308 | 0 | return WasmEvent::ConfigureFailed; |
309 | 0 | case FailState::RuntimeError: |
310 | 0 | return WasmEvent::RuntimeError; |
311 | 0 | } |
312 | 0 | PANIC("corrupt enum"); |
313 | 0 | } |
314 | | |
315 | | bool createWasm(const PluginSharedPtr& plugin, const Stats::ScopeSharedPtr& scope, |
316 | | Upstream::ClusterManager& cluster_manager, Init::Manager& init_manager, |
317 | | Event::Dispatcher& dispatcher, Api::Api& api, |
318 | | Server::ServerLifecycleNotifier& lifecycle_notifier, |
319 | | RemoteAsyncDataProviderPtr& remote_data_provider, CreateWasmCallback&& cb, |
320 | 14 | CreateContextFn create_root_context_for_testing) { |
321 | 14 | auto& stats_handler = getCreateStatsHandler(); |
322 | 14 | std::string source, code; |
323 | 14 | auto config = plugin->wasmConfig(); |
324 | 14 | auto vm_config = config.config().vm_config(); |
325 | 14 | bool fetch = false; |
326 | 14 | if (vm_config.code().has_remote()) { |
327 | | // TODO(https://github.com/envoyproxy/envoy/issues/25052) Stabilize this feature. |
328 | 0 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn, |
329 | 0 | "Wasm remote code fetch is unstable and may cause a crash"); |
330 | 0 | auto now = dispatcher.timeSource().monotonicTime() + cache_time_offset_for_testing; |
331 | 0 | source = vm_config.code().remote().http_uri().uri(); |
332 | 0 | std::lock_guard<std::mutex> guard(code_cache_mutex); |
333 | 0 | if (!code_cache) { |
334 | 0 | code_cache = new std::remove_reference<decltype(*code_cache)>::type; |
335 | 0 | } |
336 | 0 | Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope); |
337 | | // Remove entries older than CODE_CACHE_SECONDS_CACHING_TTL except for our target. |
338 | 0 | for (auto it = code_cache->begin(); it != code_cache->end();) { |
339 | 0 | if (now - it->second.use_time > std::chrono::seconds(CODE_CACHE_SECONDS_CACHING_TTL) && |
340 | 0 | it->first != vm_config.code().remote().sha256()) { |
341 | 0 | code_cache->erase(it++); |
342 | 0 | } else { |
343 | 0 | ++it; |
344 | 0 | } |
345 | 0 | } |
346 | 0 | stats_handler.onRemoteCacheEntriesChanged(code_cache->size()); |
347 | 0 | auto it = code_cache->find(vm_config.code().remote().sha256()); |
348 | 0 | if (it != code_cache->end()) { |
349 | 0 | it->second.use_time = now; |
350 | 0 | if (it->second.in_progress) { |
351 | 0 | stats_handler.onEvent(WasmEvent::RemoteLoadCacheMiss); |
352 | 0 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn, |
353 | 0 | "createWasm: failed to load (in progress) from {}", source); |
354 | 0 | cb(nullptr); |
355 | 0 | } |
356 | 0 | code = it->second.code; |
357 | 0 | if (code.empty()) { |
358 | 0 | if (now - it->second.fetch_time < |
359 | 0 | std::chrono::seconds(CODE_CACHE_SECONDS_NEGATIVE_CACHING)) { |
360 | 0 | stats_handler.onEvent(WasmEvent::RemoteLoadCacheNegativeHit); |
361 | 0 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn, |
362 | 0 | "createWasm: failed to load (cached) from {}", source); |
363 | 0 | cb(nullptr); |
364 | 0 | } |
365 | 0 | fetch = true; // Fetch failed, retry. |
366 | 0 | it->second.in_progress = true; |
367 | 0 | it->second.fetch_time = now; |
368 | 0 | } else { |
369 | 0 | stats_handler.onEvent(WasmEvent::RemoteLoadCacheHit); |
370 | 0 | } |
371 | 0 | } else { |
372 | 0 | fetch = true; // Not in cache, fetch. |
373 | 0 | auto& e = (*code_cache)[vm_config.code().remote().sha256()]; |
374 | 0 | e.in_progress = true; |
375 | 0 | e.use_time = e.fetch_time = now; |
376 | 0 | stats_handler.onRemoteCacheEntriesChanged(code_cache->size()); |
377 | 0 | stats_handler.onEvent(WasmEvent::RemoteLoadCacheMiss); |
378 | 0 | } |
379 | 14 | } else if (vm_config.code().has_local()) { |
380 | 4 | code = THROW_OR_RETURN_VALUE(Config::DataSource::read(vm_config.code().local(), true, api), |
381 | 4 | std::string); |
382 | 4 | source = Config::DataSource::getPath(vm_config.code().local()) |
383 | 4 | .value_or(code.empty() ? EMPTY_STRING : INLINE_STRING); |
384 | 4 | } |
385 | | |
386 | 14 | auto vm_key = proxy_wasm::makeVmKey(vm_config.vm_id(), |
387 | 14 | MessageUtil::anyToBytes(vm_config.configuration()), code); |
388 | 14 | auto complete_cb = [cb, vm_key, plugin, scope, &api, &cluster_manager, &dispatcher, |
389 | 14 | &lifecycle_notifier, create_root_context_for_testing, |
390 | 14 | &stats_handler](std::string code) -> bool { |
391 | 11 | if (code.empty()) { |
392 | 10 | cb(nullptr); |
393 | 10 | return false; |
394 | 10 | } |
395 | | |
396 | 1 | auto config = plugin->wasmConfig(); |
397 | 1 | auto wasm = proxy_wasm::createWasm( |
398 | 1 | vm_key, code, plugin, |
399 | 1 | getWasmHandleFactory(config, scope, api, cluster_manager, dispatcher, lifecycle_notifier), |
400 | 1 | getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing), |
401 | 1 | config.config().vm_config().allow_precompiled()); |
402 | 1 | Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope); |
403 | 1 | stats_handler.onEvent(toWasmEvent(wasm)); |
404 | 1 | if (!wasm || wasm->wasm()->isFailed()) { |
405 | 1 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace, |
406 | 1 | "Unable to create Wasm"); |
407 | 1 | cb(nullptr); |
408 | 1 | return false; |
409 | 1 | } |
410 | 0 | cb(std::static_pointer_cast<WasmHandle>(wasm)); |
411 | 0 | return true; |
412 | 1 | }; |
413 | | |
414 | 14 | if (fetch) { |
415 | 0 | auto holder = std::make_shared<std::unique_ptr<Event::DeferredDeletable>>(); |
416 | 0 | auto fetch_callback = [vm_config, complete_cb, source, &dispatcher, scope, holder, plugin, |
417 | 0 | &stats_handler](const std::string& code) { |
418 | 0 | { |
419 | 0 | std::lock_guard<std::mutex> guard(code_cache_mutex); |
420 | 0 | auto& e = (*code_cache)[vm_config.code().remote().sha256()]; |
421 | 0 | e.in_progress = false; |
422 | 0 | e.code = code; |
423 | 0 | Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope); |
424 | 0 | if (code.empty()) { |
425 | 0 | stats_handler.onEvent(WasmEvent::RemoteLoadCacheFetchFailure); |
426 | 0 | } else { |
427 | 0 | stats_handler.onEvent(WasmEvent::RemoteLoadCacheFetchSuccess); |
428 | 0 | } |
429 | 0 | stats_handler.onRemoteCacheEntriesChanged(code_cache->size()); |
430 | 0 | } |
431 | | // NB: xDS currently does not support failing asynchronously, so we fail immediately |
432 | | // if remote Wasm code is not cached and do a background fill. |
433 | 0 | if (!vm_config.nack_on_code_cache_miss()) { |
434 | 0 | if (code.empty()) { |
435 | 0 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace, |
436 | 0 | "Failed to load Wasm code (fetch failed) from {}", source); |
437 | 0 | } |
438 | 0 | complete_cb(code); |
439 | 0 | } |
440 | | // NB: must be deleted explicitly. |
441 | 0 | if (*holder) { |
442 | 0 | dispatcher.deferredDelete(Envoy::Event::DeferredDeletablePtr{holder->release()}); |
443 | 0 | } |
444 | 0 | }; |
445 | 0 | if (vm_config.nack_on_code_cache_miss()) { |
446 | 0 | auto adapter = std::make_unique<RemoteDataFetcherAdapter>(fetch_callback); |
447 | 0 | auto fetcher = std::make_unique<Config::DataFetcher::RemoteDataFetcher>( |
448 | 0 | cluster_manager, vm_config.code().remote().http_uri(), vm_config.code().remote().sha256(), |
449 | 0 | *adapter); |
450 | 0 | auto fetcher_ptr = fetcher.get(); |
451 | 0 | adapter->setFetcher(std::move(fetcher)); |
452 | 0 | *holder = std::move(adapter); |
453 | 0 | fetcher_ptr->fetch(); |
454 | 0 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace, |
455 | 0 | fmt::format("Failed to load Wasm code (fetching) from {}", source)); |
456 | 0 | cb(nullptr); |
457 | 0 | return false; |
458 | 0 | } else { |
459 | 0 | remote_data_provider = std::make_unique<RemoteAsyncDataProvider>( |
460 | 0 | cluster_manager, init_manager, vm_config.code().remote(), dispatcher, |
461 | 0 | api.randomGenerator(), true, fetch_callback); |
462 | 0 | } |
463 | 14 | } else { |
464 | 14 | return complete_cb(code); |
465 | 14 | } |
466 | 0 | return true; |
467 | 14 | } |
468 | | |
469 | | PluginHandleSharedPtr |
470 | | getOrCreateThreadLocalPlugin(const WasmHandleSharedPtr& base_wasm, const PluginSharedPtr& plugin, |
471 | | Event::Dispatcher& dispatcher, |
472 | 11 | CreateContextFn create_root_context_for_testing) { |
473 | 11 | if (!base_wasm) { |
474 | 11 | if (!plugin->fail_open_) { |
475 | 10 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), critical, |
476 | 10 | "Plugin configured to fail closed failed to load"); |
477 | 10 | } |
478 | | // To handle the case when failed to create VMs and fail-open/close properly, |
479 | | // we still create PluginHandle with null WasmBase. |
480 | 11 | return std::make_shared<PluginHandle>(nullptr, plugin); |
481 | 11 | } |
482 | 0 | return std::static_pointer_cast<PluginHandle>(proxy_wasm::getOrCreateThreadLocalPlugin( |
483 | 0 | std::static_pointer_cast<WasmHandle>(base_wasm), plugin, |
484 | 0 | getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing), |
485 | 0 | getPluginHandleFactory())); |
486 | 11 | } |
487 | | |
488 | | } // namespace Wasm |
489 | | } // namespace Common |
490 | | } // namespace Extensions |
491 | | } // namespace Envoy |