1
#include "source/extensions/common/async_files/async_file_manager_thread_pool.h"
2

            
3
#include <memory>
4
#include <queue>
5
#include <thread>
6
#include <utility>
7
#include <vector>
8

            
9
#include "source/extensions/common/async_files/async_file_action.h"
10
#include "source/extensions/common/async_files/async_file_context_thread_pool.h"
11
#include "source/extensions/common/async_files/status_after_file_error.h"
12

            
13
namespace Envoy {
14
namespace Extensions {
15
namespace Common {
16
namespace AsyncFiles {
17

            
18
AsyncFileManagerThreadPool::AsyncFileManagerThreadPool(
19
    const envoy::extensions::common::async_files::v3::AsyncFileManagerConfig& config,
20
    Api::OsSysCalls& posix)
21
176
    : posix_(posix) {
22
176
  if (!posix.supportsAllPosixFileOperations()) {
23
1
    throw EnvoyException("AsyncFileManagerThreadPool not supported");
24
1
  }
25
175
  unsigned int thread_pool_size = config.thread_pool().thread_count();
26
175
  if (thread_pool_size == 0) {
27
1
    thread_pool_size = std::thread::hardware_concurrency();
28
1
  }
29
175
  ENVOY_LOG(info, fmt::format("AsyncFileManagerThreadPool created with id '{}', with {} threads",
30
175
                              config.id(), thread_pool_size));
31
175
  thread_pool_.reserve(thread_pool_size);
32
358
  while (thread_pool_.size() < thread_pool_size) {
33
183
    thread_pool_.emplace_back([this]() { worker(); });
34
183
  }
35
175
}
36

            
37
175
AsyncFileManagerThreadPool::~AsyncFileManagerThreadPool() ABSL_LOCKS_EXCLUDED(queue_mutex_) {
38
175
  {
39
175
    absl::MutexLock lock(queue_mutex_);
40
175
    terminate_ = true;
41
175
  }
42
  // This destructor will be blocked by this loop until all queued file actions are complete.
43
358
  while (!thread_pool_.empty()) {
44
183
    thread_pool_.back().join();
45
183
    thread_pool_.pop_back();
46
183
  }
47
175
}
48

            
49
6
std::string AsyncFileManagerThreadPool::describe() const {
50
6
  return absl::StrCat("thread_pool_size = ", thread_pool_.size());
51
6
}
52

            
53
1881
void AsyncFileManagerThreadPool::waitForIdle() {
54
2618
  const auto condition = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(queue_mutex_) {
55
2618
    return active_workers_ == 0 && queue_.empty() && cleanup_queue_.empty();
56
2618
  };
57
1881
  absl::MutexLock lock(queue_mutex_);
58
1881
  queue_mutex_.Await(absl::Condition(&condition));
59
1881
}
60

            
61
absl::AnyInvocable<void()>
62
AsyncFileManagerThreadPool::enqueue(Event::Dispatcher* dispatcher,
63
4080
                                    std::unique_ptr<AsyncFileAction> action) {
64
4080
  QueuedAction entry{std::move(action), dispatcher};
65
4080
  auto cancel_func = [dispatcher, state = entry.state_]() {
66
24
    ASSERT(dispatcher == nullptr || dispatcher->isThreadSafe());
67
24
    state->store(QueuedAction::State::Cancelled);
68
24
  };
69
4080
  absl::MutexLock lock(queue_mutex_);
70
4080
  queue_.push(std::move(entry));
71
4080
  return cancel_func;
72
4080
}
73

            
74
void AsyncFileManagerThreadPool::postCancelledActionForCleanup(
75
2
    std::unique_ptr<AsyncFileAction> action) {
76
2
  absl::MutexLock lock(queue_mutex_);
77
2
  cleanup_queue_.push(std::move(action));
78
2
}
79

            
80
4080
void AsyncFileManagerThreadPool::executeAction(QueuedAction&& queued_action) {
81
4080
  using State = QueuedAction::State;
82
4080
  State expected = State::Queued;
83
4080
  std::shared_ptr<std::atomic<State>> state = std::move(queued_action.state_);
84
4080
  std::unique_ptr<AsyncFileAction> action = std::move(queued_action.action_);
85
4080
  if (!state->compare_exchange_strong(expected, State::Executing)) {
86
3
    ASSERT(expected == State::Cancelled);
87
3
    if (action->executesEvenIfCancelled()) {
88
1
      action->execute();
89
1
    }
90
3
    return;
91
3
  }
92
4077
  action->execute();
93
4077
  expected = State::Executing;
94
4077
  if (!state->compare_exchange_strong(expected, State::InCallback)) {
95
6
    ASSERT(expected == State::Cancelled);
96
6
    action->onCancelledBeforeCallback();
97
6
    return;
98
6
  }
99
4071
  if (queued_action.dispatcher_ == nullptr) {
100
    // No need to bother arranging the callback, because a dispatcher was not provided.
101
116
    return;
102
116
  }
103
  // If it is necessary to explicitly undo an action on cancel then the lambda will need a
104
  // pointer to this manager that is guaranteed to outlive the lambda, in order to be able
105
  // to perform that cancel operation on a thread belonging to the file manager.
106
  // So capture a shared_ptr if necessary, but, to avoid unnecessary shared_ptr wrangling,
107
  // leave it empty if the action doesn't have an associated cancel operation.
108
3955
  std::shared_ptr<AsyncFileManagerThreadPool> manager;
109
3955
  if (action->hasActionIfCancelledBeforeCallback()) {
110
238
    manager = shared_from_this();
111
238
  }
112
3955
  queued_action.dispatcher_->post([manager = std::move(manager), action = std::move(action),
113
3955
                                   state = std::move(state)]() mutable {
114
    // This callback runs on the caller's thread.
115
3954
    State expected = State::InCallback;
116
3954
    if (state->compare_exchange_strong(expected, State::Done)) {
117
      // Action was not cancelled; run the captured callback on the caller's thread.
118
3950
      action->onComplete();
119
3950
      return;
120
3950
    }
121
4
    ASSERT(expected == State::Cancelled);
122
4
    if (manager == nullptr) {
123
      // Action had a "do nothing" cancellation so we don't need to post a cleanup action.
124
2
      return;
125
2
    }
126
    // If an action with side-effects was cancelled after being posted, its
127
    // side-effects need to be undone as the caller can no longer receive the
128
    // returned context. That undo action will need to be done on one of the
129
    // file manager's threads, as it is file related, so post it to the thread pool.
130
2
    manager->postCancelledActionForCleanup(std::move(action));
131
2
  });
132
3955
}
133

            
134
183
void AsyncFileManagerThreadPool::worker() {
135
14494
  const auto condition = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(queue_mutex_) {
136
14494
    return !queue_.empty() || !cleanup_queue_.empty() || terminate_;
137
14494
  };
138
183
  {
139
183
    absl::MutexLock lock(queue_mutex_);
140
183
    active_workers_++;
141
183
  }
142
4264
  while (true) {
143
4264
    QueuedAction action;
144
4264
    std::unique_ptr<AsyncFileAction> cleanup_action;
145
4264
    {
146
4264
      absl::MutexLock lock(queue_mutex_);
147
4264
      active_workers_--;
148
4264
      queue_mutex_.Await(absl::Condition(&condition));
149
4264
      if (terminate_ && queue_.empty() && cleanup_queue_.empty()) {
150
183
        return;
151
183
      }
152
4081
      active_workers_++;
153
4081
      if (!queue_.empty()) {
154
4080
        action = std::move(queue_.front());
155
4080
        queue_.pop();
156
4080
      }
157
4081
      if (!cleanup_queue_.empty()) {
158
2
        cleanup_action = std::move(cleanup_queue_.front());
159
2
        cleanup_queue_.pop();
160
2
      }
161
4081
    }
162
4081
    if (action.action_ != nullptr) {
163
4080
      executeAction(std::move(action));
164
4080
      action.action_ = nullptr;
165
4080
    }
166
4081
    if (cleanup_action != nullptr) {
167
2
      std::move(cleanup_action)->onCancelledBeforeCallback();
168
2
      cleanup_action = nullptr;
169
2
    }
170
4081
  }
171
183
}
172

            
173
namespace {
174

            
175
class ActionWithFileResult : public AsyncFileActionWithResult<absl::StatusOr<AsyncFileHandle>> {
176
public:
177
  ActionWithFileResult(AsyncFileManagerThreadPool& manager,
178
                       absl::AnyInvocable<void(absl::StatusOr<AsyncFileHandle>)> on_complete)
179
186
      : AsyncFileActionWithResult(std::move(on_complete)), manager_(manager) {}
180

            
181
protected:
182
3
  void onCancelledBeforeCallback() override {
183
3
    if (result_.value().ok()) {
184
3
      result_.value().value()->close(nullptr, [](absl::Status) {}).IgnoreError();
185
3
    }
186
3
  }
187
184
  bool hasActionIfCancelledBeforeCallback() const override { return true; }
188

            
189
  AsyncFileManagerThreadPool& manager_;
190
193
  Api::OsSysCalls& posix() { return manager_.posix(); }
191
};
192

            
193
class ActionCreateAnonymousFile : public ActionWithFileResult {
194
public:
195
  ActionCreateAnonymousFile(AsyncFileManagerThreadPool& manager, absl::string_view path,
196
                            absl::AnyInvocable<void(absl::StatusOr<AsyncFileHandle>)> on_complete)
197
78
      : ActionWithFileResult(manager, std::move(on_complete)), path_(path) {}
198

            
199
78
  absl::StatusOr<AsyncFileHandle> executeImpl() override {
200
78
    Api::SysCallIntResult open_result;
201
78
#ifdef O_TMPFILE
202
78
    bool was_successful_first_call = false;
203
78
    std::call_once(manager_.once_flag_, [this, &was_successful_first_call, &open_result]() {
204
58
      open_result = posix().open(path_.c_str(), O_TMPFILE | O_RDWR, S_IRUSR | S_IWUSR);
205
58
      was_successful_first_call = manager_.supports_o_tmpfile_ = (open_result.return_value_ != -1);
206
58
    });
207
78
    if (manager_.supports_o_tmpfile_) {
208
74
      if (was_successful_first_call) {
209
        // This was the thread doing the very first open(O_TMPFILE), and it worked, so no need to do
210
        // anything else.
211
54
        return std::make_shared<AsyncFileContextThreadPool>(manager_, open_result.return_value_);
212
54
      }
213
      // This was any other thread, but O_TMPFILE proved it worked, so we can do it again.
214
20
      open_result = posix().open(path_.c_str(), O_TMPFILE | O_RDWR, S_IRUSR | S_IWUSR);
215
20
      if (open_result.return_value_ == -1) {
216
1
        return statusAfterFileError(open_result);
217
1
      }
218
19
      return std::make_shared<AsyncFileContextThreadPool>(manager_, open_result.return_value_);
219
20
    }
220
4
#endif // O_TMPFILE
221
    // If O_TMPFILE didn't work, fall back to creating a named file and unlinking it.
222
    // Use a fixed-size buffer because we're going to be using C file functions anyway, and it saves
223
    // a heap allocation.
224
4
    char filename[4096];
225
4
    static const char file_suffix[] = "/buffer.XXXXXX";
226
4
    if (path_.size() + sizeof(file_suffix) > sizeof(filename)) {
227
1
      return absl::InvalidArgumentError(
228
1
          "AsyncFileManagerThreadPool::createAnonymousFile: pathname too long for tmpfile");
229
1
    }
230
    // Using C-style functions here because `mkstemp` requires a writable
231
    // char buffer, and we can't give it that with C++ strings.
232
3
    snprintf(filename, sizeof(filename), "%s%s", path_.c_str(), file_suffix);
233
3
    open_result = posix().mkstemp(filename);
234
3
    if (open_result.return_value_ == -1) {
235
1
      return statusAfterFileError(open_result);
236
1
    }
237
2
    if (posix().unlink(filename).return_value_ != 0) {
238
      // Most likely the problem here is we can't unlink a file while it's open - since that's a
239
      // prerequisite of the desired behavior of this function, and we don't want to accidentally
240
      // fill a disk with named tmp files, if this happens we close the file, unlink it, and report
241
      // an error.
242
1
      posix().close(open_result.return_value_);
243
1
      posix().unlink(filename);
244
1
      return absl::UnimplementedError(
245
1
          "AsyncFileManagerThreadPool::createAnonymousFile: not supported for "
246
1
          "target filesystem (failed to unlink an open file)");
247
1
    }
248
1
    return std::make_shared<AsyncFileContextThreadPool>(manager_, open_result.return_value_);
249
2
  }
250

            
251
private:
252
  const std::string path_;
253
};
254

            
255
class ActionOpenExistingFile : public ActionWithFileResult {
256
public:
257
  ActionOpenExistingFile(AsyncFileManagerThreadPool& manager, absl::string_view filename,
258
                         AsyncFileManager::Mode mode,
259
                         absl::AnyInvocable<void(absl::StatusOr<AsyncFileHandle>)> on_complete)
260
108
      : ActionWithFileResult(manager, std::move(on_complete)), filename_(filename), mode_(mode) {}
261

            
262
108
  absl::StatusOr<AsyncFileHandle> executeImpl() override {
263
108
    auto open_result = posix().open(filename_.c_str(), openFlags());
264
108
    if (open_result.return_value_ == -1) {
265
40
      return statusAfterFileError(open_result);
266
40
    }
267
68
    return std::make_shared<AsyncFileContextThreadPool>(manager_, open_result.return_value_);
268
108
  }
269

            
270
private:
271
108
  int openFlags() const {
272
108
    switch (mode_) {
273
100
    case AsyncFileManager::Mode::ReadOnly:
274
100
      return O_RDONLY;
275
2
    case AsyncFileManager::Mode::WriteOnly:
276
2
      return O_WRONLY;
277
6
    case AsyncFileManager::Mode::ReadWrite:
278
6
      return O_RDWR;
279
108
    }
280
    PANIC_DUE_TO_CORRUPT_ENUM;
281
  }
282
  const std::string filename_;
283
  const AsyncFileManager::Mode mode_;
284
};
285

            
286
class ActionStat : public AsyncFileActionWithResult<absl::StatusOr<struct stat>> {
287
public:
288
  ActionStat(Api::OsSysCalls& posix, absl::string_view filename,
289
             absl::AnyInvocable<void(absl::StatusOr<struct stat>)> on_complete)
290
36
      : AsyncFileActionWithResult(std::move(on_complete)), posix_(posix), filename_(filename) {}
291

            
292
36
  absl::StatusOr<struct stat> executeImpl() override {
293
36
    struct stat ret;
294
36
    Api::SysCallIntResult stat_result = posix_.stat(filename_.c_str(), &ret);
295
36
    if (stat_result.return_value_ == -1) {
296
20
      return statusAfterFileError(stat_result);
297
20
    }
298
16
    return ret;
299
36
  }
300

            
301
private:
302
  Api::OsSysCalls& posix_;
303
  const std::string filename_;
304
};
305

            
306
class ActionUnlink : public AsyncFileActionWithResult<absl::Status> {
307
public:
308
  ActionUnlink(Api::OsSysCalls& posix, absl::string_view filename,
309
               absl::AnyInvocable<void(absl::Status)> on_complete)
310
29
      : AsyncFileActionWithResult(std::move(on_complete)), posix_(posix), filename_(filename) {}
311

            
312
28
  absl::Status executeImpl() override {
313
28
    Api::SysCallIntResult unlink_result = posix_.unlink(filename_.c_str());
314
28
    if (unlink_result.return_value_ == -1) {
315
20
      return statusAfterFileError(unlink_result);
316
20
    }
317
8
    return absl::OkStatus();
318
28
  }
319

            
320
private:
321
  Api::OsSysCalls& posix_;
322
  const std::string filename_;
323
};
324

            
325
} // namespace
326

            
327
CancelFunction AsyncFileManagerThreadPool::createAnonymousFile(
328
    Event::Dispatcher* dispatcher, absl::string_view path,
329
78
    absl::AnyInvocable<void(absl::StatusOr<AsyncFileHandle>)> on_complete) {
330
78
  return enqueue(dispatcher,
331
78
                 std::make_unique<ActionCreateAnonymousFile>(*this, path, std::move(on_complete)));
332
78
}
333

            
334
CancelFunction AsyncFileManagerThreadPool::openExistingFile(
335
    Event::Dispatcher* dispatcher, absl::string_view filename, Mode mode,
336
108
    absl::AnyInvocable<void(absl::StatusOr<AsyncFileHandle>)> on_complete) {
337
108
  return enqueue(dispatcher, std::make_unique<ActionOpenExistingFile>(*this, filename, mode,
338
108
                                                                      std::move(on_complete)));
339
108
}
340

            
341
CancelFunction AsyncFileManagerThreadPool::stat(
342
    Event::Dispatcher* dispatcher, absl::string_view filename,
343
36
    absl::AnyInvocable<void(absl::StatusOr<struct stat>)> on_complete) {
344
36
  return enqueue(dispatcher,
345
36
                 std::make_unique<ActionStat>(posix(), filename, std::move(on_complete)));
346
36
}
347

            
348
CancelFunction
349
AsyncFileManagerThreadPool::unlink(Event::Dispatcher* dispatcher, absl::string_view filename,
350
29
                                   absl::AnyInvocable<void(absl::Status)> on_complete) {
351
29
  return enqueue(dispatcher,
352
29
                 std::make_unique<ActionUnlink>(posix(), filename, std::move(on_complete)));
353
29
}
354

            
355
} // namespace AsyncFiles
356
} // namespace Common
357
} // namespace Extensions
358
} // namespace Envoy