1
#pragma once
2

            
3
#include <memory>
4
#include <queue>
5
#include <string>
6
#include <thread>
7

            
8
#include "envoy/extensions/common/async_files/v3/async_file_manager.pb.h"
9

            
10
#include "source/extensions/common/async_files/async_file_handle.h"
11
#include "source/extensions/common/async_files/async_file_manager.h"
12

            
13
#include "absl/base/thread_annotations.h"
14
#include "absl/status/statusor.h"
15

            
16
namespace Envoy {
17
namespace Extensions {
18
namespace Common {
19
namespace AsyncFiles {
20

            
21
// An AsyncFileManager which performs file operations in a thread pool.
22
// The operations are passed in through a queue, and performed in the order they are
23
// received, except when operations are chained, in which case the thread that
24
// performed the previous action in the chain immediately performs the newly chained
25
// action.
26
class AsyncFileManagerThreadPool : public AsyncFileManager,
27
                                   public std::enable_shared_from_this<AsyncFileManagerThreadPool>,
28
                                   protected Logger::Loggable<Logger::Id::main> {
29
public:
30
  explicit AsyncFileManagerThreadPool(
31
      const envoy::extensions::common::async_files::v3::AsyncFileManagerConfig& config,
32
      Api::OsSysCalls& posix);
33
  ~AsyncFileManagerThreadPool() ABSL_LOCKS_EXCLUDED(queue_mutex_) override;
34
  CancelFunction createAnonymousFile(
35
      Event::Dispatcher* dispatcher, absl::string_view path,
36
      absl::AnyInvocable<void(absl::StatusOr<AsyncFileHandle>)> on_complete) override;
37
  CancelFunction
38
  openExistingFile(Event::Dispatcher* dispatcher, absl::string_view filename, Mode mode,
39
                   absl::AnyInvocable<void(absl::StatusOr<AsyncFileHandle>)> on_complete) override;
40
  CancelFunction stat(Event::Dispatcher* dispatcher, absl::string_view filename,
41
                      absl::AnyInvocable<void(absl::StatusOr<struct stat>)> on_complete) override;
42
  CancelFunction unlink(Event::Dispatcher* dispatcher, absl::string_view filename,
43
                        absl::AnyInvocable<void(absl::Status)> on_complete) override;
44
  std::string describe() const override;
45
  void waitForIdle() override;
46
4267
  Api::OsSysCalls& posix() const { return posix_; }
47

            
48
#ifdef O_TMPFILE
49
  // The first time we try to open an anonymous file, these values are used to capture whether
50
  // opening with O_TMPFILE works. If it does not, the first open is retried using 'mkstemp',
51
  // and all subsequent tries are redirected down that path. If it works, the first try is
52
  // used, and all subsequent tries use the same mechanism.
53
  // This is per-manager rather than static in part to facilitate testing, but also because
54
  // if there are multiple managers they may be operating on different file-systems with
55
  // different capabilities.
56
  std::once_flag once_flag_;
57
  bool supports_o_tmpfile_;
58
#endif // O_TMPFILE
59

            
60
private:
61
  absl::AnyInvocable<void()> enqueue(Event::Dispatcher* dispatcher,
62
                                     std::unique_ptr<AsyncFileAction> action)
63
      ABSL_LOCKS_EXCLUDED(queue_mutex_) override;
64
  void postCancelledActionForCleanup(std::unique_ptr<AsyncFileAction> action)
65
      ABSL_LOCKS_EXCLUDED(queue_mutex_) override;
66
  void worker() ABSL_LOCKS_EXCLUDED(queue_mutex_);
67

            
68
  absl::Mutex queue_mutex_;
69
  void executeAction(QueuedAction&& action);
70
  std::queue<QueuedAction> queue_ ABSL_GUARDED_BY(queue_mutex_);
71
  int active_workers_ ABSL_GUARDED_BY(queue_mutex_) = 0;
72
  std::queue<std::unique_ptr<AsyncFileAction>> cleanup_queue_ ABSL_GUARDED_BY(queue_mutex_);
73
  bool terminate_ ABSL_GUARDED_BY(queue_mutex_) = false;
74

            
75
  std::vector<std::thread> thread_pool_;
76
  Api::OsSysCalls& posix_;
77
};
78

            
79
} // namespace AsyncFiles
80
} // namespace Common
81
} // namespace Extensions
82
} // namespace Envoy