1
#include "source/extensions/http/cache/file_system_http_cache/insert_context.h"
2

            
3
#include "source/common/protobuf/utility.h"
4
#include "source/extensions/http/cache/file_system_http_cache/cache_file_header_proto_util.h"
5
#include "source/extensions/http/cache/file_system_http_cache/file_system_http_cache.h"
6
#include "source/extensions/http/cache/file_system_http_cache/lookup_context.h"
7

            
8
namespace Envoy {
9
namespace Extensions {
10
namespace HttpFilters {
11
namespace Cache {
12
namespace FileSystemHttpCache {
13

            
14
namespace {
15
std::string writeFailureMessage(absl::string_view kind, absl::StatusOr<size_t> result,
16
5
                                size_t wanted) {
17
5
  if (result.ok()) {
18
1
    return fmt::format("incomplete write of {} - wrote {}, expected {}", kind, result.value(),
19
1
                       wanted);
20
4
  } else {
21
4
    return fmt::format("write failed of {}: {}", kind, result.status());
22
4
  }
23
5
}
24
} // namespace
25

            
26
FileInsertContext::FileInsertContext(std::shared_ptr<FileSystemHttpCache> cache,
27
                                     std::unique_ptr<FileLookupContext> lookup_context)
28
34
    : lookup_context_(std::move(lookup_context)), key_(lookup_context_->lookup().key()),
29
34
      cache_(std::move(cache)) {}
30

            
31
void FileInsertContext::insertHeaders(const Http::ResponseHeaderMap& response_headers,
32
                                      const ResponseMetadata& metadata,
33
34
                                      InsertCallback insert_complete, bool end_stream) {
34
34
  ASSERT(dispatcher()->isThreadSafe());
35
34
  callback_in_flight_ = std::move(insert_complete);
36
34
  const VaryAllowList& vary_allow_list = lookup_context_->lookup().varyAllowList();
37
34
  const Http::RequestHeaderMap& request_headers = lookup_context_->lookup().requestHeaders();
38
34
  if (VaryHeaderUtils::hasVary(response_headers)) {
39
6
    auto vary_header_values = VaryHeaderUtils::getVaryValues(response_headers);
40
6
    Key old_key = key_;
41
6
    const auto vary_identifier =
42
6
        VaryHeaderUtils::createVaryIdentifier(vary_allow_list, vary_header_values, request_headers);
43
6
    if (vary_identifier.has_value()) {
44
5
      key_.add_custom_fields(vary_identifier.value());
45
5
    } else {
46
      // No error for this cancel, it's just an entry that's ineligible for insertion.
47
1
      cancelInsert();
48
1
      return;
49
1
    }
50
5
    cleanup_ =
51
5
        cache_->setCacheEntryToVary(*dispatcher(), old_key, response_headers, key_, cleanup_);
52
28
  } else {
53
28
    cleanup_ = cache_->maybeStartWritingEntry(key_);
54
28
  }
55
33
  if (!cleanup_) {
56
    // No error for this cancel, someone else just got there first.
57
1
    cancelInsert();
58
1
    return;
59
1
  }
60
32
  cache_file_header_proto_ = makeCacheFileHeaderProto(key_, response_headers, metadata);
61
32
  end_stream_after_headers_ = end_stream;
62
32
  createFile();
63
32
}
64

            
65
32
void FileInsertContext::createFile() {
66
32
  ASSERT(dispatcher()->isThreadSafe());
67
32
  ASSERT(!cancel_action_in_flight_);
68
32
  ASSERT(callback_in_flight_ != nullptr);
69
32
  cancel_action_in_flight_ = cache_->asyncFileManager()->createAnonymousFile(
70
32
      dispatcher(), cache_->cachePath(), [this](absl::StatusOr<AsyncFileHandle> open_result) {
71
31
        cancel_action_in_flight_ = nullptr;
72
31
        if (!open_result.ok()) {
73
4
          cancelInsert("failed to create anonymous file");
74
4
          return;
75
4
        }
76
27
        file_handle_ = std::move(open_result.value());
77
27
        writeEmptyHeaderBlock();
78
27
      });
79
32
}
80

            
81
27
void FileInsertContext::writeEmptyHeaderBlock() {
82
27
  ASSERT(dispatcher()->isThreadSafe());
83
27
  ASSERT(!cancel_action_in_flight_);
84
27
  ASSERT(callback_in_flight_ != nullptr);
85
27
  Buffer::OwnedImpl unset_header;
86
27
  header_block_.serializeToBuffer(unset_header);
87
  // Write an empty header block.
88
27
  auto queued = file_handle_->write(
89
27
      dispatcher(), unset_header, 0, [this](absl::StatusOr<size_t> write_result) {
90
27
        cancel_action_in_flight_ = nullptr;
91
27
        if (!write_result.ok() || write_result.value() != CacheFileFixedBlock::size()) {
92
1
          cancelInsert(
93
1
              writeFailureMessage("empty header block", write_result, CacheFileFixedBlock::size()));
94
1
          return;
95
1
        }
96
26
        writeHeaderProto();
97
26
      });
98
27
  ASSERT(queued.ok(), queued.status().ToString());
99
27
  cancel_action_in_flight_ = std::move(queued.value());
100
27
}
101

            
102
54
void FileInsertContext::succeedCurrentAction() {
103
54
  ASSERT(!cancel_action_in_flight_);
104
54
  ASSERT(callback_in_flight_ != nullptr);
105
54
  auto cb = std::move(callback_in_flight_);
106
54
  callback_in_flight_ = nullptr;
107
54
  cb(true);
108
54
}
109

            
110
26
void FileInsertContext::writeHeaderProto() {
111
26
  ASSERT(dispatcher()->isThreadSafe());
112
26
  ASSERT(!cancel_action_in_flight_);
113
26
  ASSERT(callback_in_flight_ != nullptr);
114
26
  auto buf = bufferFromProto(cache_file_header_proto_);
115
26
  auto sz = buf.length();
116
26
  auto queued =
117
26
      file_handle_->write(dispatcher(), buf, header_block_.offsetToHeaders(),
118
26
                          [this, sz](absl::StatusOr<size_t> write_result) {
119
26
                            cancel_action_in_flight_ = nullptr;
120
26
                            if (!write_result.ok() || write_result.value() != sz) {
121
1
                              cancelInsert(writeFailureMessage("headers", write_result, sz));
122
1
                              return;
123
1
                            }
124
25
                            header_block_.setHeadersSize(sz);
125
25
                            if (end_stream_after_headers_) {
126
2
                              commit();
127
2
                              return;
128
2
                            }
129
23
                            succeedCurrentAction();
130
23
                          });
131
26
  ASSERT(queued.ok(), queued.status().ToString());
132
26
  cancel_action_in_flight_ = std::move(queued.value());
133
26
}
134

            
135
void FileInsertContext::insertBody(const Buffer::Instance& fragment,
136
25
                                   InsertCallback ready_for_next_fragment, bool end_stream) {
137
25
  ASSERT(dispatcher()->isThreadSafe());
138
25
  ASSERT(!cancel_action_in_flight_, "should be no actions in flight when receiving new data");
139
25
  ASSERT(!callback_in_flight_);
140
25
  if (!cleanup_) {
141
    // Already cancelled, do nothing, return failure.
142
1
    std::move(ready_for_next_fragment)(false);
143
1
    return;
144
1
  }
145
24
  callback_in_flight_ = std::move(ready_for_next_fragment);
146
24
  size_t sz = fragment.length();
147
24
  Buffer::OwnedImpl consumable_fragment(fragment);
148
24
  auto queued = file_handle_->write(
149
24
      dispatcher(), consumable_fragment, header_block_.offsetToBody() + header_block_.bodySize(),
150
24
      [this, sz, end_stream](absl::StatusOr<size_t> write_result) {
151
24
        cancel_action_in_flight_ = nullptr;
152
24
        if (!write_result.ok() || write_result.value() != sz) {
153
1
          cancelInsert(writeFailureMessage("body chunk", write_result, sz));
154
1
          return;
155
1
        }
156
23
        header_block_.setBodySize(header_block_.bodySize() + sz);
157
23
        if (end_stream) {
158
13
          commit();
159
13
        } else {
160
10
          succeedCurrentAction();
161
10
        }
162
23
      });
163
24
  ASSERT(queued.ok(), queued.status().ToString());
164
24
  cancel_action_in_flight_ = std::move(queued.value());
165
24
}
166

            
167
void FileInsertContext::insertTrailers(const Http::ResponseTrailerMap& trailers,
168
10
                                       InsertCallback insert_complete) {
169
10
  ASSERT(dispatcher()->isThreadSafe());
170
10
  ASSERT(!cancel_action_in_flight_, "should be no actions in flight when receiving new data");
171
10
  ASSERT(!callback_in_flight_);
172
10
  if (!cleanup_) {
173
    // Already cancelled, do nothing, return failure.
174
1
    std::move(insert_complete)(false);
175
1
    return;
176
1
  }
177
9
  callback_in_flight_ = std::move(insert_complete);
178
9
  CacheFileTrailer file_trailer = makeCacheFileTrailerProto(trailers);
179
9
  Buffer::OwnedImpl consumable_buffer = bufferFromProto(file_trailer);
180
9
  size_t sz = consumable_buffer.length();
181
9
  auto queued =
182
9
      file_handle_->write(dispatcher(), consumable_buffer, header_block_.offsetToTrailers(),
183
9
                          [this, sz](absl::StatusOr<size_t> write_result) {
184
9
                            cancel_action_in_flight_ = nullptr;
185
9
                            if (!write_result.ok() || write_result.value() != sz) {
186
1
                              cancelInsert(writeFailureMessage("trailer chunk", write_result, sz));
187
1
                              return;
188
1
                            }
189
8
                            header_block_.setTrailersSize(sz);
190
8
                            commit();
191
8
                          });
192
9
  ASSERT(queued.ok(), queued.status().ToString());
193
9
  cancel_action_in_flight_ = std::move(queued.value());
194
9
}
195

            
196
34
void FileInsertContext::onDestroy() {
197
34
  lookup_context_->onDestroy();
198
34
  cancelInsert("InsertContext destroyed prematurely");
199
34
}
200

            
201
23
void FileInsertContext::commit() {
202
23
  ASSERT(dispatcher()->isThreadSafe());
203
23
  ASSERT(!cancel_action_in_flight_);
204
23
  ASSERT(callback_in_flight_ != nullptr);
205
  // Write the file header block now that we know the sizes of the pieces.
206
23
  Buffer::OwnedImpl block_buffer;
207
23
  header_block_.serializeToBuffer(block_buffer);
208
23
  auto queued = file_handle_->write(
209
23
      dispatcher(), block_buffer, 0, [this](absl::StatusOr<size_t> write_result) {
210
23
        cancel_action_in_flight_ = nullptr;
211
23
        if (!write_result.ok() || write_result.value() != CacheFileFixedBlock::size()) {
212
1
          cancelInsert(
213
1
              writeFailureMessage("header block", write_result, CacheFileFixedBlock::size()));
214
1
          return;
215
1
        }
216
22
        commitMeasureExisting();
217
22
      });
218
23
  ASSERT(queued.ok(), queued.status().ToString());
219
23
  cancel_action_in_flight_ = std::move(queued.value());
220
23
}
221

            
222
67
std::string FileInsertContext::pathAndFilename() {
223
67
  return absl::StrCat(cache_->cachePath(), cache_->generateFilename(key_));
224
67
}
225

            
226
22
void FileInsertContext::commitMeasureExisting() {
227
22
  ASSERT(!cancel_action_in_flight_);
228
22
  ASSERT(callback_in_flight_ != nullptr);
229
22
  cancel_action_in_flight_ = cache_->asyncFileManager()->stat(
230
22
      dispatcher(), pathAndFilename(), [this](absl::StatusOr<struct stat> stat_result) {
231
22
        cancel_action_in_flight_ = nullptr;
232
22
        if (stat_result.ok()) {
233
2
          commitUnlinkExisting(stat_result.value().st_size);
234
20
        } else {
235
20
          commitUnlinkExisting(0);
236
20
        }
237
22
      });
238
22
}
239

            
240
22
void FileInsertContext::commitUnlinkExisting(size_t file_size) {
241
22
  ASSERT(!cancel_action_in_flight_);
242
22
  ASSERT(callback_in_flight_ != nullptr);
243
22
  cancel_action_in_flight_ = cache_->asyncFileManager()->unlink(
244
22
      dispatcher(), pathAndFilename(), [this, file_size](absl::Status unlink_result) {
245
22
        cancel_action_in_flight_ = nullptr;
246
22
        if (unlink_result.ok()) {
247
3
          cache_->trackFileRemoved(file_size);
248
3
        }
249
22
        commitCreateHardLink();
250
22
      });
251
22
}
252

            
253
22
void FileInsertContext::commitCreateHardLink() {
254
22
  ASSERT(!cancel_action_in_flight_);
255
22
  ASSERT(callback_in_flight_ != nullptr);
256
22
  auto queued = file_handle_->createHardLink(
257
22
      dispatcher(), pathAndFilename(), [this](absl::Status link_result) {
258
22
        cancel_action_in_flight_ = nullptr;
259
22
        if (!link_result.ok()) {
260
1
          cancelInsert(absl::StrCat("failed to link file (", link_result.ToString(),
261
1
                                    "): ", pathAndFilename()));
262
1
          return;
263
1
        }
264
21
        ENVOY_LOG(debug, "created cache file {}", cache_->generateFilename(key_));
265
21
        succeedCurrentAction();
266
21
        uint64_t file_size = header_block_.offsetToTrailers() + header_block_.trailerSize();
267
21
        cache_->trackFileAdded(file_size);
268
        // By clearing cleanup before destructor, we prevent logging an error.
269
21
        cleanup_ = nullptr;
270
21
      });
271
22
  ASSERT(queued.ok(), queued.status().ToString());
272
22
  cancel_action_in_flight_ = std::move(queued.value());
273
22
}
274

            
275
46
void FileInsertContext::cancelInsert(absl::string_view error) {
276
46
  if (cancel_action_in_flight_) {
277
1
    cancel_action_in_flight_();
278
1
    cancel_action_in_flight_ = nullptr;
279
1
  }
280
46
  if (callback_in_flight_) {
281
13
    callback_in_flight_(false);
282
13
    callback_in_flight_ = nullptr;
283
13
  }
284
46
  if (cleanup_) {
285
11
    cleanup_ = nullptr;
286
11
    if (!error.empty()) {
287
11
      ENVOY_LOG(warn, "FileSystemHttpCache: {}", error);
288
11
    }
289
11
  }
290
46
  if (file_handle_) {
291
27
    auto close_status = file_handle_->close(nullptr, [](absl::Status) {});
292
27
    ASSERT(close_status.ok());
293
27
    file_handle_ = nullptr;
294
27
  }
295
46
}
296

            
297
212
Event::Dispatcher* FileInsertContext::dispatcher() const { return lookup_context_->dispatcher(); }
298

            
299
} // namespace FileSystemHttpCache
300
} // namespace Cache
301
} // namespace HttpFilters
302
} // namespace Extensions
303
} // namespace Envoy