/proc/self/cwd/source/extensions/common/wasm/wasm.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/common/wasm/wasm.h" |
2 | | |
3 | | #include <algorithm> |
4 | | #include <chrono> |
5 | | |
6 | | #include "envoy/event/deferred_deletable.h" |
7 | | |
8 | | #include "source/common/common/logger.h" |
9 | | #include "source/common/network/dns_resolver/dns_factory_util.h" |
10 | | #include "source/extensions/common/wasm/plugin.h" |
11 | | #include "source/extensions/common/wasm/stats_handler.h" |
12 | | |
13 | | #include "absl/strings/str_cat.h" |
14 | | |
15 | | using proxy_wasm::FailState; |
16 | | using proxy_wasm::Word; |
17 | | |
18 | | namespace Envoy { |
19 | | |
20 | | using ScopeWeakPtr = std::weak_ptr<Stats::Scope>; |
21 | | |
22 | | namespace Extensions { |
23 | | namespace Common { |
24 | | namespace Wasm { |
25 | | namespace { |
26 | | |
27 | | struct CodeCacheEntry { |
28 | | std::string code; |
29 | | bool in_progress; |
30 | | MonotonicTime use_time; |
31 | | MonotonicTime fetch_time; |
32 | | }; |
33 | | |
34 | | class RemoteDataFetcherAdapter : public Config::DataFetcher::RemoteDataFetcherCallback, |
35 | | public Event::DeferredDeletable { |
36 | | public: |
37 | 1 | RemoteDataFetcherAdapter(std::function<void(std::string cb)> cb) : cb_(cb) {} |
38 | 1 | ~RemoteDataFetcherAdapter() override = default; |
39 | 0 | void onSuccess(const std::string& data) override { cb_(data); } |
40 | 1 | void onFailure(Config::DataFetcher::FailureReason) override { cb_(""); } |
41 | 1 | void setFetcher(std::unique_ptr<Config::DataFetcher::RemoteDataFetcher>&& fetcher) { |
42 | 1 | fetcher_ = std::move(fetcher); |
43 | 1 | } |
44 | | |
45 | | private: |
46 | | std::function<void(std::string)> cb_; |
47 | | std::unique_ptr<Config::DataFetcher::RemoteDataFetcher> fetcher_; |
48 | | }; |
49 | | |
50 | | const std::string INLINE_STRING = "<inline>"; |
51 | | const int CODE_CACHE_SECONDS_NEGATIVE_CACHING = 10; |
52 | | const int CODE_CACHE_SECONDS_CACHING_TTL = 24 * 3600; // 24 hours. |
53 | | MonotonicTime::duration cache_time_offset_for_testing{}; |
54 | | |
55 | | std::mutex code_cache_mutex; |
56 | | absl::flat_hash_map<std::string, CodeCacheEntry>* code_cache = nullptr; |
57 | | |
58 | | // Downcast WasmBase to the actual Wasm. |
59 | 0 | inline Wasm* getWasm(WasmHandleSharedPtr& base_wasm_handle) { |
60 | 0 | return static_cast<Wasm*>(base_wasm_handle->wasm().get()); |
61 | 0 | } |
62 | | |
63 | | } // namespace |
64 | | |
65 | 43 | void Wasm::initializeLifecycle(Server::ServerLifecycleNotifier& lifecycle_notifier) { |
66 | 43 | auto weak = std::weak_ptr<Wasm>(std::static_pointer_cast<Wasm>(shared_from_this())); |
67 | 43 | lifecycle_notifier.registerCallback(Server::ServerLifecycleNotifier::Stage::ShutdownExit, |
68 | 43 | [this, weak](Event::PostCb post_cb) { |
69 | 0 | auto lock = weak.lock(); |
70 | 0 | if (lock) { // See if we are still alive. |
71 | 0 | server_shutdown_post_cb_ = std::move(post_cb); |
72 | 0 | } |
73 | 0 | }); |
74 | 43 | } |
75 | | |
76 | | Wasm::Wasm(WasmConfig& config, absl::string_view vm_key, const Stats::ScopeSharedPtr& scope, |
77 | | Api::Api& api, Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher) |
78 | | : WasmBase( |
79 | | createWasmVm(config.config().vm_config().runtime()), config.config().vm_config().vm_id(), |
80 | | MessageUtil::anyToBytes(config.config().vm_config().configuration()), |
81 | | toStdStringView(vm_key), config.environmentVariables(), config.allowedCapabilities()), |
82 | | scope_(scope), api_(api), stat_name_pool_(scope_->symbolTable()), |
83 | | custom_stat_namespace_(stat_name_pool_.add(CustomStatNamespace)), |
84 | | cluster_manager_(cluster_manager), dispatcher_(dispatcher), |
85 | | time_source_(dispatcher.timeSource()), lifecycle_stats_handler_(LifecycleStatsHandler( |
86 | 43 | scope, config.config().vm_config().runtime())) { |
87 | 43 | lifecycle_stats_handler_.onEvent(WasmEvent::VmCreated); |
88 | 43 | ENVOY_LOG(debug, "Base Wasm created {} now active", lifecycle_stats_handler_.getActiveVmCount()); |
89 | 43 | } |
90 | | |
91 | | Wasm::Wasm(WasmHandleSharedPtr base_wasm_handle, Event::Dispatcher& dispatcher) |
92 | | : WasmBase(base_wasm_handle, |
93 | 0 | [&base_wasm_handle]() { |
94 | 0 | return createWasmVm(absl::StrCat( |
95 | 0 | "envoy.wasm.runtime.", |
96 | 0 | toAbslStringView(base_wasm_handle->wasm()->wasm_vm()->getEngineName()))); |
97 | 0 | }), |
98 | | scope_(getWasm(base_wasm_handle)->scope_), api_(getWasm(base_wasm_handle)->api_), |
99 | | stat_name_pool_(scope_->symbolTable()), |
100 | | custom_stat_namespace_(stat_name_pool_.add(CustomStatNamespace)), |
101 | | cluster_manager_(getWasm(base_wasm_handle)->clusterManager()), dispatcher_(dispatcher), |
102 | | time_source_(dispatcher.timeSource()), |
103 | 0 | lifecycle_stats_handler_(getWasm(base_wasm_handle)->lifecycle_stats_handler_) { |
104 | 0 | lifecycle_stats_handler_.onEvent(WasmEvent::VmCreated); |
105 | 0 | ENVOY_LOG(debug, "Thread-Local Wasm created {} now active", |
106 | 0 | lifecycle_stats_handler_.getActiveVmCount()); |
107 | 0 | } |
108 | | |
109 | 43 | void Wasm::error(std::string_view message) { ENVOY_LOG(error, "Wasm VM failed {}", message); } |
110 | | |
111 | 0 | void Wasm::setTimerPeriod(uint32_t context_id, std::chrono::milliseconds new_period) { |
112 | 0 | auto& period = timer_period_[context_id]; |
113 | 0 | auto& timer = timer_[context_id]; |
114 | 0 | bool was_running = timer && period.count() > 0; |
115 | 0 | period = new_period; |
116 | 0 | if (was_running) { |
117 | 0 | timer->disableTimer(); |
118 | 0 | } |
119 | 0 | if (period.count() > 0) { |
120 | 0 | timer = dispatcher_.createTimer( |
121 | 0 | [weak = std::weak_ptr<Wasm>(std::static_pointer_cast<Wasm>(shared_from_this())), |
122 | 0 | context_id]() { |
123 | 0 | auto shared = weak.lock(); |
124 | 0 | if (shared) { |
125 | 0 | shared->tickHandler(context_id); |
126 | 0 | } |
127 | 0 | }); |
128 | 0 | timer->enableTimer(period); |
129 | 0 | } |
130 | 0 | } |
131 | | |
132 | 0 | void Wasm::tickHandler(uint32_t root_context_id) { |
133 | 0 | auto period = timer_period_.find(root_context_id); |
134 | 0 | auto timer = timer_.find(root_context_id); |
135 | 0 | if (period == timer_period_.end() || timer == timer_.end() || !on_tick_) { |
136 | 0 | return; |
137 | 0 | } |
138 | 0 | auto context = getContext(root_context_id); |
139 | 0 | if (context) { |
140 | 0 | context->onTick(0); |
141 | 0 | } |
142 | 0 | if (timer->second && period->second.count() > 0) { |
143 | 0 | timer->second->enableTimer(period->second); |
144 | 0 | } |
145 | 0 | } |
146 | | |
147 | 43 | Wasm::~Wasm() { |
148 | 43 | lifecycle_stats_handler_.onEvent(WasmEvent::VmShutDown); |
149 | 43 | ENVOY_LOG(debug, "~Wasm {} remaining active", lifecycle_stats_handler_.getActiveVmCount()); |
150 | 43 | if (server_shutdown_post_cb_) { |
151 | 0 | dispatcher_.post(std::move(server_shutdown_post_cb_)); |
152 | 0 | } |
153 | 43 | } |
154 | | |
155 | | // NOLINTNEXTLINE(readability-identifier-naming) |
156 | 0 | Word resolve_dns(Word dns_address_ptr, Word dns_address_size, Word token_ptr) { |
157 | 0 | auto context = static_cast<Context*>(proxy_wasm::contextOrEffectiveContext()); |
158 | 0 | auto root_context = context->isRootContext() ? context : context->rootContext(); |
159 | 0 | auto address = context->wasmVm()->getMemory(dns_address_ptr, dns_address_size); |
160 | 0 | if (!address) { |
161 | 0 | return WasmResult::InvalidMemoryAccess; |
162 | 0 | } |
163 | | // Verify set and verify token_ptr before initiating the async resolve. |
164 | 0 | uint32_t token = context->wasm()->nextDnsToken(); |
165 | 0 | if (!context->wasm()->setDatatype(token_ptr, token)) { |
166 | 0 | return WasmResult::InvalidMemoryAccess; |
167 | 0 | } |
168 | 0 | auto callback = [weak_wasm = std::weak_ptr<Wasm>(context->wasm()->sharedThis()), root_context, |
169 | 0 | context_id = context->id(), |
170 | 0 | token](Envoy::Network::DnsResolver::ResolutionStatus status, |
171 | 0 | std::list<Envoy::Network::DnsResponse>&& response) { |
172 | 0 | auto wasm = weak_wasm.lock(); |
173 | 0 | if (!wasm) { |
174 | 0 | return; |
175 | 0 | } |
176 | 0 | root_context->onResolveDns(token, status, std::move(response)); |
177 | 0 | }; |
178 | 0 | if (!context->wasm()->dnsResolver()) { |
179 | 0 | envoy::config::core::v3::TypedExtensionConfig typed_dns_resolver_config; |
180 | 0 | Network::DnsResolverFactory& dns_resolver_factory = |
181 | 0 | Network::createDefaultDnsResolverFactory(typed_dns_resolver_config); |
182 | 0 | context->wasm()->dnsResolver() = dns_resolver_factory.createDnsResolver( |
183 | 0 | context->wasm()->dispatcher(), context->wasm()->api(), typed_dns_resolver_config); |
184 | 0 | } |
185 | 0 | context->wasm()->dnsResolver()->resolve(std::string(address.value()), |
186 | 0 | Network::DnsLookupFamily::Auto, callback); |
187 | 0 | return WasmResult::Ok; |
188 | 0 | } |
189 | | |
190 | 0 | void Wasm::registerCallbacks() { |
191 | 0 | WasmBase::registerCallbacks(); |
192 | 0 | #define _REGISTER(_fn) \ |
193 | 0 | wasm_vm_->registerCallback( \ |
194 | 0 | "env", "envoy_" #_fn, &_fn, \ |
195 | 0 | &proxy_wasm::ConvertFunctionWordToUint32<decltype(_fn), _fn>::convertFunctionWordToUint32) |
196 | 0 | _REGISTER(resolve_dns); |
197 | 0 | #undef _REGISTER |
198 | 0 | } |
199 | | |
200 | 0 | void Wasm::getFunctions() { |
201 | 0 | WasmBase::getFunctions(); |
202 | 0 | #define _GET(_fn) wasm_vm_->getFunction("envoy_" #_fn, &_fn##_); |
203 | 0 | _GET(on_resolve_dns) |
204 | 0 | _GET(on_stats_update) |
205 | 0 | #undef _GET |
206 | 0 | } |
207 | | |
208 | 0 | proxy_wasm::CallOnThreadFunction Wasm::callOnThreadFunction() { |
209 | 0 | auto& dispatcher = dispatcher_; |
210 | 0 | return [&dispatcher](const std::function<void()>& f) { return dispatcher.post(f); }; |
211 | 0 | } |
212 | | |
213 | 0 | ContextBase* Wasm::createContext(const std::shared_ptr<PluginBase>& plugin) { |
214 | 0 | if (create_context_for_testing_) { |
215 | 0 | return create_context_for_testing_(this, std::static_pointer_cast<Plugin>(plugin)); |
216 | 0 | } |
217 | 0 | return new Context(this, std::static_pointer_cast<Plugin>(plugin)); |
218 | 0 | } |
219 | | |
220 | 0 | ContextBase* Wasm::createRootContext(const std::shared_ptr<PluginBase>& plugin) { |
221 | 0 | if (create_root_context_for_testing_) { |
222 | 0 | return create_root_context_for_testing_(this, std::static_pointer_cast<Plugin>(plugin)); |
223 | 0 | } |
224 | 0 | return new Context(this, std::static_pointer_cast<Plugin>(plugin)); |
225 | 0 | } |
226 | | |
227 | 0 | ContextBase* Wasm::createVmContext() { return new Context(this); } |
228 | | |
229 | | void Wasm::log(const PluginSharedPtr& plugin, const Formatter::HttpFormatterContext& log_context, |
230 | 0 | const StreamInfo::StreamInfo& info) { |
231 | 0 | auto context = getRootContext(plugin, true); |
232 | 0 | context->log(log_context, info); |
233 | 0 | } |
234 | | |
235 | 0 | void Wasm::onStatsUpdate(const PluginSharedPtr& plugin, Envoy::Stats::MetricSnapshot& snapshot) { |
236 | 0 | auto context = getRootContext(plugin, true); |
237 | 0 | context->onStatsUpdate(snapshot); |
238 | 0 | } |
239 | | |
240 | 0 | void clearCodeCacheForTesting() { |
241 | 0 | std::lock_guard<std::mutex> guard(code_cache_mutex); |
242 | 0 | if (code_cache) { |
243 | 0 | delete code_cache; |
244 | 0 | code_cache = nullptr; |
245 | 0 | } |
246 | 0 | getCreateStatsHandler().resetStatsForTesting(); |
247 | 0 | } |
248 | | |
249 | | // TODO: remove this post #4160: Switch default to SimulatedTimeSystem. |
250 | 0 | void setTimeOffsetForCodeCacheForTesting(MonotonicTime::duration d) { |
251 | 0 | cache_time_offset_for_testing = d; |
252 | 0 | } |
253 | | |
254 | | static proxy_wasm::WasmHandleFactory |
255 | | getWasmHandleFactory(WasmConfig& wasm_config, const Stats::ScopeSharedPtr& scope, Api::Api& api, |
256 | | Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher, |
257 | 43 | Server::ServerLifecycleNotifier& lifecycle_notifier) { |
258 | 43 | return [&wasm_config, &scope, &api, &cluster_manager, &dispatcher, |
259 | 43 | &lifecycle_notifier](std::string_view vm_key) -> WasmHandleBaseSharedPtr { |
260 | 43 | auto wasm = std::make_shared<Wasm>(wasm_config, toAbslStringView(vm_key), scope, api, |
261 | 43 | cluster_manager, dispatcher); |
262 | 43 | wasm->initializeLifecycle(lifecycle_notifier); |
263 | 43 | return std::static_pointer_cast<WasmHandleBase>(std::make_shared<WasmHandle>(std::move(wasm))); |
264 | 43 | }; |
265 | 43 | } |
266 | | |
267 | | static proxy_wasm::WasmHandleCloneFactory |
268 | | getWasmHandleCloneFactory(Event::Dispatcher& dispatcher, |
269 | 43 | CreateContextFn create_root_context_for_testing) { |
270 | 43 | return [&dispatcher, create_root_context_for_testing]( |
271 | 43 | WasmHandleBaseSharedPtr base_wasm) -> std::shared_ptr<WasmHandleBase> { |
272 | 0 | auto wasm = std::make_shared<Wasm>(std::static_pointer_cast<WasmHandle>(base_wasm), dispatcher); |
273 | 0 | wasm->setCreateContextForTesting(nullptr, create_root_context_for_testing); |
274 | 0 | return std::static_pointer_cast<WasmHandleBase>(std::make_shared<WasmHandle>(std::move(wasm))); |
275 | 0 | }; |
276 | 43 | } |
277 | | |
278 | 0 | static proxy_wasm::PluginHandleFactory getPluginHandleFactory() { |
279 | 0 | return [](WasmHandleBaseSharedPtr base_wasm, |
280 | 0 | PluginBaseSharedPtr base_plugin) -> std::shared_ptr<PluginHandleBase> { |
281 | 0 | return std::static_pointer_cast<PluginHandleBase>( |
282 | 0 | std::make_shared<PluginHandle>(std::static_pointer_cast<WasmHandle>(base_wasm), |
283 | 0 | std::static_pointer_cast<Plugin>(base_plugin))); |
284 | 0 | }; |
285 | 0 | } |
286 | | |
287 | 43 | WasmEvent toWasmEvent(const std::shared_ptr<WasmHandleBase>& wasm) { |
288 | 43 | if (!wasm) { |
289 | 43 | return WasmEvent::UnableToCreateVm; |
290 | 43 | } |
291 | 0 | switch (wasm->wasm()->fail_state()) { |
292 | 0 | case FailState::Ok: |
293 | 0 | return WasmEvent::Ok; |
294 | 0 | case FailState::UnableToCreateVm: |
295 | 0 | return WasmEvent::UnableToCreateVm; |
296 | 0 | case FailState::UnableToCloneVm: |
297 | 0 | return WasmEvent::UnableToCloneVm; |
298 | 0 | case FailState::MissingFunction: |
299 | 0 | return WasmEvent::MissingFunction; |
300 | 0 | case FailState::UnableToInitializeCode: |
301 | 0 | return WasmEvent::UnableToInitializeCode; |
302 | 0 | case FailState::StartFailed: |
303 | 0 | return WasmEvent::StartFailed; |
304 | 0 | case FailState::ConfigureFailed: |
305 | 0 | return WasmEvent::ConfigureFailed; |
306 | 0 | case FailState::RuntimeError: |
307 | 0 | return WasmEvent::RuntimeError; |
308 | 0 | } |
309 | 0 | PANIC("corrupt enum"); |
310 | 0 | } |
311 | | |
312 | | bool createWasm(const PluginSharedPtr& plugin, const Stats::ScopeSharedPtr& scope, |
313 | | Upstream::ClusterManager& cluster_manager, Init::Manager& init_manager, |
314 | | Event::Dispatcher& dispatcher, Api::Api& api, |
315 | | Server::ServerLifecycleNotifier& lifecycle_notifier, |
316 | | Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider, |
317 | 101 | CreateWasmCallback&& cb, CreateContextFn create_root_context_for_testing) { |
318 | 101 | auto& stats_handler = getCreateStatsHandler(); |
319 | 101 | std::string source, code; |
320 | 101 | auto config = plugin->wasmConfig(); |
321 | 101 | auto vm_config = config.config().vm_config(); |
322 | 101 | bool fetch = false; |
323 | 101 | if (vm_config.code().has_remote()) { |
324 | | // TODO(https://github.com/envoyproxy/envoy/issues/25052) Stabilize this feature. |
325 | 1 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn, |
326 | 1 | "Wasm remote code fetch is unstable and may cause a crash"); |
327 | 1 | auto now = dispatcher.timeSource().monotonicTime() + cache_time_offset_for_testing; |
328 | 1 | source = vm_config.code().remote().http_uri().uri(); |
329 | 1 | std::lock_guard<std::mutex> guard(code_cache_mutex); |
330 | 1 | if (!code_cache) { |
331 | 1 | code_cache = new std::remove_reference<decltype(*code_cache)>::type; |
332 | 1 | } |
333 | 1 | Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope); |
334 | | // Remove entries older than CODE_CACHE_SECONDS_CACHING_TTL except for our target. |
335 | 1 | for (auto it = code_cache->begin(); it != code_cache->end();) { |
336 | 0 | if (now - it->second.use_time > std::chrono::seconds(CODE_CACHE_SECONDS_CACHING_TTL) && |
337 | 0 | it->first != vm_config.code().remote().sha256()) { |
338 | 0 | code_cache->erase(it++); |
339 | 0 | } else { |
340 | 0 | ++it; |
341 | 0 | } |
342 | 0 | } |
343 | 1 | stats_handler.onRemoteCacheEntriesChanged(code_cache->size()); |
344 | 1 | auto it = code_cache->find(vm_config.code().remote().sha256()); |
345 | 1 | if (it != code_cache->end()) { |
346 | 0 | it->second.use_time = now; |
347 | 0 | if (it->second.in_progress) { |
348 | 0 | stats_handler.onEvent(WasmEvent::RemoteLoadCacheMiss); |
349 | 0 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn, |
350 | 0 | "createWasm: failed to load (in progress) from {}", source); |
351 | 0 | cb(nullptr); |
352 | 0 | } |
353 | 0 | code = it->second.code; |
354 | 0 | if (code.empty()) { |
355 | 0 | if (now - it->second.fetch_time < |
356 | 0 | std::chrono::seconds(CODE_CACHE_SECONDS_NEGATIVE_CACHING)) { |
357 | 0 | stats_handler.onEvent(WasmEvent::RemoteLoadCacheNegativeHit); |
358 | 0 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn, |
359 | 0 | "createWasm: failed to load (cached) from {}", source); |
360 | 0 | cb(nullptr); |
361 | 0 | } |
362 | 0 | fetch = true; // Fetch failed, retry. |
363 | 0 | it->second.in_progress = true; |
364 | 0 | it->second.fetch_time = now; |
365 | 0 | } else { |
366 | 0 | stats_handler.onEvent(WasmEvent::RemoteLoadCacheHit); |
367 | 0 | } |
368 | 1 | } else { |
369 | 1 | fetch = true; // Not in cache, fetch. |
370 | 1 | auto& e = (*code_cache)[vm_config.code().remote().sha256()]; |
371 | 1 | e.in_progress = true; |
372 | 1 | e.use_time = e.fetch_time = now; |
373 | 1 | stats_handler.onRemoteCacheEntriesChanged(code_cache->size()); |
374 | 1 | stats_handler.onEvent(WasmEvent::RemoteLoadCacheMiss); |
375 | 1 | } |
376 | 100 | } else if (vm_config.code().has_local()) { |
377 | 45 | code = Config::DataSource::read(vm_config.code().local(), true, api); |
378 | 45 | source = Config::DataSource::getPath(vm_config.code().local()) |
379 | 45 | .value_or(code.empty() ? EMPTY_STRING : INLINE_STRING); |
380 | 45 | } |
381 | | |
382 | 101 | auto vm_key = proxy_wasm::makeVmKey(vm_config.vm_id(), |
383 | 101 | MessageUtil::anyToBytes(vm_config.configuration()), code); |
384 | 101 | auto complete_cb = [cb, vm_key, plugin, scope, &api, &cluster_manager, &dispatcher, |
385 | 101 | &lifecycle_notifier, create_root_context_for_testing, |
386 | 101 | &stats_handler](std::string code) -> bool { |
387 | 98 | if (code.empty()) { |
388 | 55 | cb(nullptr); |
389 | 55 | return false; |
390 | 55 | } |
391 | | |
392 | 43 | auto config = plugin->wasmConfig(); |
393 | 43 | auto wasm = proxy_wasm::createWasm( |
394 | 43 | vm_key, code, plugin, |
395 | 43 | getWasmHandleFactory(config, scope, api, cluster_manager, dispatcher, lifecycle_notifier), |
396 | 43 | getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing), |
397 | 43 | config.config().vm_config().allow_precompiled()); |
398 | 43 | Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope); |
399 | 43 | stats_handler.onEvent(toWasmEvent(wasm)); |
400 | 43 | if (!wasm || wasm->wasm()->isFailed()) { |
401 | 43 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace, |
402 | 43 | "Unable to create Wasm"); |
403 | 43 | cb(nullptr); |
404 | 43 | return false; |
405 | 43 | } |
406 | 0 | cb(std::static_pointer_cast<WasmHandle>(wasm)); |
407 | 0 | return true; |
408 | 43 | }; |
409 | | |
410 | 101 | if (fetch) { |
411 | 1 | auto holder = std::make_shared<std::unique_ptr<Event::DeferredDeletable>>(); |
412 | 1 | auto fetch_callback = [vm_config, complete_cb, source, &dispatcher, scope, holder, plugin, |
413 | 1 | &stats_handler](const std::string& code) { |
414 | 1 | { |
415 | 1 | std::lock_guard<std::mutex> guard(code_cache_mutex); |
416 | 1 | auto& e = (*code_cache)[vm_config.code().remote().sha256()]; |
417 | 1 | e.in_progress = false; |
418 | 1 | e.code = code; |
419 | 1 | Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope); |
420 | 1 | if (code.empty()) { |
421 | 1 | stats_handler.onEvent(WasmEvent::RemoteLoadCacheFetchFailure); |
422 | 1 | } else { |
423 | 0 | stats_handler.onEvent(WasmEvent::RemoteLoadCacheFetchSuccess); |
424 | 0 | } |
425 | 1 | stats_handler.onRemoteCacheEntriesChanged(code_cache->size()); |
426 | 1 | } |
427 | | // NB: xDS currently does not support failing asynchronously, so we fail immediately |
428 | | // if remote Wasm code is not cached and do a background fill. |
429 | 1 | if (!vm_config.nack_on_code_cache_miss()) { |
430 | 0 | if (code.empty()) { |
431 | 0 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace, |
432 | 0 | "Failed to load Wasm code (fetch failed) from {}", source); |
433 | 0 | } |
434 | 0 | complete_cb(code); |
435 | 0 | } |
436 | | // NB: must be deleted explicitly. |
437 | 1 | if (*holder) { |
438 | 1 | dispatcher.deferredDelete(Envoy::Event::DeferredDeletablePtr{holder->release()}); |
439 | 1 | } |
440 | 1 | }; |
441 | 1 | if (vm_config.nack_on_code_cache_miss()) { |
442 | 1 | auto adapter = std::make_unique<RemoteDataFetcherAdapter>(fetch_callback); |
443 | 1 | auto fetcher = std::make_unique<Config::DataFetcher::RemoteDataFetcher>( |
444 | 1 | cluster_manager, vm_config.code().remote().http_uri(), vm_config.code().remote().sha256(), |
445 | 1 | *adapter); |
446 | 1 | auto fetcher_ptr = fetcher.get(); |
447 | 1 | adapter->setFetcher(std::move(fetcher)); |
448 | 1 | *holder = std::move(adapter); |
449 | 1 | fetcher_ptr->fetch(); |
450 | 1 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace, |
451 | 1 | fmt::format("Failed to load Wasm code (fetching) from {}", source)); |
452 | 1 | cb(nullptr); |
453 | 1 | return false; |
454 | 1 | } else { |
455 | 0 | remote_data_provider = std::make_unique<Config::DataSource::RemoteAsyncDataProvider>( |
456 | 0 | cluster_manager, init_manager, vm_config.code().remote(), dispatcher, |
457 | 0 | api.randomGenerator(), true, fetch_callback); |
458 | 0 | } |
459 | 100 | } else { |
460 | 100 | return complete_cb(code); |
461 | 100 | } |
462 | 0 | return true; |
463 | 101 | } |
464 | | |
465 | | PluginHandleSharedPtr |
466 | | getOrCreateThreadLocalPlugin(const WasmHandleSharedPtr& base_wasm, const PluginSharedPtr& plugin, |
467 | | Event::Dispatcher& dispatcher, |
468 | 99 | CreateContextFn create_root_context_for_testing) { |
469 | 99 | if (!base_wasm) { |
470 | 99 | if (!plugin->fail_open_) { |
471 | 92 | ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), critical, |
472 | 92 | "Plugin configured to fail closed failed to load"); |
473 | 92 | } |
474 | | // To handle the case when failed to create VMs and fail-open/close properly, |
475 | | // we still create PluginHandle with null WasmBase. |
476 | 99 | return std::make_shared<PluginHandle>(nullptr, plugin); |
477 | 99 | } |
478 | 0 | return std::static_pointer_cast<PluginHandle>(proxy_wasm::getOrCreateThreadLocalPlugin( |
479 | 0 | std::static_pointer_cast<WasmHandle>(base_wasm), plugin, |
480 | 0 | getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing), |
481 | 0 | getPluginHandleFactory())); |
482 | 99 | } |
483 | | |
484 | | } // namespace Wasm |
485 | | } // namespace Common |
486 | | } // namespace Extensions |
487 | | } // namespace Envoy |