1
#include "source/extensions/http/cache_v2/file_system_http_cache/insert_context.h"
2

            
3
#include "source/common/protobuf/utility.h"
4
#include "source/extensions/http/cache_v2/file_system_http_cache/cache_file_header_proto_util.h"
5
#include "source/extensions/http/cache_v2/file_system_http_cache/cache_file_reader.h"
6
#include "source/extensions/http/cache_v2/file_system_http_cache/file_system_http_cache.h"
7
#include "source/extensions/http/cache_v2/file_system_http_cache/lookup_context.h"
8

            
9
namespace Envoy {
10
namespace Extensions {
11
namespace HttpFilters {
12
namespace CacheV2 {
13
namespace FileSystemHttpCache {
14

            
15
// Arbitrary 128K fragments to balance memory usage and speed.
16
// Caps the width of the byte range requested from the source by each getBody() call.
static constexpr size_t MaxInsertFragmentSize{128 * 1024};
17

            
18
using Common::AsyncFiles::AsyncFileHandle;
19
using Common::AsyncFiles::AsyncFileManager;
20

            
21
void FileInsertContext::begin(Event::Dispatcher& dispatcher, Key key, std::string filepath,
22
                              Http::ResponseHeaderMapPtr headers, ResponseMetadata metadata,
23
                              HttpSourcePtr source, std::shared_ptr<CacheProgressReceiver> progress,
24
                              std::shared_ptr<CacheShared> stat_recorder,
25
16
                              AsyncFileManager& file_manager) {
26
16
  auto p = new FileInsertContext(dispatcher, std::move(key), std::move(filepath),
27
16
                                 std::move(headers), std::move(metadata), std::move(source),
28
16
                                 std::move(progress), std::move(stat_recorder));
29
16
  p->createFile(file_manager);
30
16
}
31

            
32
// Stores everything the insert chain needs. The serialized-header proto is
// built up front from the key, response headers and metadata; the headers
// themselves are kept so they can later be handed to the progress receiver.
FileInsertContext::FileInsertContext(Event::Dispatcher& dispatcher, Key key, std::string filepath,
                                     Http::ResponseHeaderMapPtr headers, ResponseMetadata metadata,
                                     HttpSourcePtr source,
                                     std::shared_ptr<CacheProgressReceiver> progress,
                                     std::shared_ptr<CacheShared> stat_recorder)
    : dispatcher_(dispatcher),
      filepath_(std::move(filepath)),
      // Reads *headers, so this must run before headers is moved from below.
      // NOTE(review): members initialize in declaration order, not list order;
      // confirm cache_file_header_proto_ is declared before headers_.
      cache_file_header_proto_(makeCacheFileHeaderProto(key, *headers, metadata)),
      headers_(std::move(headers)),
      source_(std::move(source)),
      progress_receiver_(std::move(progress)),
      stat_recorder_(std::move(stat_recorder)) {}
41

            
42
6
// Terminal error path: tells the progress receiver the insert failed, queues a
// close of the file if one was opened, and destroys this context.
// Callers must not touch any member after fail() returns.
void FileInsertContext::fail(absl::Status status) {
  progress_receiver_->onInsertFailed(status);
  // The handle is still unset when file creation itself was the failure.
  if (file_handle_) {
    auto close_queued = file_handle_->close(nullptr, [](absl::Status) {});
    ASSERT(close_queued.ok());
  }
  delete this;
}
50

            
51
10
void FileInsertContext::complete() {
52
10
  auto queued = file_handle_->close(nullptr, [](absl::Status) {});
53
10
  ASSERT(queued.ok());
54
10
  delete this;
55
10
}
56

            
57
16
// Asks the file manager for an anonymous file in the directory part of
// filepath_. On success the handle is stored and the chain continues with
// dupFile(); on failure the insert is aborted through fail().
void FileInsertContext::createFile(AsyncFileManager& file_manager) {
  const absl::string_view full_path{filepath_};
  // Keep everything up to and including the last '/'. When there is no '/',
  // rfind yields npos and npos + 1 wraps to 0, giving an empty directory.
  const absl::string_view cache_dir = full_path.substr(0, full_path.rfind('/') + 1);
  file_manager.createAnonymousFile(
      &dispatcher_, cache_dir, [this](absl::StatusOr<AsyncFileHandle> open_result) -> void {
        if (open_result.ok()) {
          file_handle_ = std::move(open_result.value());
          dupFile();
        } else {
          // fail() deletes this context, so nothing may follow it.
          fail(absl::Status(
              open_result.status().code(),
              fmt::format("create file failed: {}", open_result.status().message())));
        }
      });
}
71

            
72
15
void FileInsertContext::dupFile() {
73
15
  auto queued =
74
15
      file_handle_->duplicate(&dispatcher_, [this](absl::StatusOr<AsyncFileHandle> dup_result) {
75
15
        if (!dup_result.ok()) {
76
1
          return fail(
77
1
              absl::Status(dup_result.status().code(), fmt::format("duplicate file failed: {}",
78
1
                                                                   dup_result.status().message())));
79
1
        }
80
14
        bool end_stream = source_ == nullptr;
81
14
        progress_receiver_->onHeadersInserted(
82
14
            std::make_unique<CacheFileReader>(std::move(dup_result.value())), std::move(headers_),
83
14
            end_stream);
84
14
        writeEmptyHeaderBlock();
85
14
      });
86
15
  ASSERT(queued.ok(), queued.status().ToString());
87
15
}
88

            
89
14
void FileInsertContext::writeEmptyHeaderBlock() {
90
14
  Buffer::OwnedImpl unset_header;
91
14
  header_block_.serializeToBuffer(unset_header);
92
  // Write an empty header block.
93
14
  auto queued = file_handle_->write(
94
14
      &dispatcher_, unset_header, 0, [this](absl::StatusOr<size_t> write_result) {
95
14
        if (!write_result.ok()) {
96
1
          return fail(absl::Status(
97
1
              write_result.status().code(),
98
1
              fmt::format("write to file failed: {}", write_result.status().message())));
99
13
        } else if (write_result.value() != CacheFileFixedBlock::size()) {
100
          return fail(absl::UnavailableError(
101
              fmt::format("write to file failed; wrote {} bytes instead of {}",
102
                          write_result.value(), CacheFileFixedBlock::size())));
103
        }
104
13
        if (source_) {
105
11
          getBody();
106
11
        } else {
107
2
          writeHeaders();
108
2
        }
109
13
      });
110
14
  ASSERT(queued.ok(), queued.status().ToString());
111
14
}
112

            
113
15
void FileInsertContext::getBody() {
114
15
  ASSERT(source_);
115
15
  source_->getBody(AdjustedByteRange(read_pos_, read_pos_ + MaxInsertFragmentSize),
116
15
                   [this](Buffer::InstancePtr buf, EndStream end_stream) {
117
15
                     if (end_stream == EndStream::Reset) {
118
1
                       return fail(
119
1
                           absl::UnavailableError("cache write failed due to upstream reset"));
120
1
                     }
121
14
                     if (buf == nullptr) {
122
5
                       if (end_stream == EndStream::End) {
123
1
                         progress_receiver_->onBodyInserted(AdjustedByteRange(0, read_pos_), true);
124
1
                         writeHeaders();
125
4
                       } else {
126
4
                         getTrailers();
127
4
                       }
128
9
                     } else {
129
9
                       read_pos_ += buf->length();
130
9
                       onBody(std::move(buf), end_stream == EndStream::End);
131
9
                     }
132
14
                   });
133
15
}
134

            
135
9
// Appends one body fragment to the file, directly after the body bytes written
// so far. Once the write succeeds in full, the progress receiver is told which
// body range became available, the recorded body size grows by the fragment
// length, and the chain either finishes with the headers (end of stream) or
// fetches the next fragment.
void FileInsertContext::onBody(Buffer::InstancePtr buf, bool end_stream) {
  ASSERT(buf);
  const size_t chunk_size = buf->length();
  const auto write_offset = header_block_.offsetToBody() + header_block_.bodySize();
  auto write_queued = file_handle_->write(
      &dispatcher_, *buf, write_offset,
      [this, chunk_size, end_stream](absl::StatusOr<size_t> write_result) {
        if (!write_result.ok()) {
          fail(absl::Status(
              write_result.status().code(),
              fmt::format("write to file failed: {}", write_result.status().message())));
          return;
        }
        // A short write is treated as a failure as well.
        if (write_result.value() != chunk_size) {
          fail(absl::UnavailableError(
              fmt::format("write to file failed: wrote {} bytes instead of {}",
                          write_result.value(), chunk_size)));
          return;
        }
        const auto previous_size = header_block_.bodySize();
        progress_receiver_->onBodyInserted(
            AdjustedByteRange(previous_size, previous_size + chunk_size), end_stream);
        header_block_.setBodySize(previous_size + chunk_size);
        if (end_stream) {
          writeHeaders();
        } else {
          getBody();
        }
      });
  ASSERT(write_queued.ok(), write_queued.status().ToString());
}
161

            
162
4
void FileInsertContext::getTrailers() {
163
4
  source_->getTrailers([this](Http::ResponseTrailerMapPtr trailers, EndStream end_stream) {
164
4
    if (end_stream == EndStream::Reset) {
165
1
      return fail(
166
1
          absl::UnavailableError("write to cache failed, upstream reset during getTrailers"));
167
1
    }
168
3
    onTrailers(std::move(trailers));
169
3
  });
170
4
}
171

            
172
3
// Serializes the trailers, hands the originals to the progress receiver,
// records the serialized size in the header block and writes the bytes at the
// trailers offset. A fully successful write continues with writeHeaders().
void FileInsertContext::onTrailers(Http::ResponseTrailerMapPtr trailers) {
  // Build the proto first: trailers is moved from on the next line.
  CacheFileTrailer serialized_trailers = makeCacheFileTrailerProto(*trailers);
  progress_receiver_->onTrailersInserted(std::move(trailers));
  Buffer::OwnedImpl trailer_bytes = bufferFromProto(serialized_trailers);
  header_block_.setTrailersSize(trailer_bytes.length());
  auto write_queued = file_handle_->write(
      &dispatcher_, trailer_bytes, header_block_.offsetToTrailers(),
      [this](absl::StatusOr<size_t> write_result) {
        const bool fully_written =
            write_result.ok() && write_result.value() == header_block_.trailerSize();
        if (fully_written) {
          writeHeaders();
        } else {
          // The progress receiver has already been given the trailers, so a
          // failed or short write is not reported through fail(). The context
          // just finishes without the file ever being linked into place.
          complete();
        }
      });
  ASSERT(write_queued.ok(), write_queued.status().ToString());
}
191

            
192
9
void FileInsertContext::writeHeaders() {
193
9
  Buffer::OwnedImpl header_buffer = bufferFromProto(cache_file_header_proto_);
194
9
  header_block_.setHeadersSize(header_buffer.length());
195
9
  auto queued = file_handle_->write(&dispatcher_, header_buffer, header_block_.offsetToHeaders(),
196
9
                                    [this](absl::StatusOr<size_t> write_result) {
197
9
                                      if (!write_result.ok() ||
198
9
                                          write_result.value() != header_block_.headerSize()) {
199
                                        // We've already told the client that the write worked, and
200
                                        // it already has the data they need, so we can act like it
201
                                        // was complete until the next lookup, even though the file
202
                                        // didn't actually get linked.
203
                                        return complete();
204
                                      }
205
9
                                      commit();
206
9
                                    });
207
9
  ASSERT(queued.ok(), queued.status().ToString());
208
9
}
209

            
210
9
void FileInsertContext::commit() {
211
  // now that the header block knows the size of all the pieces, overwrite it in the file.
212
9
  Buffer::OwnedImpl block_buffer;
213
9
  header_block_.serializeToBuffer(block_buffer);
214
9
  auto queued = file_handle_->write(
215
9
      &dispatcher_, block_buffer, 0, [this](absl::StatusOr<size_t> write_result) {
216
9
        if (!write_result.ok() || write_result.value() != CacheFileFixedBlock::size()) {
217
          // We've already told the client that the write worked, and it already
218
          // has the data they need, so we can act like it was complete until
219
          // the next lookup, even though the file didn't actually get linked.
220
          return complete();
221
        }
222
9
        createHardLink();
223
9
      });
224
9
  ASSERT(queued.ok(), queued.status().ToString());
225
9
}
226

            
227
9
void FileInsertContext::createHardLink() {
228
9
  auto queued =
229
9
      file_handle_->createHardLink(&dispatcher_, filepath_, [this](absl::Status link_result) {
230
9
        if (!link_result.ok()) {
231
          ENVOY_LOG(error, "failed to link file {}: {}", filepath_, link_result);
232
          return complete();
233
        }
234
9
        ENVOY_LOG(debug, "created cache file {}", filepath_);
235
9
        uint64_t file_size = header_block_.offsetToTrailers() + header_block_.trailerSize();
236
9
        stat_recorder_->trackFileAdded(file_size);
237
9
        complete();
238
9
      });
239
9
  ASSERT(queued.ok(), queued.status().ToString());
240
9
}
241

            
242
} // namespace FileSystemHttpCache
243
} // namespace CacheV2
244
} // namespace HttpFilters
245
} // namespace Extensions
246
} // namespace Envoy