Coverage Report

Created: 2025-10-31 09:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/node/src/node_platform.cc
Line
Count
Source
1
#include "node_platform.h"
2
#include "node_internals.h"
3
4
#include "env-inl.h"
5
#include "debug_utils-inl.h"
6
#include <algorithm>  // find_if(), find(), move()
7
#include <cmath>  // llround()
8
#include <memory>  // unique_ptr(), shared_ptr(), make_shared()
9
10
namespace node {
11
12
using v8::Isolate;
13
using v8::Object;
14
using v8::Platform;
15
using v8::Task;
16
using v8::TaskPriority;
17
18
namespace {
19
20
struct PlatformWorkerData {
21
  TaskQueue<TaskQueueEntry>* task_queue;
22
  Mutex* platform_workers_mutex;
23
  ConditionVariable* platform_workers_ready;
24
  int* pending_platform_workers;
25
  int id;
26
  PlatformDebugLogLevel debug_log_level;
27
};
28
29
0
const char* GetTaskPriorityName(TaskPriority priority) {
30
0
  switch (priority) {
31
0
    case TaskPriority::kUserBlocking:
32
0
      return "UserBlocking";
33
0
    case TaskPriority::kUserVisible:
34
0
      return "UserVisible";
35
0
    case TaskPriority::kBestEffort:
36
0
      return "BestEffort";
37
0
    default:
38
0
      return "Unknown";
39
0
  }
40
0
}
41
42
0
static void PrintSourceLocation(const v8::SourceLocation& location) {
43
0
  auto loc = location.ToString();
44
0
  if (!loc.empty()) {
45
0
    fprintf(stderr, " %s\n", loc.c_str());
46
0
  } else {
47
0
    fprintf(stderr, " <no location>\n");
48
0
  }
49
0
}
50
51
35
static void PlatformWorkerThread(void* data) {
52
35
  uv_thread_setname("V8Worker");
53
35
  std::unique_ptr<PlatformWorkerData>
54
35
      worker_data(static_cast<PlatformWorkerData*>(data));
55
56
35
  TaskQueue<TaskQueueEntry>* pending_worker_tasks = worker_data->task_queue;
57
35
  TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
58
35
                        "PlatformWorkerThread");
59
60
  // Notify the main thread that the platform worker is ready.
61
35
  {
62
35
    Mutex::ScopedLock lock(*worker_data->platform_workers_mutex);
63
35
    (*worker_data->pending_platform_workers)--;
64
35
    worker_data->platform_workers_ready->Signal(lock);
65
35
  }
66
67
35
  bool debug_log_enabled =
68
35
      worker_data->debug_log_level != PlatformDebugLogLevel::kNone;
69
35
  int id = worker_data->id;
70
10.3k
  while (std::unique_ptr<TaskQueueEntry> entry =
71
10.3k
             pending_worker_tasks->Lock().BlockingPop()) {
72
10.3k
    if (debug_log_enabled) {
73
0
      fprintf(stderr,
74
0
              "\nPlatformWorkerThread %d running task %p %s\n",
75
0
              id,
76
0
              entry->task.get(),
77
0
              GetTaskPriorityName(entry->priority));
78
0
      fflush(stderr);
79
0
    }
80
10.3k
    entry->task->Run();
81
    // See NodePlatform::DrainTasks().
82
10.3k
    if (entry->is_outstanding()) {
83
2.90k
      pending_worker_tasks->Lock().NotifyOfOutstandingCompletion();
84
2.90k
    }
85
10.3k
  }
86
35
}
87
88
35
static int GetActualThreadPoolSize(int thread_pool_size) {
89
35
  if (thread_pool_size < 1) {
90
0
    thread_pool_size = uv_available_parallelism() - 1;
91
0
  }
92
35
  return std::max(thread_pool_size, 1);
93
35
}
94
95
}  // namespace
96
97
class WorkerThreadsTaskRunner::DelayedTaskScheduler {
98
 public:
99
  explicit DelayedTaskScheduler(TaskQueue<TaskQueueEntry>* tasks)
100
35
      : pending_worker_tasks_(tasks) {}
101
102
35
  std::unique_ptr<uv_thread_t> Start() {
103
35
    auto start_thread = [](void* data) {
104
35
      uv_thread_setname("DelayedTaskSchedulerWorker");
105
35
      static_cast<DelayedTaskScheduler*>(data)->Run();
106
35
    };
107
35
    std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
108
35
    uv_sem_init(&ready_, 0);
109
35
    CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));
110
35
    uv_sem_wait(&ready_);
111
35
    uv_sem_destroy(&ready_);
112
35
    return t;
113
35
  }
114
115
  void PostDelayedTask(v8::TaskPriority priority,
116
                       std::unique_ptr<Task> task,
117
0
                       double delay_in_seconds) {
118
0
    auto locked = tasks_.Lock();
119
120
0
    auto entry = std::make_unique<TaskQueueEntry>(std::move(task), priority);
121
0
    auto delayed = std::make_unique<ScheduleTask>(
122
0
        this, std::move(entry), delay_in_seconds);
123
124
    // The delayed task scheuler is on is own thread with its own loop that
125
    // runs the timers for the scheduled tasks to pop the original task back
126
    // into the the worker task queue. This first pushes the tasks that
127
    // schedules the timers into the local task queue that will be flushed
128
    // by the local event loop.
129
0
    locked.Push(std::move(delayed));
130
0
    uv_async_send(&flush_tasks_);
131
0
  }
132
133
35
  void Stop() {
134
35
    auto locked = tasks_.Lock();
135
35
    locked.Push(std::make_unique<StopTask>(this));
136
35
    uv_async_send(&flush_tasks_);
137
35
  }
138
139
 private:
140
35
  void Run() {
141
35
    TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
142
35
                          "WorkerThreadsTaskRunner::DelayedTaskScheduler");
143
35
    loop_.data = this;
144
35
    CHECK_EQ(0, uv_loop_init(&loop_));
145
35
    flush_tasks_.data = this;
146
35
    CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks));
147
35
    uv_sem_post(&ready_);
148
149
35
    uv_run(&loop_, UV_RUN_DEFAULT);
150
35
    CheckedUvLoopClose(&loop_);
151
35
  }
152
153
35
  static void FlushTasks(uv_async_t* flush_tasks) {
154
35
    DelayedTaskScheduler* scheduler =
155
35
        ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
156
157
35
    auto tasks_to_run = scheduler->tasks_.Lock().PopAll();
158
70
    while (!tasks_to_run.empty()) {
159
      // We have to use const_cast because std::priority_queue::top() does not
160
      // return a movable item.
161
35
      std::unique_ptr<Task> task =
162
35
          std::move(const_cast<std::unique_ptr<Task>&>(tasks_to_run.top()));
163
35
      tasks_to_run.pop();
164
      // This runs either the ScheduleTasks that scheduels the timers to
165
      // pop the tasks back into the worker task runner queue, or the
166
      // or the StopTasks to stop the timers and drop all the pending tasks.
167
35
      task->Run();
168
35
    }
169
35
  }
170
171
  class StopTask : public Task {
172
   public:
173
35
    explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}
174
175
35
    void Run() override {
176
35
      std::vector<uv_timer_t*> timers;
177
35
      for (uv_timer_t* timer : scheduler_->timers_)
178
0
        timers.push_back(timer);
179
35
      for (uv_timer_t* timer : timers)
180
0
        scheduler_->TakeTimerTask(timer);
181
35
      uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_),
182
35
               [](uv_handle_t* handle) {});
183
35
    }
184
185
   private:
186
     DelayedTaskScheduler* scheduler_;
187
  };
188
189
  class ScheduleTask : public Task {
190
   public:
191
    ScheduleTask(DelayedTaskScheduler* scheduler,
192
                 std::unique_ptr<TaskQueueEntry> task,
193
                 double delay_in_seconds)
194
0
        : scheduler_(scheduler),
195
0
          task_(std::move(task)),
196
0
          delay_in_seconds_(delay_in_seconds) {}
197
198
0
    void Run() override {
199
0
      uint64_t delay_millis = llround(delay_in_seconds_ * 1000);
200
0
      std::unique_ptr<uv_timer_t> timer(new uv_timer_t());
201
0
      CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get()));
202
0
      timer->data = task_.release();
203
0
      CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0));
204
0
      scheduler_->timers_.insert(timer.release());
205
0
    }
206
207
   private:
208
    DelayedTaskScheduler* scheduler_;
209
    std::unique_ptr<TaskQueueEntry> task_;
210
    double delay_in_seconds_;
211
  };
212
213
0
  static void RunTask(uv_timer_t* timer) {
214
0
    DelayedTaskScheduler* scheduler =
215
0
        ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
216
0
    auto entry = scheduler->TakeTimerTask(timer);
217
0
    bool is_outstanding = entry->is_outstanding();
218
0
    scheduler->pending_worker_tasks_->Lock().Push(std::move(entry),
219
0
                                                  is_outstanding);
220
0
  }
221
222
0
  std::unique_ptr<TaskQueueEntry> TakeTimerTask(uv_timer_t* timer) {
223
0
    std::unique_ptr<TaskQueueEntry> task_entry(
224
0
        static_cast<TaskQueueEntry*>(timer->data));
225
0
    uv_timer_stop(timer);
226
0
    uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
227
0
      delete reinterpret_cast<uv_timer_t*>(handle);
228
0
    });
229
0
    timers_.erase(timer);
230
0
    return task_entry;
231
0
  }
232
233
  uv_sem_t ready_;
234
  // Task queue in the worker thread task runner, we push the delayed task back
235
  // to it when the timer expires.
236
  TaskQueue<TaskQueueEntry>* pending_worker_tasks_;
237
238
  // Locally scheduled tasks to be poped into the worker task runner queue.
239
  // It is flushed whenever the next closest timer expires.
240
  TaskQueue<Task> tasks_;
241
  uv_loop_t loop_;
242
  uv_async_t flush_tasks_;
243
  std::unordered_set<uv_timer_t*> timers_;
244
};
245
246
WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(
247
    int thread_pool_size, PlatformDebugLogLevel debug_log_level)
248
35
    : debug_log_level_(debug_log_level) {
249
35
  Mutex platform_workers_mutex;
250
35
  ConditionVariable platform_workers_ready;
251
252
35
  Mutex::ScopedLock lock(platform_workers_mutex);
253
35
  int pending_platform_workers = thread_pool_size;
254
255
35
  delayed_task_scheduler_ = std::make_unique<DelayedTaskScheduler>(
256
35
      &pending_worker_tasks_);
257
35
  threads_.push_back(delayed_task_scheduler_->Start());
258
259
70
  for (int i = 0; i < thread_pool_size; i++) {
260
35
    PlatformWorkerData* worker_data =
261
35
        new PlatformWorkerData{&pending_worker_tasks_,
262
35
                               &platform_workers_mutex,
263
35
                               &platform_workers_ready,
264
35
                               &pending_platform_workers,
265
35
                               i,
266
35
                               debug_log_level_};
267
35
    std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
268
35
    if (uv_thread_create(t.get(), PlatformWorkerThread,
269
35
                         worker_data) != 0) {
270
0
      break;
271
0
    }
272
35
    threads_.push_back(std::move(t));
273
35
  }
274
275
  // Wait for platform workers to initialize before continuing with the
276
  // bootstrap.
277
70
  while (pending_platform_workers > 0) {
278
35
    platform_workers_ready.Wait(lock);
279
35
  }
280
35
}
281
282
void WorkerThreadsTaskRunner::PostTask(v8::TaskPriority priority,
283
                                       std::unique_ptr<v8::Task> task,
284
10.3k
                                       const v8::SourceLocation& location) {
285
10.3k
  auto entry = std::make_unique<TaskQueueEntry>(std::move(task), priority);
286
10.3k
  bool is_outstanding = entry->is_outstanding();
287
10.3k
  pending_worker_tasks_.Lock().Push(std::move(entry), is_outstanding);
288
10.3k
}
289
290
void WorkerThreadsTaskRunner::PostDelayedTask(
291
    v8::TaskPriority priority,
292
    std::unique_ptr<v8::Task> task,
293
    const v8::SourceLocation& location,
294
0
    double delay_in_seconds) {
295
0
  delayed_task_scheduler_->PostDelayedTask(
296
0
      priority, std::move(task), delay_in_seconds);
297
0
}
298
299
758k
void WorkerThreadsTaskRunner::BlockingDrain() {
300
758k
  pending_worker_tasks_.Lock().BlockingDrain();
301
758k
}
302
303
35
void WorkerThreadsTaskRunner::Shutdown() {
304
35
  pending_worker_tasks_.Lock().Stop();
305
35
  delayed_task_scheduler_->Stop();
306
105
  for (size_t i = 0; i < threads_.size(); i++) {
307
70
    CHECK_EQ(0, uv_thread_join(threads_[i].get()));
308
70
  }
309
35
}
310
311
3.32k
int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
312
3.32k
  return threads_.size();
313
3.32k
}
314
315
PerIsolatePlatformData::PerIsolatePlatformData(
316
    Isolate* isolate, uv_loop_t* loop, PlatformDebugLogLevel debug_log_level)
317
35
    : isolate_(isolate), loop_(loop), debug_log_level_(debug_log_level) {
318
35
  flush_tasks_ = new uv_async_t();
319
35
  CHECK_EQ(0, uv_async_init(loop, flush_tasks_, FlushTasks));
320
35
  flush_tasks_->data = static_cast<void*>(this);
321
35
  uv_unref(reinterpret_cast<uv_handle_t*>(flush_tasks_));
322
35
}
323
324
std::shared_ptr<v8::TaskRunner>
325
2.19k
PerIsolatePlatformData::GetForegroundTaskRunner() {
326
2.19k
  return shared_from_this();
327
2.19k
}
328
329
36
void PerIsolatePlatformData::FlushTasks(uv_async_t* handle) {
330
36
  auto platform_data = static_cast<PerIsolatePlatformData*>(handle->data);
331
36
  platform_data->FlushForegroundTasksInternal();
332
36
}
333
334
void PerIsolatePlatformData::PostIdleTaskImpl(
335
0
    std::unique_ptr<v8::IdleTask> task, const v8::SourceLocation& location) {
336
0
  UNREACHABLE();
337
0
}
338
339
void PerIsolatePlatformData::PostTaskImpl(std::unique_ptr<Task> task,
340
737k
                                          const v8::SourceLocation& location) {
341
  // The task can be posted from any V8 background worker thread, even when
342
  // the foreground task runner is being cleaned up by Shutdown(). In that
343
  // case, make sure we wait until the shutdown is completed (which leads
344
  // to flush_tasks_ == nullptr, and the task will be discarded).
345
737k
  if (debug_log_level_ != PlatformDebugLogLevel::kNone) {
346
0
    fprintf(stderr, "\nPerIsolatePlatformData::PostTaskImpl %p", task.get());
347
0
    PrintSourceLocation(location);
348
0
    if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) {
349
0
      DumpNativeBacktrace(stderr);
350
0
    }
351
0
    fflush(stderr);
352
0
  }
353
354
737k
  auto locked = foreground_tasks_.Lock();
355
737k
  if (flush_tasks_ == nullptr) return;
356
  // All foreground tasks are treated as user blocking tasks.
357
737k
  locked.Push(std::make_unique<TaskQueueEntry>(
358
737k
      std::move(task), v8::TaskPriority::kUserBlocking));
359
737k
  uv_async_send(flush_tasks_);
360
737k
}
361
362
void PerIsolatePlatformData::PostDelayedTaskImpl(
363
    std::unique_ptr<Task> task,
364
    double delay_in_seconds,
365
105
    const v8::SourceLocation& location) {
366
105
  if (debug_log_level_ != PlatformDebugLogLevel::kNone) {
367
0
    fprintf(stderr,
368
0
            "\nPerIsolatePlatformData::PostDelayedTaskImpl %p %f",
369
0
            task.get(),
370
0
            delay_in_seconds);
371
0
    PrintSourceLocation(location);
372
0
    if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) {
373
0
      DumpNativeBacktrace(stderr);
374
0
    }
375
0
    fflush(stderr);
376
0
  }
377
378
105
  auto locked = foreground_delayed_tasks_.Lock();
379
105
  if (flush_tasks_ == nullptr) return;
380
105
  std::unique_ptr<DelayedTask> delayed(new DelayedTask());
381
105
  delayed->task = std::move(task);
382
105
  delayed->platform_data = shared_from_this();
383
105
  delayed->timeout = delay_in_seconds;
384
  // All foreground tasks are treated as user blocking tasks.
385
105
  delayed->priority = v8::TaskPriority::kUserBlocking;
386
105
  locked.Push(std::move(delayed));
387
105
  uv_async_send(flush_tasks_);
388
105
}
389
390
void PerIsolatePlatformData::PostNonNestableTaskImpl(
391
737k
    std::unique_ptr<Task> task, const v8::SourceLocation& location) {
392
737k
  PostTaskImpl(std::move(task), location);
393
737k
}
394
395
void PerIsolatePlatformData::PostNonNestableDelayedTaskImpl(
396
    std::unique_ptr<Task> task,
397
    double delay_in_seconds,
398
69
    const v8::SourceLocation& location) {
399
69
  PostDelayedTaskImpl(std::move(task), delay_in_seconds, location);
400
69
}
401
402
0
PerIsolatePlatformData::~PerIsolatePlatformData() {
403
0
  CHECK(!flush_tasks_);
404
0
}
405
406
void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
407
0
                                                 void* data) {
408
0
  shutdown_callbacks_.emplace_back(ShutdownCallback { callback, data });
409
0
}
410
411
35
void PerIsolatePlatformData::Shutdown() {
412
35
  auto foreground_tasks_locked = foreground_tasks_.Lock();
413
35
  auto foreground_delayed_tasks_locked = foreground_delayed_tasks_.Lock();
414
415
35
  foreground_delayed_tasks_locked.PopAll();
416
35
  foreground_tasks_locked.PopAll();
417
35
  scheduled_delayed_tasks_.clear();
418
419
35
  if (flush_tasks_ != nullptr) {
420
    // Both destroying the scheduled_delayed_tasks_ lists and closing
421
    // flush_tasks_ handle add tasks to the event loop. We keep a count of all
422
    // non-closed handles, and when that reaches zero, we inform any shutdown
423
    // callbacks that the platform is done as far as this Isolate is concerned.
424
35
    self_reference_ = shared_from_this();
425
35
    uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
426
35
             [](uv_handle_t* handle) {
427
0
               std::unique_ptr<uv_async_t> flush_tasks{
428
0
                   reinterpret_cast<uv_async_t*>(handle)};
429
0
               PerIsolatePlatformData* platform_data =
430
0
                   static_cast<PerIsolatePlatformData*>(flush_tasks->data);
431
0
               platform_data->DecreaseHandleCount();
432
0
               platform_data->self_reference_.reset();
433
0
             });
434
35
    flush_tasks_ = nullptr;
435
35
  }
436
35
}
437
438
70
void PerIsolatePlatformData::DecreaseHandleCount() {
439
70
  CHECK_GE(uv_handle_count_, 1);
440
70
  if (--uv_handle_count_ == 0) {
441
0
    for (const auto& callback : shutdown_callbacks_)
442
0
      callback.cb(callback.data);
443
0
  }
444
70
}
445
446
NodePlatform::NodePlatform(int thread_pool_size,
447
                           v8::TracingController* tracing_controller,
448
35
                           v8::PageAllocator* page_allocator) {
449
35
  if (per_process::enabled_debug_list.enabled(
450
35
          DebugCategory::PLATFORM_VERBOSE)) {
451
0
    debug_log_level_ = PlatformDebugLogLevel::kVerbose;
452
35
  } else if (per_process::enabled_debug_list.enabled(
453
35
                 DebugCategory::PLATFORM_MINIMAL)) {
454
0
    debug_log_level_ = PlatformDebugLogLevel::kMinimal;
455
35
  } else {
456
35
    debug_log_level_ = PlatformDebugLogLevel::kNone;
457
35
  }
458
459
35
  if (tracing_controller != nullptr) {
460
0
    tracing_controller_ = tracing_controller;
461
35
  } else {
462
35
    tracing_controller_ = new v8::TracingController();
463
35
  }
464
465
  // V8 will default to its built in allocator if none is provided.
466
35
  page_allocator_ = page_allocator;
467
468
  // TODO(addaleax): It's a bit icky that we use global state here, but we can't
469
  // really do anything about it unless V8 starts exposing a way to access the
470
  // current v8::Platform instance.
471
35
  SetTracingController(tracing_controller_);
472
35
  DCHECK_EQ(GetTracingController(), tracing_controller_);
473
474
35
  thread_pool_size = GetActualThreadPoolSize(thread_pool_size);
475
35
  worker_thread_task_runner_ = std::make_shared<WorkerThreadsTaskRunner>(
476
35
      thread_pool_size, debug_log_level_);
477
35
}
478
479
35
NodePlatform::~NodePlatform() {
480
35
  Shutdown();
481
35
}
482
483
35
void NodePlatform::RegisterIsolate(Isolate* isolate, uv_loop_t* loop) {
484
35
  Mutex::ScopedLock lock(per_isolate_mutex_);
485
35
  auto delegate =
486
35
      std::make_shared<PerIsolatePlatformData>(isolate, loop, debug_log_level_);
487
35
  IsolatePlatformDelegate* ptr = delegate.get();
488
35
  auto insertion = per_isolate_.emplace(
489
35
    isolate,
490
35
    std::make_pair(ptr, std::move(delegate)));
491
35
  CHECK(insertion.second);
492
35
}
493
494
void NodePlatform::RegisterIsolate(Isolate* isolate,
495
0
                                   IsolatePlatformDelegate* delegate) {
496
0
  Mutex::ScopedLock lock(per_isolate_mutex_);
497
0
  auto insertion = per_isolate_.emplace(
498
0
    isolate,
499
0
    std::make_pair(delegate, std::shared_ptr<PerIsolatePlatformData>{}));
500
0
  CHECK(insertion.second);
501
0
}
502
503
35
void NodePlatform::UnregisterIsolate(Isolate* isolate) {
504
35
  Mutex::ScopedLock lock(per_isolate_mutex_);
505
35
  auto existing_it = per_isolate_.find(isolate);
506
35
  CHECK_NE(existing_it, per_isolate_.end());
507
35
  auto& existing = existing_it->second;
508
35
  if (existing.second) {
509
35
    existing.second->Shutdown();
510
35
  }
511
35
  per_isolate_.erase(existing_it);
512
35
}
513
514
void NodePlatform::AddIsolateFinishedCallback(Isolate* isolate,
515
0
                                              void (*cb)(void*), void* data) {
516
0
  Mutex::ScopedLock lock(per_isolate_mutex_);
517
0
  auto it = per_isolate_.find(isolate);
518
0
  if (it == per_isolate_.end()) {
519
0
    cb(data);
520
0
    return;
521
0
  }
522
0
  CHECK(it->second.second);
523
0
  it->second.second->AddShutdownCallback(cb, data);
524
0
}
525
526
70
void NodePlatform::Shutdown() {
527
70
  if (has_shut_down_) return;
528
35
  has_shut_down_ = true;
529
35
  worker_thread_task_runner_->Shutdown();
530
531
35
  {
532
35
    Mutex::ScopedLock lock(per_isolate_mutex_);
533
35
    per_isolate_.clear();
534
35
  }
535
35
}
536
537
3.32k
int NodePlatform::NumberOfWorkerThreads() {
538
3.32k
  return worker_thread_task_runner_->NumberOfWorkerThreads();
539
3.32k
}
540
541
737k
void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
542
737k
  if (isolate_->IsExecutionTerminating()) return;
543
737k
  DebugSealHandleScope scope(isolate_);
544
737k
  Environment* env = Environment::GetCurrent(isolate_);
545
737k
  if (env != nullptr) {
546
737k
    v8::HandleScope scope(isolate_);
547
737k
    InternalCallbackScope cb_scope(env, Object::New(isolate_), { 0, 0 },
548
737k
                                   InternalCallbackScope::kNoFlags);
549
737k
    task->Run();
550
737k
  } else {
551
    // When the Environment was freed, the tasks of the Isolate should also be
552
    // canceled by `NodePlatform::UnregisterIsolate`. However, if the embedder
553
    // request to run the foreground task after the Environment was freed, run
554
    // the task without InternalCallbackScope.
555
556
    // The task is moved out of InternalCallbackScope if env is not available.
557
    // This is a required else block, and should not be removed.
558
    // See comment: https://github.com/nodejs/node/pull/34688#pullrequestreview-463867489
559
0
    task->Run();
560
0
  }
561
737k
}
562
563
70
void PerIsolatePlatformData::DeleteFromScheduledTasks(DelayedTask* task) {
564
70
  auto it =
565
70
      std::ranges::find_if(scheduled_delayed_tasks_,
566
74
                           [task](const DelayedTaskPointer& delayed) -> bool {
567
74
                             return delayed.get() == task;
568
74
                           });
569
70
  CHECK_NE(it, scheduled_delayed_tasks_.end());
570
70
  scheduled_delayed_tasks_.erase(it);
571
70
}
572
573
70
void PerIsolatePlatformData::RunForegroundTask(uv_timer_t* handle) {
574
70
  DelayedTask* delayed = ContainerOf(&DelayedTask::timer, handle);
575
70
  delayed->platform_data->RunForegroundTask(std::move(delayed->task));
576
70
  delayed->platform_data->DeleteFromScheduledTasks(delayed);
577
70
}
578
579
22.1k
void NodePlatform::DrainTasks(Isolate* isolate) {
580
22.1k
  std::shared_ptr<PerIsolatePlatformData> per_isolate = ForNodeIsolate(isolate);
581
22.1k
  if (!per_isolate) return;
582
583
758k
  do {
584
    // FIXME(54918): we should not be blocking on the worker tasks on the
585
    // main thread in one go. Doing so leads to two problems:
586
    // 1. If any of the worker tasks post another foreground task and wait
587
    //    for it to complete, and that foreground task is posted right after
588
    //    we flush the foreground task queue and before the foreground thread
589
    //    goes into sleep, we'll never be able to wake up to execute that
590
    //    foreground task and in turn the worker task will never complete, and
591
    //    we have a deadlock.
592
    // 2. Worker tasks can be posted from any thread, not necessarily associated
593
    //    with the current isolate, and we can be blocking on a worker task that
594
    //    is associated with a completely unrelated isolate in the event loop.
595
    //    This is suboptimal.
596
    //
597
    // However, not blocking on the worker tasks at all can lead to loss of some
598
    // critical user-blocking worker tasks e.g. wasm async compilation tasks,
599
    // which should block the main thread until they are completed, as the
600
    // documentation suggets. As a compromise, we currently only block on
601
    // user-blocking tasks to reduce the chance of deadlocks while making sure
602
    // that criticl user-blocking tasks are not lost.
603
758k
    worker_thread_task_runner_->BlockingDrain();
604
758k
  } while (per_isolate->FlushForegroundTasksInternal());
605
22.1k
}
606
607
758k
bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
608
758k
  bool did_work = false;
609
610
758k
  auto delayed_tasks_to_schedule = foreground_delayed_tasks_.Lock().PopAll();
611
758k
  while (!delayed_tasks_to_schedule.empty()) {
612
    // We have to use const_cast because std::priority_queue::top() does not
613
    // return a movable item.
614
105
    std::unique_ptr<DelayedTask> delayed =
615
105
        std::move(const_cast<std::unique_ptr<DelayedTask>&>(
616
105
            delayed_tasks_to_schedule.top()));
617
105
    delayed_tasks_to_schedule.pop();
618
619
105
    did_work = true;
620
105
    uint64_t delay_millis = llround(delayed->timeout * 1000);
621
622
105
    delayed->timer.data = static_cast<void*>(delayed.get());
623
105
    uv_timer_init(loop_, &delayed->timer);
624
    // Timers may not guarantee queue ordering of events with the same delay
625
    // if the delay is non-zero. This should not be a problem in practice.
626
105
    uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
627
105
    uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
628
105
    uv_handle_count_++;
629
630
105
    scheduled_delayed_tasks_.emplace_back(
631
105
        delayed.release(), [](DelayedTask* delayed) {
632
105
          uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
633
105
                   [](uv_handle_t* handle) {
634
70
                     std::unique_ptr<DelayedTask> task{
635
70
                         static_cast<DelayedTask*>(handle->data)};
636
70
                     task->platform_data->DecreaseHandleCount();
637
70
                   });
638
105
        });
639
105
  }
640
641
758k
  TaskQueue<TaskQueueEntry>::PriorityQueue tasks;
642
758k
  {
643
758k
    auto locked = foreground_tasks_.Lock();
644
758k
    tasks = locked.PopAll();
645
758k
  }
646
647
1.49M
  while (!tasks.empty()) {
648
    // We have to use const_cast because std::priority_queue::top() does not
649
    // return a movable item.
650
737k
    std::unique_ptr<TaskQueueEntry> entry =
651
737k
        std::move(const_cast<std::unique_ptr<TaskQueueEntry>&>(tasks.top()));
652
737k
    tasks.pop();
653
737k
    did_work = true;
654
737k
    RunForegroundTask(std::move(entry->task));
655
737k
  }
656
657
758k
  return did_work;
658
758k
}
659
660
void NodePlatform::PostTaskOnWorkerThreadImpl(
661
    v8::TaskPriority priority,
662
    std::unique_ptr<v8::Task> task,
663
10.3k
    const v8::SourceLocation& location) {
664
10.3k
  if (debug_log_level_ != PlatformDebugLogLevel::kNone) {
665
0
    fprintf(stderr,
666
0
            "\nNodePlatform::PostTaskOnWorkerThreadImpl %s %p",
667
0
            GetTaskPriorityName(priority),
668
0
            task.get());
669
0
    PrintSourceLocation(location);
670
0
    if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) {
671
0
      DumpNativeBacktrace(stderr);
672
0
    }
673
0
    fflush(stderr);
674
0
  }
675
10.3k
  worker_thread_task_runner_->PostTask(priority, std::move(task), location);
676
10.3k
}
677
678
void NodePlatform::PostDelayedTaskOnWorkerThreadImpl(
679
    v8::TaskPriority priority,
680
    std::unique_ptr<v8::Task> task,
681
    double delay_in_seconds,
682
0
    const v8::SourceLocation& location) {
683
0
  if (debug_log_level_ != PlatformDebugLogLevel::kNone) {
684
0
    fprintf(stderr,
685
0
            "\nNodePlatform::PostDelayedTaskOnWorkerThreadImpl %s %p %f",
686
0
            GetTaskPriorityName(priority),
687
0
            task.get(),
688
0
            delay_in_seconds);
689
0
    PrintSourceLocation(location);
690
0
    if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) {
691
0
      DumpNativeBacktrace(stderr);
692
0
    }
693
0
    fflush(stderr);
694
0
  }
695
0
  worker_thread_task_runner_->PostDelayedTask(
696
0
      priority, std::move(task), location, delay_in_seconds);
697
0
}
698
699
2.19k
IsolatePlatformDelegate* NodePlatform::ForIsolate(Isolate* isolate) {
700
2.19k
  Mutex::ScopedLock lock(per_isolate_mutex_);
701
2.19k
  auto data = per_isolate_[isolate];
702
2.19k
  CHECK_NOT_NULL(data.first);
703
2.19k
  return data.first;
704
2.19k
}
705
706
std::shared_ptr<PerIsolatePlatformData>
707
22.1k
NodePlatform::ForNodeIsolate(Isolate* isolate) {
708
22.1k
  Mutex::ScopedLock lock(per_isolate_mutex_);
709
22.1k
  auto data = per_isolate_[isolate];
710
22.1k
  CHECK_NOT_NULL(data.first);
711
22.1k
  return data.second;
712
22.1k
}
713
714
0
bool NodePlatform::FlushForegroundTasks(Isolate* isolate) {
715
0
  std::shared_ptr<PerIsolatePlatformData> per_isolate = ForNodeIsolate(isolate);
716
0
  if (!per_isolate) return false;
717
0
  return per_isolate->FlushForegroundTasksInternal();
718
0
}
719
720
std::unique_ptr<v8::JobHandle> NodePlatform::CreateJobImpl(
721
    v8::TaskPriority priority,
722
    std::unique_ptr<v8::JobTask> job_task,
723
3.21k
    const v8::SourceLocation& location) {
724
3.21k
  if (debug_log_level_ != PlatformDebugLogLevel::kNone) {
725
0
    fprintf(stderr,
726
0
            "\nNodePlatform::CreateJobImpl %s %p",
727
0
            GetTaskPriorityName(priority),
728
0
            job_task.get());
729
0
    PrintSourceLocation(location);
730
0
    if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) {
731
0
      DumpNativeBacktrace(stderr);
732
0
    }
733
0
    fflush(stderr);
734
0
  }
735
3.21k
  return v8::platform::NewDefaultJobHandle(
736
3.21k
      this, priority, std::move(job_task), NumberOfWorkerThreads());
737
3.21k
}
738
739
0
bool NodePlatform::IdleTasksEnabled(Isolate* isolate) {
740
0
  return ForIsolate(isolate)->IdleTasksEnabled();
741
0
}
742
743
std::shared_ptr<v8::TaskRunner> NodePlatform::GetForegroundTaskRunner(
744
2.19k
    Isolate* isolate, v8::TaskPriority priority) {
745
2.19k
  return ForIsolate(isolate)->GetForegroundTaskRunner();
746
2.19k
}
747
748
13.0k
double NodePlatform::MonotonicallyIncreasingTime() {
749
  // Convert nanos to seconds.
750
13.0k
  return uv_hrtime() / 1e9;
751
13.0k
}
752
753
7
double NodePlatform::CurrentClockTimeMillis() {
754
7
  return SystemClockTimeMillis();
755
7
}
756
757
2.66k
v8::TracingController* NodePlatform::GetTracingController() {
758
2.66k
  CHECK_NOT_NULL(tracing_controller_);
759
2.66k
  return tracing_controller_;
760
2.66k
}
761
762
35
Platform::StackTracePrinter NodePlatform::GetStackTracePrinter() {
763
35
  return []() {
764
0
    fprintf(stderr, "\n");
765
0
    DumpNativeBacktrace(stderr);
766
0
    fflush(stderr);
767
0
  };
768
35
}
769
770
105
v8::PageAllocator* NodePlatform::GetPageAllocator() {
771
105
  return page_allocator_;
772
105
}
773
774
template <class T>
775
TaskQueue<T>::TaskQueue()
776
140
    : lock_(),
777
140
      tasks_available_(),
778
140
      outstanding_tasks_drained_(),
779
140
      outstanding_tasks_(0),
780
140
      stopped_(false),
781
140
      task_queue_() {}
node::TaskQueue<node::TaskQueueEntry>::TaskQueue()
Line
Count
Source
776
70
    : lock_(),
777
70
      tasks_available_(),
778
70
      outstanding_tasks_drained_(),
779
70
      outstanding_tasks_(0),
780
70
      stopped_(false),
781
70
      task_queue_() {}
node::TaskQueue<v8::Task>::TaskQueue()
Line
Count
Source
776
35
    : lock_(),
777
35
      tasks_available_(),
778
35
      outstanding_tasks_drained_(),
779
35
      outstanding_tasks_(0),
780
35
      stopped_(false),
781
35
      task_queue_() {}
node::TaskQueue<node::DelayedTask>::TaskQueue()
Line
Count
Source
776
35
    : lock_(),
777
35
      tasks_available_(),
778
35
      outstanding_tasks_drained_(),
779
35
      outstanding_tasks_(0),
780
35
      stopped_(false),
781
35
      task_queue_() {}
782
783
template <class T>
784
TaskQueue<T>::Locked::Locked(TaskQueue* queue)
785
3.03M
    : queue_(queue), lock_(queue->lock_) {}
node::TaskQueue<v8::Task>::Locked::Locked(node::TaskQueue<v8::Task>*)
Line
Count
Source
785
70
    : queue_(queue), lock_(queue->lock_) {}
node::TaskQueue<node::TaskQueueEntry>::Locked::Locked(node::TaskQueue<node::TaskQueueEntry>*)
Line
Count
Source
785
2.27M
    : queue_(queue), lock_(queue->lock_) {}
node::TaskQueue<node::DelayedTask>::Locked::Locked(node::TaskQueue<node::DelayedTask>*)
Line
Count
Source
785
758k
    : queue_(queue), lock_(queue->lock_) {}
786
787
template <class T>
788
747k
void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task, bool outstanding) {
789
747k
  if (outstanding) {
790
2.90k
    queue_->outstanding_tasks_++;
791
2.90k
  }
792
747k
  queue_->task_queue_.push(std::move(task));
793
747k
  queue_->tasks_available_.Signal(lock_);
794
747k
}
node::TaskQueue<v8::Task>::Locked::Push(std::__1::unique_ptr<v8::Task, std::__1::default_delete<v8::Task> >, bool)
Line
Count
Source
788
35
void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task, bool outstanding) {
789
35
  if (outstanding) {
790
0
    queue_->outstanding_tasks_++;
791
0
  }
792
35
  queue_->task_queue_.push(std::move(task));
793
35
  queue_->tasks_available_.Signal(lock_);
794
35
}
node::TaskQueue<node::TaskQueueEntry>::Locked::Push(std::__1::unique_ptr<node::TaskQueueEntry, std::__1::default_delete<node::TaskQueueEntry> >, bool)
Line
Count
Source
788
747k
void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task, bool outstanding) {
789
747k
  if (outstanding) {
790
2.90k
    queue_->outstanding_tasks_++;
791
2.90k
  }
792
747k
  queue_->task_queue_.push(std::move(task));
793
747k
  queue_->tasks_available_.Signal(lock_);
794
747k
}
node::TaskQueue<node::DelayedTask>::Locked::Push(std::__1::unique_ptr<node::DelayedTask, std::__1::default_delete<node::DelayedTask> >, bool)
Line
Count
Source
788
105
void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task, bool outstanding) {
789
105
  if (outstanding) {
790
0
    queue_->outstanding_tasks_++;
791
0
  }
792
105
  queue_->task_queue_.push(std::move(task));
793
105
  queue_->tasks_available_.Signal(lock_);
794
105
}
795
796
template <class T>
797
std::unique_ptr<T> TaskQueue<T>::Locked::Pop() {
798
  if (queue_->task_queue_.empty()) {
799
    return std::unique_ptr<T>(nullptr);
800
  }
801
  std::unique_ptr<T> result = std::move(
802
      std::move(const_cast<std::unique_ptr<T>&>(queue_->task_queue_.top())));
803
  queue_->task_queue_.pop();
804
  return result;
805
}
806
807
template <class T>
808
10.3k
std::unique_ptr<T> TaskQueue<T>::Locked::BlockingPop() {
809
17.4k
  while (queue_->task_queue_.empty() && !queue_->stopped_) {
810
7.04k
    queue_->tasks_available_.Wait(lock_);
811
7.04k
  }
812
10.3k
  if (queue_->stopped_) {
813
35
    return std::unique_ptr<T>(nullptr);
814
35
  }
815
10.3k
  std::unique_ptr<T> result = std::move(
816
10.3k
      std::move(const_cast<std::unique_ptr<T>&>(queue_->task_queue_.top())));
817
10.3k
  queue_->task_queue_.pop();
818
10.3k
  return result;
819
10.3k
}
820
821
template <class T>
822
2.90k
void TaskQueue<T>::Locked::NotifyOfOutstandingCompletion() {
823
2.90k
  if (--queue_->outstanding_tasks_ == 0) {
824
1.81k
    queue_->outstanding_tasks_drained_.Broadcast(lock_);
825
1.81k
  }
826
2.90k
}
827
828
template <class T>
829
758k
void TaskQueue<T>::Locked::BlockingDrain() {
830
758k
  while (queue_->outstanding_tasks_ > 0) {
831
28
    queue_->outstanding_tasks_drained_.Wait(lock_);
832
28
  }
833
758k
}
834
835
template <class T>
836
35
void TaskQueue<T>::Locked::Stop() {
837
35
  queue_->stopped_ = true;
838
35
  queue_->tasks_available_.Broadcast(lock_);
839
35
}
840
841
template <class T>
842
1.51M
TaskQueue<T>::PriorityQueue TaskQueue<T>::Locked::PopAll() {
843
1.51M
  TaskQueue<T>::PriorityQueue result;
844
1.51M
  result.swap(queue_->task_queue_);
845
1.51M
  return result;
846
1.51M
}
node::TaskQueue<v8::Task>::Locked::PopAll()
Line
Count
Source
842
35
TaskQueue<T>::PriorityQueue TaskQueue<T>::Locked::PopAll() {
843
35
  TaskQueue<T>::PriorityQueue result;
844
35
  result.swap(queue_->task_queue_);
845
35
  return result;
846
35
}
node::TaskQueue<node::DelayedTask>::Locked::PopAll()
Line
Count
Source
842
758k
TaskQueue<T>::PriorityQueue TaskQueue<T>::Locked::PopAll() {
843
758k
  TaskQueue<T>::PriorityQueue result;
844
758k
  result.swap(queue_->task_queue_);
845
758k
  return result;
846
758k
}
node::TaskQueue<node::TaskQueueEntry>::Locked::PopAll()
Line
Count
Source
842
758k
TaskQueue<T>::PriorityQueue TaskQueue<T>::Locked::PopAll() {
843
758k
  TaskQueue<T>::PriorityQueue result;
844
758k
  result.swap(queue_->task_queue_);
845
758k
  return result;
846
758k
}
847
848
35
void MultiIsolatePlatform::DisposeIsolate(Isolate* isolate) {
849
  // The order of these calls is important. When the Isolate is disposed,
850
  // it may still post tasks to the platform, so it must still be registered
851
  // for the task runner to be found from the map. After the isolate is torn
852
  // down, we need to remove it from the map before we can free the address,
853
  // so that when another Isolate::Allocate() is called, that would not be
854
  // allocated to the same address and be registered on an existing map
855
  // entry.
856
  // Refs: https://github.com/nodejs/node/issues/30846
857
35
  isolate->Deinitialize();
858
35
  this->UnregisterIsolate(isolate);
859
35
  Isolate::Free(isolate);
860
35
}
861
862
}  // namespace node