1
#include "source/extensions/common/async_files/async_file_context_thread_pool.h"
2

            
3
#include <fcntl.h>
4

            
5
#include <memory>
6
#include <string>
7
#include <utility>
8

            
9
#include "source/common/buffer/buffer_impl.h"
10
#include "source/extensions/common/async_files/async_file_action.h"
11
#include "source/extensions/common/async_files/async_file_context_base.h"
12
#include "source/extensions/common/async_files/async_file_manager_thread_pool.h"
13
#include "source/extensions/common/async_files/status_after_file_error.h"
14

            
15
namespace Envoy {
16
namespace Extensions {
17
namespace Common {
18
namespace AsyncFiles {
19

            
20
namespace {
21

            
22
// Shared base for the thread-pool file actions defined in this file. It stores the
// AsyncFileHandle the action operates on and exposes helpers to reach the underlying
// context, its file descriptor, and the syscall interface of the owning manager.
template <typename T> class AsyncFileActionThreadPool : public AsyncFileActionWithResult<T> {
public:
  // `handle` is the file the action targets; `on_complete` is forwarded to the base class.
  explicit AsyncFileActionThreadPool(AsyncFileHandle handle,
                                     absl::AnyInvocable<void(T)> on_complete)
      : AsyncFileActionWithResult<T>(std::move(on_complete)), handle_(std::move(handle)) {}

protected:
  // Returns the stored handle viewed as the thread-pool context type. The downcast is
  // unchecked (static_cast).
  AsyncFileContextThreadPool* context() const {
    auto* const base_context = handle_.get();
    return static_cast<AsyncFileContextThreadPool*>(base_context);
  }

  // Returns the context's file descriptor by reference.
  int& fileDescriptor() { return context()->fileDescriptor(); }

  // Returns the syscall interface of the context's manager, which is downcast (unchecked)
  // to the thread-pool manager type.
  Api::OsSysCalls& posix() const {
    auto& thread_pool_manager = static_cast<AsyncFileManagerThreadPool&>(context()->manager());
    return thread_pool_manager.posix();
  }

  AsyncFileHandle handle_;
};
40

            
41
// Action that calls fstat on the handle's file descriptor. Produces the populated
// `struct stat` on success, or the status built by statusAfterFileError on failure.
class ActionStat : public AsyncFileActionThreadPool<absl::StatusOr<struct stat>> {
public:
  // `handle` is taken by value as a sink, so it is moved (not copied) into the base class.
  ActionStat(AsyncFileHandle handle,
             absl::AnyInvocable<void(absl::StatusOr<struct stat>)> on_complete)
      : AsyncFileActionThreadPool<absl::StatusOr<struct stat>>(std::move(handle),
                                                                std::move(on_complete)) {}

  // Runs fstat; any non-zero return value is treated as failure.
  absl::StatusOr<struct stat> executeImpl() override {
    ASSERT(fileDescriptor() != -1);
    struct stat stat_result;
    auto result = posix().fstat(fileDescriptor(), &stat_result);
    if (result.return_value_ != 0) {
      return statusAfterFileError(result);
    }
    return stat_result;
  }
};
57

            
58
// Action that gives the open file a name by hard-linking its /proc/self/fd entry to
// `filename`.
class ActionCreateHardLink : public AsyncFileActionThreadPool<absl::Status> {
public:
  // `handle` is taken by value as a sink, so it is moved (not copied) into the base class.
  // `filename` is copied into the action.
  ActionCreateHardLink(AsyncFileHandle handle, absl::string_view filename,
                       absl::AnyInvocable<void(absl::Status)> on_complete)
      : AsyncFileActionThreadPool<absl::Status>(std::move(handle), std::move(on_complete)),
        filename_(filename) {}

  // Links "/proc/self/fd/<fd>" to filename_, following the symlink (AT_SYMLINK_FOLLOW).
  // Returns OkStatus on success, otherwise the status built from the failed syscall.
  absl::Status executeImpl() override {
    ASSERT(fileDescriptor() != -1);
    std::string procfile = absl::StrCat("/proc/self/fd/", fileDescriptor());
    auto result = posix().linkat(fileDescriptor(), procfile.c_str(), AT_FDCWD, filename_.c_str(),
                                 AT_SYMLINK_FOLLOW);
    if (result.return_value_ == -1) {
      return statusAfterFileError(result);
    }
    return absl::OkStatus();
  }

  // If the link was created successfully, remove it again; the unlink result is ignored.
  // Presumably invoked by the base class when the action is cancelled after executing
  // but before the callback runs — confirm against AsyncFileAction.
  void onCancelledBeforeCallback() override {
    if (result_.value().ok()) {
      posix().unlink(filename_.c_str());
    }
  }
  bool hasActionIfCancelledBeforeCallback() const override { return true; }

private:
  const std::string filename_;
};
86

            
87
// Action that closes the file descriptor that the handle's context held when the action
// was constructed.
class ActionCloseFile : public AsyncFileActionThreadPool<absl::Status> {
public:
  // Here we take a copy of the AsyncFileContext's file descriptor, because the close function
  // sets the AsyncFileContext's file descriptor to -1. This way there will be no race of trying
  // to use the handle again while the close is in flight.
  //
  // `handle` is taken by value as a sink, so it is moved (not copied) into the base class.
  // The descriptor snapshot below reads through the base's stored handle, so it is
  // unaffected by the move.
  explicit ActionCloseFile(AsyncFileHandle handle,
                           absl::AnyInvocable<void(absl::Status)> on_complete)
      : AsyncFileActionThreadPool<absl::Status>(std::move(handle), std::move(on_complete)),
        file_descriptor_(fileDescriptor()) {}

  // Closes the snapshotted descriptor. Returns OkStatus on success, otherwise the status
  // built from the failed syscall.
  absl::Status executeImpl() override {
    auto result = posix().close(file_descriptor_);
    if (result.return_value_ == -1) {
      return statusAfterFileError(result);
    }
    return absl::OkStatus();
  }

  // Reports that this action should still execute when cancelled.
  bool executesEvenIfCancelled() const override { return true; }

private:
  // Descriptor captured at construction time.
  const int file_descriptor_;
};
110

            
111
// Action that reads up to `length` bytes starting at `offset` with a single pread call and
// returns them in a newly allocated buffer.
class ActionReadFile : public AsyncFileActionThreadPool<absl::StatusOr<Buffer::InstancePtr>> {
public:
  // `handle` is taken by value as a sink, so it is moved (not copied) into the base class.
  ActionReadFile(AsyncFileHandle handle, off_t offset, size_t length,
                 absl::AnyInvocable<void(absl::StatusOr<Buffer::InstancePtr>)> on_complete)
      : AsyncFileActionThreadPool<absl::StatusOr<Buffer::InstancePtr>>(std::move(handle),
                                                                       std::move(on_complete)),
        offset_(offset), length_(length) {}

  // Returns the bytes read (possibly fewer than requested), or the status built from the
  // failed syscall when pread returns -1.
  absl::StatusOr<Buffer::InstancePtr> executeImpl() override {
    ASSERT(fileDescriptor() != -1);
    auto result = std::make_unique<Buffer::OwnedImpl>();
    // Reserve one contiguous slice of the requested size and read directly into it.
    auto reservation = result->reserveSingleSlice(length_);
    auto bytes_read = posix().pread(fileDescriptor(), reservation.slice().mem_, length_, offset_);
    if (bytes_read.return_value_ == -1) {
      return statusAfterFileError(bytes_read);
    }
    if (static_cast<size_t>(bytes_read.return_value_) != length_) {
      // Short read: replace the result with a buffer built from just the bytes that were
      // read, leaving the reservation uncommitted.
      result =
          std::make_unique<Buffer::OwnedImpl>(reservation.slice().mem_, bytes_read.return_value_);
    } else {
      // Full read: commit the reservation in place.
      reservation.commit(bytes_read.return_value_);
    }
    return result;
  }

private:
  const off_t offset_;
  const size_t length_;
};
140

            
141
// Action that writes the given buffer contents to the file starting at `offset`, retrying
// after partial writes until every slice has been written or a write fails.
class ActionWriteFile : public AsyncFileActionThreadPool<absl::StatusOr<size_t>> {
public:
  // `handle` is taken by value as a sink, so it is moved (not copied) into the base class.
  // The data in `contents` is moved into the action's own buffer.
  ActionWriteFile(AsyncFileHandle handle, Buffer::Instance& contents, off_t offset,
                  absl::AnyInvocable<void(absl::StatusOr<size_t>)> on_complete)
      : AsyncFileActionThreadPool<absl::StatusOr<size_t>>(std::move(handle),
                                                          std::move(on_complete)),
        offset_(offset) {
    contents_.move(contents);
  }

  // Returns the total number of bytes written, or the status built from the first pwrite
  // call that returns -1.
  absl::StatusOr<size_t> executeImpl() override {
    ASSERT(fileDescriptor() != -1);
    auto slices = contents_.getRawSlices();
    size_t total_bytes_written = 0;
    for (const auto& slice : slices) {
      size_t slice_bytes_written = 0;
      // pwrite may write less than requested, so keep going until the slice is done.
      // NOTE(review): a return value of 0 would not advance this loop; presumably that
      // cannot happen for a non-zero length on the files used here — confirm.
      while (slice_bytes_written < slice.len_) {
        auto bytes_just_written =
            posix().pwrite(fileDescriptor(), static_cast<char*>(slice.mem_) + slice_bytes_written,
                           slice.len_ - slice_bytes_written, offset_ + total_bytes_written);
        if (bytes_just_written.return_value_ == -1) {
          return statusAfterFileError(bytes_just_written);
        }
        slice_bytes_written += bytes_just_written.return_value_;
        total_bytes_written += bytes_just_written.return_value_;
      }
    }
    return total_bytes_written;
  }

private:
  Buffer::OwnedImpl contents_;
  const off_t offset_;
};
174

            
175
// Action that calls ftruncate on the handle's file descriptor with the given length.
class ActionTruncateFile : public AsyncFileActionThreadPool<absl::Status> {
public:
  // `handle` is taken by value as a sink, so it is moved (not copied) into the base class.
  ActionTruncateFile(AsyncFileHandle handle, size_t length,
                     absl::AnyInvocable<void(absl::Status)> on_complete)
      : AsyncFileActionThreadPool<absl::Status>(std::move(handle), std::move(on_complete)),
        length_(length) {}

  // Returns OkStatus on success, otherwise the status built from the failed syscall.
  absl::Status executeImpl() override {
    ASSERT(fileDescriptor() != -1);
    Api::SysCallIntResult result = posix().ftruncate(fileDescriptor(), length_);
    if (result.return_value_ == -1) {
      return statusAfterFileError(result);
    }
    return absl::OkStatus();
  }

private:
  const size_t length_;
};
193

            
194
// Action that duplicates the handle's file descriptor and wraps the new descriptor in a
// fresh AsyncFileContextThreadPool associated with the same manager.
class ActionDuplicateFile : public AsyncFileActionThreadPool<absl::StatusOr<AsyncFileHandle>> {
public:
  // `handle` is taken by value as a sink, so it is moved (not copied) into the base class.
  ActionDuplicateFile(AsyncFileHandle handle,
                      absl::AnyInvocable<void(absl::StatusOr<AsyncFileHandle>)> on_complete)
      : AsyncFileActionThreadPool<absl::StatusOr<AsyncFileHandle>>(std::move(handle),
                                                                   std::move(on_complete)) {}

  // Returns a handle for the duplicated descriptor, or the status built from the failed
  // syscall when duplication returns -1.
  absl::StatusOr<AsyncFileHandle> executeImpl() override {
    ASSERT(fileDescriptor() != -1);
    auto newfd = posix().duplicate(fileDescriptor());
    if (newfd.return_value_ == -1) {
      return statusAfterFileError(newfd);
    }
    return std::make_shared<AsyncFileContextThreadPool>(context()->manager(), newfd.return_value_);
  }

  // If a duplicate handle was produced, close it (no dispatcher, no-op callback) and
  // ignore the result of queuing the close. Presumably invoked by the base class when the
  // action is cancelled after executing but before the callback runs — confirm against
  // AsyncFileAction.
  void onCancelledBeforeCallback() override {
    if (result_.value().ok()) {
      result_.value().value()->close(nullptr, [](absl::Status) {}).IgnoreError();
    }
  }
  bool hasActionIfCancelledBeforeCallback() const override { return true; }
};
217

            
218
} // namespace
219

            
220
// Builds an ActionStat for this file and submits it through checkFileAndEnqueue, which
// rejects the request if the file descriptor is already -1.
absl::StatusOr<CancelFunction> AsyncFileContextThreadPool::stat(
    Event::Dispatcher* dispatcher,
    absl::AnyInvocable<void(absl::StatusOr<struct stat>)> on_complete) {
  auto stat_action = std::make_unique<ActionStat>(handle(), std::move(on_complete));
  return checkFileAndEnqueue(dispatcher, std::move(stat_action));
}
226

            
227
// Builds an ActionCreateHardLink targeting `filename` and submits it through
// checkFileAndEnqueue, which rejects the request if the file descriptor is already -1.
absl::StatusOr<CancelFunction>
AsyncFileContextThreadPool::createHardLink(Event::Dispatcher* dispatcher,
                                           absl::string_view filename,
                                           absl::AnyInvocable<void(absl::Status)> on_complete) {
  auto link_action =
      std::make_unique<ActionCreateHardLink>(handle(), filename, std::move(on_complete));
  return checkFileAndEnqueue(dispatcher, std::move(link_action));
}
234

            
235
// Submits an ActionCloseFile for this file and then marks the context as closed by setting
// its descriptor to -1, regardless of whether the submission succeeded.
absl::StatusOr<CancelFunction>
AsyncFileContextThreadPool::close(Event::Dispatcher* dispatcher,
                                  absl::AnyInvocable<void(absl::Status)> on_complete) {
  // The action must be constructed before the descriptor is reset: its constructor takes
  // its own copy of the current descriptor value.
  auto close_action = std::make_unique<ActionCloseFile>(handle(), std::move(on_complete));
  auto enqueue_result = checkFileAndEnqueue(dispatcher, std::move(close_action));
  fileDescriptor() = -1;
  return enqueue_result;
}
243

            
244
// Builds an ActionReadFile for `length` bytes at `offset` and submits it through
// checkFileAndEnqueue, which rejects the request if the file descriptor is already -1.
absl::StatusOr<CancelFunction> AsyncFileContextThreadPool::read(
    Event::Dispatcher* dispatcher, off_t offset, size_t length,
    absl::AnyInvocable<void(absl::StatusOr<Buffer::InstancePtr>)> on_complete) {
  auto read_action =
      std::make_unique<ActionReadFile>(handle(), offset, length, std::move(on_complete));
  return checkFileAndEnqueue(dispatcher, std::move(read_action));
}
250

            
251
// Builds an ActionWriteFile that takes over the data in `contents` and writes it at
// `offset`, then submits it through checkFileAndEnqueue, which rejects the request if the
// file descriptor is already -1. The action is constructed (and `contents` drained)
// before that check happens.
absl::StatusOr<CancelFunction>
AsyncFileContextThreadPool::write(Event::Dispatcher* dispatcher, Buffer::Instance& contents,
                                  off_t offset,
                                  absl::AnyInvocable<void(absl::StatusOr<size_t>)> on_complete) {
  auto write_action =
      std::make_unique<ActionWriteFile>(handle(), contents, offset, std::move(on_complete));
  return checkFileAndEnqueue(dispatcher, std::move(write_action));
}
258

            
259
// Builds an ActionDuplicateFile for this file and submits it through checkFileAndEnqueue,
// which rejects the request if the file descriptor is already -1.
absl::StatusOr<CancelFunction> AsyncFileContextThreadPool::duplicate(
    Event::Dispatcher* dispatcher,
    absl::AnyInvocable<void(absl::StatusOr<AsyncFileHandle>)> on_complete) {
  auto duplicate_action = std::make_unique<ActionDuplicateFile>(handle(), std::move(on_complete));
  return checkFileAndEnqueue(dispatcher, std::move(duplicate_action));
}
265

            
266
// Builds an ActionTruncateFile for the given `length` and submits it through
// checkFileAndEnqueue, which rejects the request if the file descriptor is already -1.
absl::StatusOr<CancelFunction>
AsyncFileContextThreadPool::truncate(Event::Dispatcher* dispatcher, size_t length,
                                     absl::AnyInvocable<void(absl::Status)> on_complete) {
  auto truncate_action =
      std::make_unique<ActionTruncateFile>(handle(), length, std::move(on_complete));
  return checkFileAndEnqueue(dispatcher, std::move(truncate_action));
}
272

            
273
// Forwards `action` to enqueue() as long as this context still has a file descriptor.
// When the descriptor is -1 the action is dropped and a FailedPreconditionError is
// returned instead.
absl::StatusOr<CancelFunction>
AsyncFileContextThreadPool::checkFileAndEnqueue(Event::Dispatcher* dispatcher,
                                                std::unique_ptr<AsyncFileAction> action) {
  const bool still_open = fileDescriptor() != -1;
  if (still_open) {
    return enqueue(dispatcher, std::move(action));
  }
  return absl::FailedPreconditionError("file was already closed");
}
281

            
282
AsyncFileContextThreadPool::AsyncFileContextThreadPool(AsyncFileManager& manager, int fd)
283
155
    : AsyncFileContextBase(manager), file_descriptor_(fd) {}
284

            
285
155
// Asserts that the stored descriptor is -1 at destruction time, i.e. that the file is no
// longer considered open (presumably because close() was called — confirm that
// fileDescriptor() refers to file_descriptor_).
AsyncFileContextThreadPool::~AsyncFileContextThreadPool() { ASSERT(file_descriptor_ == -1); }
286

            
287
} // namespace AsyncFiles
288
} // namespace Common
289
} // namespace Extensions
290
} // namespace Envoy