Line data Source code
1 : #include "source/extensions/common/async_files/async_file_manager_thread_pool.h"
2 :
3 : #include <memory>
4 : #include <queue>
5 : #include <thread>
6 : #include <utility>
7 : #include <vector>
8 :
9 : #include "source/extensions/common/async_files/async_file_action.h"
10 : #include "source/extensions/common/async_files/async_file_context_thread_pool.h"
11 : #include "source/extensions/common/async_files/status_after_file_error.h"
12 :
13 : namespace Envoy {
14 : namespace Extensions {
15 : namespace Common {
16 : namespace AsyncFiles {
17 :
18 : namespace {
19 : // ThreadNextAction is per worker thread; if enqueue is called from a callback
20 : // the action goes directly into ThreadNextAction, otherwise it goes into the
21 : // queue and is eventually pulled out into ThreadNextAction by a worker thread.
22 : thread_local std::shared_ptr<AsyncFileAction> ThreadNextAction;
23 :
24 : // ThreadIsWorker is set to true for worker threads, and will be false
25 : // for all other threads.
26 : thread_local bool ThreadIsWorker = false;
27 : } // namespace
28 :
29 : AsyncFileManagerThreadPool::AsyncFileManagerThreadPool(
30 : const envoy::extensions::common::async_files::v3::AsyncFileManagerConfig& config,
31 : Api::OsSysCalls& posix)
32 2 : : posix_(posix) {
33 2 : if (!posix.supportsAllPosixFileOperations()) {
34 0 : throw EnvoyException("AsyncFileManagerThreadPool not supported");
35 0 : }
36 2 : unsigned int thread_pool_size = config.thread_pool().thread_count();
37 2 : if (thread_pool_size == 0) {
38 2 : thread_pool_size = std::thread::hardware_concurrency();
39 2 : }
40 2 : ENVOY_LOG(info, fmt::format("AsyncFileManagerThreadPool created with id '{}', with {} threads",
41 2 : config.id(), thread_pool_size));
42 2 : thread_pool_.reserve(thread_pool_size);
43 6 : while (thread_pool_.size() < thread_pool_size) {
44 4 : thread_pool_.emplace_back([this]() { worker(); });
45 4 : }
46 2 : }
47 :
48 2 : AsyncFileManagerThreadPool::~AsyncFileManagerThreadPool() ABSL_LOCKS_EXCLUDED(queue_mutex_) {
49 2 : {
50 2 : absl::MutexLock lock(&queue_mutex_);
51 2 : terminate_ = true;
52 2 : }
53 6 : while (!thread_pool_.empty()) {
54 4 : thread_pool_.back().join();
55 4 : thread_pool_.pop_back();
56 4 : }
57 2 : }
58 :
59 0 : std::string AsyncFileManagerThreadPool::describe() const {
60 0 : return absl::StrCat("thread_pool_size = ", thread_pool_.size());
61 0 : }
62 :
63 0 : std::function<void()> AsyncFileManagerThreadPool::enqueue(std::shared_ptr<AsyncFileAction> action) {
64 0 : auto cancel_func = [action]() { action->cancel(); };
65 : // If an action is being enqueued from within a callback, we don't have to actually queue it,
66 : // we can just set it as the thread's next action - this acts to chain the actions without
67 : // yielding to another file.
68 0 : if (ThreadIsWorker) {
69 0 : ASSERT(!ThreadNextAction); // only do one file action per callback.
70 0 : ThreadNextAction = std::move(action);
71 0 : return cancel_func;
72 0 : }
73 0 : absl::MutexLock lock(&queue_mutex_);
74 0 : queue_.push(std::move(action));
75 0 : return cancel_func;
76 0 : }
77 :
78 4 : void AsyncFileManagerThreadPool::worker() {
79 4 : ThreadIsWorker = true;
80 4 : while (true) {
81 14 : const auto condition = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(queue_mutex_) {
82 14 : return !queue_.empty() || terminate_;
83 14 : };
84 4 : {
85 4 : absl::MutexLock lock(&queue_mutex_);
86 4 : queue_mutex_.Await(absl::Condition(&condition));
87 4 : if (terminate_) {
88 4 : return;
89 4 : }
90 0 : ThreadNextAction = std::move(queue_.front());
91 0 : queue_.pop();
92 0 : }
93 0 : resolveActions();
94 0 : }
95 4 : }
96 :
97 0 : void AsyncFileManagerThreadPool::resolveActions() {
98 0 : while (ThreadNextAction) {
99 : // Move the action out of ThreadNextAction so that its callback can enqueue
100 : // a different ThreadNextAction without self-destructing.
101 0 : std::shared_ptr<AsyncFileAction> action = std::move(ThreadNextAction);
102 0 : action->execute();
103 0 : }
104 0 : }
105 :
106 : namespace {
107 :
108 : class ActionWithFileResult : public AsyncFileActionWithResult<absl::StatusOr<AsyncFileHandle>> {
109 : public:
110 : ActionWithFileResult(AsyncFileManagerThreadPool& manager,
111 : std::function<void(absl::StatusOr<AsyncFileHandle>)> on_complete)
112 0 : : AsyncFileActionWithResult(on_complete), manager_(manager) {}
113 :
114 : protected:
115 0 : void onCancelledBeforeCallback(absl::StatusOr<AsyncFileHandle> result) override {
116 0 : if (result.ok()) {
117 0 : result.value()->close([](absl::Status) {}).IgnoreError();
118 0 : }
119 0 : }
120 : AsyncFileManagerThreadPool& manager_;
121 0 : Api::OsSysCalls& posix() { return manager_.posix(); }
122 : };
123 :
124 : class ActionCreateAnonymousFile : public ActionWithFileResult {
125 : public:
126 : ActionCreateAnonymousFile(AsyncFileManagerThreadPool& manager, absl::string_view path,
127 : std::function<void(absl::StatusOr<AsyncFileHandle>)> on_complete)
128 0 : : ActionWithFileResult(manager, on_complete), path_(path) {}
129 :
130 0 : absl::StatusOr<AsyncFileHandle> executeImpl() override {
131 0 : Api::SysCallIntResult open_result;
132 0 : #ifdef O_TMPFILE
133 0 : bool was_successful_first_call = false;
134 0 : std::call_once(manager_.once_flag_, [this, &was_successful_first_call, &open_result]() {
135 0 : open_result = posix().open(path_.c_str(), O_TMPFILE | O_RDWR, S_IRUSR | S_IWUSR);
136 0 : was_successful_first_call = manager_.supports_o_tmpfile_ = (open_result.return_value_ != -1);
137 0 : });
138 0 : if (manager_.supports_o_tmpfile_) {
139 0 : if (was_successful_first_call) {
140 : // This was the thread doing the very first open(O_TMPFILE), and it worked, so no need to do
141 : // anything else.
142 0 : return std::make_shared<AsyncFileContextThreadPool>(manager_, open_result.return_value_);
143 0 : }
144 : // This was any other thread, but O_TMPFILE proved it worked, so we can do it again.
145 0 : open_result = posix().open(path_.c_str(), O_TMPFILE | O_RDWR, S_IRUSR | S_IWUSR);
146 0 : if (open_result.return_value_ == -1) {
147 0 : return statusAfterFileError(open_result);
148 0 : }
149 0 : return std::make_shared<AsyncFileContextThreadPool>(manager_, open_result.return_value_);
150 0 : }
151 0 : #endif // O_TMPFILE
152 : // If O_TMPFILE didn't work, fall back to creating a named file and unlinking it.
153 : // Use a fixed-size buffer because we're going to be using C file functions anyway, and it saves
154 : // a heap allocation.
155 0 : char filename[4096];
156 0 : static const char file_suffix[] = "/buffer.XXXXXX";
157 0 : if (path_.size() + sizeof(file_suffix) > sizeof(filename)) {
158 0 : return absl::InvalidArgumentError(
159 0 : "AsyncFileManagerThreadPool::createAnonymousFile: pathname too long for tmpfile");
160 0 : }
161 : // Using C-style functions here because `mkstemp` requires a writable
162 : // char buffer, and we can't give it that with C++ strings.
163 0 : snprintf(filename, sizeof(filename), "%s%s", path_.c_str(), file_suffix);
164 0 : open_result = posix().mkstemp(filename);
165 0 : if (open_result.return_value_ == -1) {
166 0 : return statusAfterFileError(open_result);
167 0 : }
168 0 : if (posix().unlink(filename).return_value_ != 0) {
169 : // Most likely the problem here is we can't unlink a file while it's open - since that's a
170 : // prerequisite of the desired behavior of this function, and we don't want to accidentally
171 : // fill a disk with named tmp files, if this happens we close the file, unlink it, and report
172 : // an error.
173 0 : posix().close(open_result.return_value_);
174 0 : posix().unlink(filename);
175 0 : return absl::UnimplementedError(
176 0 : "AsyncFileManagerThreadPool::createAnonymousFile: not supported for "
177 0 : "target filesystem (failed to unlink an open file)");
178 0 : }
179 0 : return std::make_shared<AsyncFileContextThreadPool>(manager_, open_result.return_value_);
180 0 : }
181 :
182 : private:
183 : const std::string path_;
184 : };
185 :
186 : class ActionOpenExistingFile : public ActionWithFileResult {
187 : public:
188 : ActionOpenExistingFile(AsyncFileManagerThreadPool& manager, absl::string_view filename,
189 : AsyncFileManager::Mode mode,
190 : std::function<void(absl::StatusOr<AsyncFileHandle>)> on_complete)
191 0 : : ActionWithFileResult(manager, on_complete), filename_(filename), mode_(mode) {}
192 :
193 0 : absl::StatusOr<AsyncFileHandle> executeImpl() override {
194 0 : auto open_result = posix().open(filename_.c_str(), openFlags());
195 0 : if (open_result.return_value_ == -1) {
196 0 : return statusAfterFileError(open_result);
197 0 : }
198 0 : return std::make_shared<AsyncFileContextThreadPool>(manager_, open_result.return_value_);
199 0 : }
200 :
201 : private:
202 0 : int openFlags() const {
203 0 : switch (mode_) {
204 0 : case AsyncFileManager::Mode::ReadOnly:
205 0 : return O_RDONLY;
206 0 : case AsyncFileManager::Mode::WriteOnly:
207 0 : return O_WRONLY;
208 0 : case AsyncFileManager::Mode::ReadWrite:
209 0 : return O_RDWR;
210 0 : }
211 0 : PANIC_DUE_TO_CORRUPT_ENUM;
212 0 : }
213 : const std::string filename_;
214 : const AsyncFileManager::Mode mode_;
215 : };
216 :
217 : class ActionStat : public AsyncFileActionWithResult<absl::StatusOr<struct stat>> {
218 : public:
219 : ActionStat(Api::OsSysCalls& posix, absl::string_view filename,
220 : std::function<void(absl::StatusOr<struct stat>)> on_complete)
221 0 : : AsyncFileActionWithResult(on_complete), posix_(posix), filename_(filename) {}
222 :
223 0 : absl::StatusOr<struct stat> executeImpl() override {
224 0 : struct stat ret;
225 0 : Api::SysCallIntResult stat_result = posix_.stat(filename_.c_str(), &ret);
226 0 : if (stat_result.return_value_ == -1) {
227 0 : return statusAfterFileError(stat_result);
228 0 : }
229 0 : return ret;
230 0 : }
231 :
232 : private:
233 : Api::OsSysCalls& posix_;
234 : const std::string filename_;
235 : };
236 :
237 : class ActionUnlink : public AsyncFileActionWithResult<absl::Status> {
238 : public:
239 : ActionUnlink(Api::OsSysCalls& posix, absl::string_view filename,
240 : std::function<void(absl::Status)> on_complete)
241 0 : : AsyncFileActionWithResult(on_complete), posix_(posix), filename_(filename) {}
242 :
243 0 : absl::Status executeImpl() override {
244 0 : Api::SysCallIntResult unlink_result = posix_.unlink(filename_.c_str());
245 0 : if (unlink_result.return_value_ == -1) {
246 0 : return statusAfterFileError(unlink_result);
247 0 : }
248 0 : return absl::OkStatus();
249 0 : }
250 :
251 : private:
252 : Api::OsSysCalls& posix_;
253 : const std::string filename_;
254 : };
255 :
256 : } // namespace
257 :
258 : CancelFunction AsyncFileManagerThreadPool::createAnonymousFile(
259 0 : absl::string_view path, std::function<void(absl::StatusOr<AsyncFileHandle>)> on_complete) {
260 0 : return enqueue(std::make_shared<ActionCreateAnonymousFile>(*this, path, on_complete));
261 0 : }
262 :
263 : CancelFunction AsyncFileManagerThreadPool::openExistingFile(
264 : absl::string_view filename, Mode mode,
265 0 : std::function<void(absl::StatusOr<AsyncFileHandle>)> on_complete) {
266 0 : return enqueue(std::make_shared<ActionOpenExistingFile>(*this, filename, mode, on_complete));
267 0 : }
268 :
269 : CancelFunction
270 : AsyncFileManagerThreadPool::stat(absl::string_view filename,
271 0 : std::function<void(absl::StatusOr<struct stat>)> on_complete) {
272 0 : return enqueue(std::make_shared<ActionStat>(posix(), filename, on_complete));
273 0 : }
274 :
275 : CancelFunction AsyncFileManagerThreadPool::unlink(absl::string_view filename,
276 0 : std::function<void(absl::Status)> on_complete) {
277 0 : return enqueue(std::make_shared<ActionUnlink>(posix(), filename, on_complete));
278 0 : }
279 :
280 : } // namespace AsyncFiles
281 : } // namespace Common
282 : } // namespace Extensions
283 : } // namespace Envoy
|