Line data Source code
1 : #include "source/extensions/common/async_files/async_file_context_thread_pool.h"
2 :
3 : #include <fcntl.h>
4 :
5 : #include <memory>
6 : #include <string>
7 : #include <utility>
8 :
9 : #include "source/common/buffer/buffer_impl.h"
10 : #include "source/extensions/common/async_files/async_file_action.h"
11 : #include "source/extensions/common/async_files/async_file_context_base.h"
12 : #include "source/extensions/common/async_files/async_file_manager_thread_pool.h"
13 : #include "source/extensions/common/async_files/status_after_file_error.h"
14 :
15 : namespace Envoy {
16 : namespace Extensions {
17 : namespace Common {
18 : namespace AsyncFiles {
19 :
20 : namespace {
21 :
22 : template <typename T> class AsyncFileActionThreadPool : public AsyncFileActionWithResult<T> {
23 : public:
24 : explicit AsyncFileActionThreadPool(AsyncFileHandle handle, std::function<void(T)> on_complete)
25 0 : : AsyncFileActionWithResult<T>(on_complete), handle_(std::move(handle)) {}
26 :
27 : protected:
28 0 : int& fileDescriptor() { return context()->fileDescriptor(); }
29 0 : AsyncFileContextThreadPool* context() const {
30 0 : return static_cast<AsyncFileContextThreadPool*>(handle_.get());
31 0 : }
32 :
33 0 : Api::OsSysCalls& posix() const {
34 0 : return static_cast<AsyncFileManagerThreadPool&>(context()->manager()).posix();
35 0 : }
36 :
37 : AsyncFileHandle handle_;
38 : };
39 :
40 : class ActionStat : public AsyncFileActionThreadPool<absl::StatusOr<struct stat>> {
41 : public:
42 : ActionStat(AsyncFileHandle handle, std::function<void(absl::StatusOr<struct stat>)> on_complete)
43 0 : : AsyncFileActionThreadPool<absl::StatusOr<struct stat>>(handle, on_complete) {}
44 :
45 0 : absl::StatusOr<struct stat> executeImpl() override {
46 0 : ASSERT(fileDescriptor() != -1);
47 0 : struct stat stat_result;
48 0 : auto result = posix().fstat(fileDescriptor(), &stat_result);
49 0 : if (result.return_value_ != 0) {
50 0 : return statusAfterFileError(result);
51 0 : }
52 0 : return stat_result;
53 0 : }
54 : };
55 :
56 : class ActionCreateHardLink : public AsyncFileActionThreadPool<absl::Status> {
57 : public:
58 : ActionCreateHardLink(AsyncFileHandle handle, absl::string_view filename,
59 : std::function<void(absl::Status)> on_complete)
60 0 : : AsyncFileActionThreadPool<absl::Status>(handle, on_complete), filename_(filename) {}
61 :
62 0 : absl::Status executeImpl() override {
63 0 : ASSERT(fileDescriptor() != -1);
64 0 : std::string procfile = absl::StrCat("/proc/self/fd/", fileDescriptor());
65 0 : auto result = posix().linkat(fileDescriptor(), procfile.c_str(), AT_FDCWD, filename_.c_str(),
66 0 : AT_SYMLINK_FOLLOW);
67 0 : if (result.return_value_ == -1) {
68 0 : return statusAfterFileError(result);
69 0 : }
70 0 : return absl::OkStatus();
71 0 : }
72 :
73 0 : void onCancelledBeforeCallback(absl::Status result) override {
74 0 : if (result.ok()) {
75 0 : posix().unlink(filename_.c_str());
76 0 : }
77 0 : }
78 :
79 : private:
80 : const std::string filename_;
81 : };
82 :
83 : class ActionCloseFile : public AsyncFileActionThreadPool<absl::Status> {
84 : public:
85 : // Here we take a copy of the AsyncFileContext's file descriptor, because the close function
86 : // sets the AsyncFileContext's file descriptor to -1. This way there will be no race of trying
87 : // to use the handle again while the close is in flight.
88 : explicit ActionCloseFile(AsyncFileHandle handle, std::function<void(absl::Status)> on_complete)
89 : : AsyncFileActionThreadPool<absl::Status>(handle, on_complete),
90 0 : file_descriptor_(fileDescriptor()) {}
91 :
92 0 : absl::Status executeImpl() override {
93 0 : auto result = posix().close(file_descriptor_);
94 0 : if (result.return_value_ == -1) {
95 0 : return statusAfterFileError(result);
96 0 : }
97 0 : return absl::OkStatus();
98 0 : }
99 :
100 : private:
101 : const int file_descriptor_;
102 : };
103 :
104 : class ActionReadFile : public AsyncFileActionThreadPool<absl::StatusOr<Buffer::InstancePtr>> {
105 : public:
106 : ActionReadFile(AsyncFileHandle handle, off_t offset, size_t length,
107 : std::function<void(absl::StatusOr<Buffer::InstancePtr>)> on_complete)
108 : : AsyncFileActionThreadPool<absl::StatusOr<Buffer::InstancePtr>>(handle, on_complete),
109 0 : offset_(offset), length_(length) {}
110 :
111 0 : absl::StatusOr<Buffer::InstancePtr> executeImpl() override {
112 0 : ASSERT(fileDescriptor() != -1);
113 0 : auto result = std::make_unique<Buffer::OwnedImpl>();
114 0 : auto reservation = result->reserveSingleSlice(length_);
115 0 : auto bytes_read = posix().pread(fileDescriptor(), reservation.slice().mem_, length_, offset_);
116 0 : if (bytes_read.return_value_ == -1) {
117 0 : return statusAfterFileError(bytes_read);
118 0 : }
119 0 : if (static_cast<size_t>(bytes_read.return_value_) != length_) {
120 0 : result =
121 0 : std::make_unique<Buffer::OwnedImpl>(reservation.slice().mem_, bytes_read.return_value_);
122 0 : } else {
123 0 : reservation.commit(bytes_read.return_value_);
124 0 : }
125 0 : return result;
126 0 : }
127 :
128 : private:
129 : const off_t offset_;
130 : const size_t length_;
131 : };
132 :
133 : class ActionWriteFile : public AsyncFileActionThreadPool<absl::StatusOr<size_t>> {
134 : public:
135 : ActionWriteFile(AsyncFileHandle handle, Buffer::Instance& contents, off_t offset,
136 : std::function<void(absl::StatusOr<size_t>)> on_complete)
137 0 : : AsyncFileActionThreadPool<absl::StatusOr<size_t>>(handle, on_complete), offset_(offset) {
138 0 : contents_.move(contents);
139 0 : }
140 :
141 0 : absl::StatusOr<size_t> executeImpl() override {
142 0 : ASSERT(fileDescriptor() != -1);
143 0 : auto slices = contents_.getRawSlices();
144 0 : size_t total_bytes_written = 0;
145 0 : for (const auto& slice : slices) {
146 0 : size_t slice_bytes_written = 0;
147 0 : while (slice_bytes_written < slice.len_) {
148 0 : auto bytes_just_written =
149 0 : posix().pwrite(fileDescriptor(), static_cast<char*>(slice.mem_) + slice_bytes_written,
150 0 : slice.len_ - slice_bytes_written, offset_ + total_bytes_written);
151 0 : if (bytes_just_written.return_value_ == -1) {
152 0 : return statusAfterFileError(bytes_just_written);
153 0 : }
154 0 : slice_bytes_written += bytes_just_written.return_value_;
155 0 : total_bytes_written += bytes_just_written.return_value_;
156 0 : }
157 0 : }
158 0 : return total_bytes_written;
159 0 : }
160 :
161 : private:
162 : Buffer::OwnedImpl contents_;
163 : const off_t offset_;
164 : };
165 :
166 : class ActionDuplicateFile : public AsyncFileActionThreadPool<absl::StatusOr<AsyncFileHandle>> {
167 : public:
168 : ActionDuplicateFile(AsyncFileHandle handle,
169 : std::function<void(absl::StatusOr<AsyncFileHandle>)> on_complete)
170 0 : : AsyncFileActionThreadPool<absl::StatusOr<AsyncFileHandle>>(handle, on_complete) {}
171 :
172 0 : absl::StatusOr<AsyncFileHandle> executeImpl() override {
173 0 : ASSERT(fileDescriptor() != -1);
174 0 : auto newfd = posix().duplicate(fileDescriptor());
175 0 : if (newfd.return_value_ == -1) {
176 0 : return statusAfterFileError(newfd);
177 0 : }
178 0 : return std::make_shared<AsyncFileContextThreadPool>(context()->manager(), newfd.return_value_);
179 0 : }
180 :
181 0 : void onCancelledBeforeCallback(absl::StatusOr<AsyncFileHandle> result) override {
182 0 : if (result.ok()) {
183 0 : result.value()->close([](absl::Status) {}).IgnoreError();
184 0 : }
185 0 : }
186 : };
187 :
188 : } // namespace
189 :
190 : absl::StatusOr<CancelFunction>
191 0 : AsyncFileContextThreadPool::stat(std::function<void(absl::StatusOr<struct stat>)> on_complete) {
192 0 : return checkFileAndEnqueue(std::make_shared<ActionStat>(handle(), std::move(on_complete)));
193 0 : }
194 :
195 : absl::StatusOr<CancelFunction>
196 : AsyncFileContextThreadPool::createHardLink(absl::string_view filename,
197 0 : std::function<void(absl::Status)> on_complete) {
198 0 : return checkFileAndEnqueue(
199 0 : std::make_shared<ActionCreateHardLink>(handle(), filename, std::move(on_complete)));
200 0 : }
201 :
202 0 : absl::Status AsyncFileContextThreadPool::close(std::function<void(absl::Status)> on_complete) {
203 0 : auto status =
204 0 : checkFileAndEnqueue(std::make_shared<ActionCloseFile>(handle(), std::move(on_complete)))
205 0 : .status();
206 0 : fileDescriptor() = -1;
207 0 : return status;
208 0 : }
209 :
210 : absl::StatusOr<CancelFunction> AsyncFileContextThreadPool::read(
211 : off_t offset, size_t length,
212 0 : std::function<void(absl::StatusOr<Buffer::InstancePtr>)> on_complete) {
213 0 : return checkFileAndEnqueue(
214 0 : std::make_shared<ActionReadFile>(handle(), offset, length, std::move(on_complete)));
215 0 : }
216 :
217 : absl::StatusOr<CancelFunction>
218 : AsyncFileContextThreadPool::write(Buffer::Instance& contents, off_t offset,
219 0 : std::function<void(absl::StatusOr<size_t>)> on_complete) {
220 0 : return checkFileAndEnqueue(
221 0 : std::make_shared<ActionWriteFile>(handle(), contents, offset, std::move(on_complete)));
222 0 : }
223 :
224 : absl::StatusOr<CancelFunction> AsyncFileContextThreadPool::duplicate(
225 0 : std::function<void(absl::StatusOr<AsyncFileHandle>)> on_complete) {
226 0 : return checkFileAndEnqueue(
227 0 : std::make_shared<ActionDuplicateFile>(handle(), std::move(on_complete)));
228 0 : }
229 :
230 : absl::StatusOr<CancelFunction>
231 0 : AsyncFileContextThreadPool::checkFileAndEnqueue(std::shared_ptr<AsyncFileAction> action) {
232 0 : if (fileDescriptor() == -1) {
233 0 : return absl::FailedPreconditionError("file was already closed");
234 0 : }
235 0 : return enqueue(action);
236 0 : }
237 :
238 : AsyncFileContextThreadPool::AsyncFileContextThreadPool(AsyncFileManager& manager, int fd)
239 0 : : AsyncFileContextBase(manager), file_descriptor_(fd) {}
240 :
241 0 : AsyncFileContextThreadPool::~AsyncFileContextThreadPool() { ASSERT(file_descriptor_ == -1); }
242 :
243 : } // namespace AsyncFiles
244 : } // namespace Common
245 : } // namespace Extensions
246 : } // namespace Envoy
|