LCOV - code coverage report
Current view: top level - source/extensions/common/async_files - async_file_manager_thread_pool.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 36 158 22.8 %
Date: 2024-01-05 06:35:25 Functions: 5 27 18.5 %

          Line data    Source code
       1             : #include "source/extensions/common/async_files/async_file_manager_thread_pool.h"
       2             : 
       3             : #include <memory>
       4             : #include <queue>
       5             : #include <thread>
       6             : #include <utility>
       7             : #include <vector>
       8             : 
       9             : #include "source/extensions/common/async_files/async_file_action.h"
      10             : #include "source/extensions/common/async_files/async_file_context_thread_pool.h"
      11             : #include "source/extensions/common/async_files/status_after_file_error.h"
      12             : 
      13             : namespace Envoy {
      14             : namespace Extensions {
      15             : namespace Common {
      16             : namespace AsyncFiles {
      17             : 
      18             : namespace {
      19             : // ThreadNextAction is per worker thread; if enqueue is called from a callback
      20             : // the action goes directly into ThreadNextAction, otherwise it goes into the
      21             : // queue and is eventually pulled out into ThreadNextAction by a worker thread.
      22             : thread_local std::shared_ptr<AsyncFileAction> ThreadNextAction;
      23             : 
      24             : // ThreadIsWorker is set to true for worker threads, and will be false
      25             : // for all other threads.
      26             : thread_local bool ThreadIsWorker = false;
      27             : } // namespace
      28             : 
      29             : AsyncFileManagerThreadPool::AsyncFileManagerThreadPool(
      30             :     const envoy::extensions::common::async_files::v3::AsyncFileManagerConfig& config,
      31             :     Api::OsSysCalls& posix)
      32           2 :     : posix_(posix) {
      33           2 :   if (!posix.supportsAllPosixFileOperations()) {
      34           0 :     throw EnvoyException("AsyncFileManagerThreadPool not supported");
      35           0 :   }
      36           2 :   unsigned int thread_pool_size = config.thread_pool().thread_count();
      37           2 :   if (thread_pool_size == 0) {
      38           2 :     thread_pool_size = std::thread::hardware_concurrency();
      39           2 :   }
      40           2 :   ENVOY_LOG(info, fmt::format("AsyncFileManagerThreadPool created with id '{}', with {} threads",
      41           2 :                               config.id(), thread_pool_size));
      42           2 :   thread_pool_.reserve(thread_pool_size);
      43           6 :   while (thread_pool_.size() < thread_pool_size) {
      44           4 :     thread_pool_.emplace_back([this]() { worker(); });
      45           4 :   }
      46           2 : }
      47             : 
      48           2 : AsyncFileManagerThreadPool::~AsyncFileManagerThreadPool() ABSL_LOCKS_EXCLUDED(queue_mutex_) {
      49           2 :   {
      50           2 :     absl::MutexLock lock(&queue_mutex_);
      51           2 :     terminate_ = true;
      52           2 :   }
      53           6 :   while (!thread_pool_.empty()) {
      54           4 :     thread_pool_.back().join();
      55           4 :     thread_pool_.pop_back();
      56           4 :   }
      57           2 : }
      58             : 
      59           0 : std::string AsyncFileManagerThreadPool::describe() const {
      60           0 :   return absl::StrCat("thread_pool_size = ", thread_pool_.size());
      61           0 : }
      62             : 
      63           0 : std::function<void()> AsyncFileManagerThreadPool::enqueue(std::shared_ptr<AsyncFileAction> action) {
      64           0 :   auto cancel_func = [action]() { action->cancel(); };
      65             :   // If an action is being enqueued from within a callback, we don't have to actually queue it,
      66             :   // we can just set it as the thread's next action - this acts to chain the actions without
      67             :   // yielding to another file.
      68           0 :   if (ThreadIsWorker) {
      69           0 :     ASSERT(!ThreadNextAction); // only do one file action per callback.
      70           0 :     ThreadNextAction = std::move(action);
      71           0 :     return cancel_func;
      72           0 :   }
      73           0 :   absl::MutexLock lock(&queue_mutex_);
      74           0 :   queue_.push(std::move(action));
      75           0 :   return cancel_func;
      76           0 : }
      77             : 
      78           4 : void AsyncFileManagerThreadPool::worker() {
      79           4 :   ThreadIsWorker = true;
      80           4 :   while (true) {
      81          14 :     const auto condition = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(queue_mutex_) {
      82          14 :       return !queue_.empty() || terminate_;
      83          14 :     };
      84           4 :     {
      85           4 :       absl::MutexLock lock(&queue_mutex_);
      86           4 :       queue_mutex_.Await(absl::Condition(&condition));
      87           4 :       if (terminate_) {
      88           4 :         return;
      89           4 :       }
      90           0 :       ThreadNextAction = std::move(queue_.front());
      91           0 :       queue_.pop();
      92           0 :     }
      93           0 :     resolveActions();
      94           0 :   }
      95           4 : }
      96             : 
      97           0 : void AsyncFileManagerThreadPool::resolveActions() {
      98           0 :   while (ThreadNextAction) {
      99             :     // Move the action out of ThreadNextAction so that its callback can enqueue
     100             :     // a different ThreadNextAction without self-destructing.
     101           0 :     std::shared_ptr<AsyncFileAction> action = std::move(ThreadNextAction);
     102           0 :     action->execute();
     103           0 :   }
     104           0 : }
     105             : 
     106             : namespace {
     107             : 
     108             : class ActionWithFileResult : public AsyncFileActionWithResult<absl::StatusOr<AsyncFileHandle>> {
     109             : public:
     110             :   ActionWithFileResult(AsyncFileManagerThreadPool& manager,
     111             :                        std::function<void(absl::StatusOr<AsyncFileHandle>)> on_complete)
     112           0 :       : AsyncFileActionWithResult(on_complete), manager_(manager) {}
     113             : 
     114             : protected:
     115           0 :   void onCancelledBeforeCallback(absl::StatusOr<AsyncFileHandle> result) override {
     116           0 :     if (result.ok()) {
     117           0 :       result.value()->close([](absl::Status) {}).IgnoreError();
     118           0 :     }
     119           0 :   }
     120             :   AsyncFileManagerThreadPool& manager_;
     121           0 :   Api::OsSysCalls& posix() { return manager_.posix(); }
     122             : };
     123             : 
     124             : class ActionCreateAnonymousFile : public ActionWithFileResult {
     125             : public:
     126             :   ActionCreateAnonymousFile(AsyncFileManagerThreadPool& manager, absl::string_view path,
     127             :                             std::function<void(absl::StatusOr<AsyncFileHandle>)> on_complete)
     128           0 :       : ActionWithFileResult(manager, on_complete), path_(path) {}
     129             : 
     130           0 :   absl::StatusOr<AsyncFileHandle> executeImpl() override {
     131           0 :     Api::SysCallIntResult open_result;
     132           0 : #ifdef O_TMPFILE
     133           0 :     bool was_successful_first_call = false;
     134           0 :     std::call_once(manager_.once_flag_, [this, &was_successful_first_call, &open_result]() {
     135           0 :       open_result = posix().open(path_.c_str(), O_TMPFILE | O_RDWR, S_IRUSR | S_IWUSR);
     136           0 :       was_successful_first_call = manager_.supports_o_tmpfile_ = (open_result.return_value_ != -1);
     137           0 :     });
     138           0 :     if (manager_.supports_o_tmpfile_) {
     139           0 :       if (was_successful_first_call) {
     140             :         // This was the thread doing the very first open(O_TMPFILE), and it worked, so no need to do
     141             :         // anything else.
     142           0 :         return std::make_shared<AsyncFileContextThreadPool>(manager_, open_result.return_value_);
     143           0 :       }
     144             :       // This was any other thread, but O_TMPFILE proved it worked, so we can do it again.
     145           0 :       open_result = posix().open(path_.c_str(), O_TMPFILE | O_RDWR, S_IRUSR | S_IWUSR);
     146           0 :       if (open_result.return_value_ == -1) {
     147           0 :         return statusAfterFileError(open_result);
     148           0 :       }
     149           0 :       return std::make_shared<AsyncFileContextThreadPool>(manager_, open_result.return_value_);
     150           0 :     }
     151           0 : #endif // O_TMPFILE
     152             :     // If O_TMPFILE didn't work, fall back to creating a named file and unlinking it.
     153             :     // Use a fixed-size buffer because we're going to be using C file functions anyway, and it saves
     154             :     // a heap allocation.
     155           0 :     char filename[4096];
     156           0 :     static const char file_suffix[] = "/buffer.XXXXXX";
     157           0 :     if (path_.size() + sizeof(file_suffix) > sizeof(filename)) {
     158           0 :       return absl::InvalidArgumentError(
     159           0 :           "AsyncFileManagerThreadPool::createAnonymousFile: pathname too long for tmpfile");
     160           0 :     }
     161             :     // Using C-style functions here because `mkstemp` requires a writable
     162             :     // char buffer, and we can't give it that with C++ strings.
     163           0 :     snprintf(filename, sizeof(filename), "%s%s", path_.c_str(), file_suffix);
     164           0 :     open_result = posix().mkstemp(filename);
     165           0 :     if (open_result.return_value_ == -1) {
     166           0 :       return statusAfterFileError(open_result);
     167           0 :     }
     168           0 :     if (posix().unlink(filename).return_value_ != 0) {
     169             :       // Most likely the problem here is we can't unlink a file while it's open - since that's a
     170             :       // prerequisite of the desired behavior of this function, and we don't want to accidentally
     171             :       // fill a disk with named tmp files, if this happens we close the file, unlink it, and report
     172             :       // an error.
     173           0 :       posix().close(open_result.return_value_);
     174           0 :       posix().unlink(filename);
     175           0 :       return absl::UnimplementedError(
     176           0 :           "AsyncFileManagerThreadPool::createAnonymousFile: not supported for "
     177           0 :           "target filesystem (failed to unlink an open file)");
     178           0 :     }
     179           0 :     return std::make_shared<AsyncFileContextThreadPool>(manager_, open_result.return_value_);
     180           0 :   }
     181             : 
     182             : private:
     183             :   const std::string path_;
     184             : };
     185             : 
     186             : class ActionOpenExistingFile : public ActionWithFileResult {
     187             : public:
     188             :   ActionOpenExistingFile(AsyncFileManagerThreadPool& manager, absl::string_view filename,
     189             :                          AsyncFileManager::Mode mode,
     190             :                          std::function<void(absl::StatusOr<AsyncFileHandle>)> on_complete)
     191           0 :       : ActionWithFileResult(manager, on_complete), filename_(filename), mode_(mode) {}
     192             : 
     193           0 :   absl::StatusOr<AsyncFileHandle> executeImpl() override {
     194           0 :     auto open_result = posix().open(filename_.c_str(), openFlags());
     195           0 :     if (open_result.return_value_ == -1) {
     196           0 :       return statusAfterFileError(open_result);
     197           0 :     }
     198           0 :     return std::make_shared<AsyncFileContextThreadPool>(manager_, open_result.return_value_);
     199           0 :   }
     200             : 
     201             : private:
     202           0 :   int openFlags() const {
     203           0 :     switch (mode_) {
     204           0 :     case AsyncFileManager::Mode::ReadOnly:
     205           0 :       return O_RDONLY;
     206           0 :     case AsyncFileManager::Mode::WriteOnly:
     207           0 :       return O_WRONLY;
     208           0 :     case AsyncFileManager::Mode::ReadWrite:
     209           0 :       return O_RDWR;
     210           0 :     }
     211           0 :     PANIC_DUE_TO_CORRUPT_ENUM;
     212           0 :   }
     213             :   const std::string filename_;
     214             :   const AsyncFileManager::Mode mode_;
     215             : };
     216             : 
     217             : class ActionStat : public AsyncFileActionWithResult<absl::StatusOr<struct stat>> {
     218             : public:
     219             :   ActionStat(Api::OsSysCalls& posix, absl::string_view filename,
     220             :              std::function<void(absl::StatusOr<struct stat>)> on_complete)
     221           0 :       : AsyncFileActionWithResult(on_complete), posix_(posix), filename_(filename) {}
     222             : 
     223           0 :   absl::StatusOr<struct stat> executeImpl() override {
     224           0 :     struct stat ret;
     225           0 :     Api::SysCallIntResult stat_result = posix_.stat(filename_.c_str(), &ret);
     226           0 :     if (stat_result.return_value_ == -1) {
     227           0 :       return statusAfterFileError(stat_result);
     228           0 :     }
     229           0 :     return ret;
     230           0 :   }
     231             : 
     232             : private:
     233             :   Api::OsSysCalls& posix_;
     234             :   const std::string filename_;
     235             : };
     236             : 
     237             : class ActionUnlink : public AsyncFileActionWithResult<absl::Status> {
     238             : public:
     239             :   ActionUnlink(Api::OsSysCalls& posix, absl::string_view filename,
     240             :                std::function<void(absl::Status)> on_complete)
     241           0 :       : AsyncFileActionWithResult(on_complete), posix_(posix), filename_(filename) {}
     242             : 
     243           0 :   absl::Status executeImpl() override {
     244           0 :     Api::SysCallIntResult unlink_result = posix_.unlink(filename_.c_str());
     245           0 :     if (unlink_result.return_value_ == -1) {
     246           0 :       return statusAfterFileError(unlink_result);
     247           0 :     }
     248           0 :     return absl::OkStatus();
     249           0 :   }
     250             : 
     251             : private:
     252             :   Api::OsSysCalls& posix_;
     253             :   const std::string filename_;
     254             : };
     255             : 
     256             : } // namespace
     257             : 
     258             : CancelFunction AsyncFileManagerThreadPool::createAnonymousFile(
     259           0 :     absl::string_view path, std::function<void(absl::StatusOr<AsyncFileHandle>)> on_complete) {
     260           0 :   return enqueue(std::make_shared<ActionCreateAnonymousFile>(*this, path, on_complete));
     261           0 : }
     262             : 
     263             : CancelFunction AsyncFileManagerThreadPool::openExistingFile(
     264             :     absl::string_view filename, Mode mode,
     265           0 :     std::function<void(absl::StatusOr<AsyncFileHandle>)> on_complete) {
     266           0 :   return enqueue(std::make_shared<ActionOpenExistingFile>(*this, filename, mode, on_complete));
     267           0 : }
     268             : 
     269             : CancelFunction
     270             : AsyncFileManagerThreadPool::stat(absl::string_view filename,
     271           0 :                                  std::function<void(absl::StatusOr<struct stat>)> on_complete) {
     272           0 :   return enqueue(std::make_shared<ActionStat>(posix(), filename, on_complete));
     273           0 : }
     274             : 
     275             : CancelFunction AsyncFileManagerThreadPool::unlink(absl::string_view filename,
     276           0 :                                                   std::function<void(absl::Status)> on_complete) {
     277           0 :   return enqueue(std::make_shared<ActionUnlink>(posix(), filename, on_complete));
     278           0 : }
     279             : 
     280             : } // namespace AsyncFiles
     281             : } // namespace Common
     282             : } // namespace Extensions
     283             : } // namespace Envoy

Generated by: LCOV version 1.15