Line data Source code
1 : #include "source/extensions/http/cache/file_system_http_cache/insert_context.h"
2 :
3 : #include "source/common/protobuf/utility.h"
4 : #include "source/extensions/http/cache/file_system_http_cache/cache_file_header_proto_util.h"
5 : #include "source/extensions/http/cache/file_system_http_cache/file_system_http_cache.h"
6 : #include "source/extensions/http/cache/file_system_http_cache/lookup_context.h"
7 :
8 : namespace Envoy {
9 : namespace Extensions {
10 : namespace HttpFilters {
11 : namespace Cache {
12 : namespace FileSystemHttpCache {
13 :
14 : namespace {
15 : std::string writeFailureMessage(absl::string_view kind, absl::StatusOr<size_t> result,
16 0 : size_t wanted) {
17 0 : if (result.ok()) {
18 0 : return fmt::format("incomplete write of {} - wrote {}, expected {}", kind, result.value(),
19 0 : wanted);
20 0 : } else {
21 0 : return fmt::format("write failed of {}: {}", kind, result.status());
22 0 : }
23 0 : }
24 : } // namespace
25 :
26 : FileInsertContext::FileInsertContext(std::shared_ptr<FileSystemHttpCache> cache,
27 : std::unique_ptr<FileLookupContext> lookup_context)
28 : : lookup_context_(std::move(lookup_context)), key_(lookup_context_->lookup().key()),
29 0 : cache_(std::move(cache)) {}
30 :
31 : void FileInsertContext::insertHeaders(const Http::ResponseHeaderMap& response_headers,
32 : const ResponseMetadata& metadata,
33 0 : InsertCallback insert_complete, bool end_stream) {
34 0 : absl::MutexLock lock(&mu_);
35 0 : callback_in_flight_ = insert_complete;
36 0 : const VaryAllowList& vary_allow_list = lookup_context_->lookup().varyAllowList();
37 0 : const Http::RequestHeaderMap& request_headers = lookup_context_->lookup().requestHeaders();
38 0 : if (VaryHeaderUtils::hasVary(response_headers)) {
39 0 : auto vary_header_values = VaryHeaderUtils::getVaryValues(response_headers);
40 0 : Key old_key = key_;
41 0 : const auto vary_identifier =
42 0 : VaryHeaderUtils::createVaryIdentifier(vary_allow_list, vary_header_values, request_headers);
43 0 : if (vary_identifier.has_value()) {
44 0 : key_.add_custom_fields(vary_identifier.value());
45 0 : } else {
46 : // No error for this cancel, it's just an entry that's ineligible for insertion.
47 0 : cancelInsert();
48 0 : return;
49 0 : }
50 0 : cleanup_ = cache_->setCacheEntryToVary(old_key, response_headers, key_, cleanup_);
51 0 : } else {
52 0 : cleanup_ = cache_->maybeStartWritingEntry(key_);
53 0 : }
54 0 : if (!cleanup_) {
55 : // No error for this cancel, someone else just got there first.
56 0 : cancelInsert();
57 0 : return;
58 0 : }
59 0 : auto header_proto = makeCacheFileHeaderProto(key_, response_headers, metadata);
60 : // Open the file.
61 0 : cancel_action_in_flight_ = cache_->asyncFileManager()->createAnonymousFile(
62 0 : cache_->cachePath(), [this, end_stream, header_proto,
63 0 : insert_complete](absl::StatusOr<AsyncFileHandle> open_result) {
64 0 : absl::MutexLock lock(&mu_);
65 0 : cancel_action_in_flight_ = nullptr;
66 0 : if (!open_result.ok()) {
67 0 : cancelInsert("failed to create anonymous file");
68 0 : return;
69 0 : }
70 0 : file_handle_ = std::move(open_result.value());
71 0 : Buffer::OwnedImpl unset_header;
72 0 : header_block_.serializeToBuffer(unset_header);
73 : // Write an empty header block.
74 0 : auto queued = file_handle_->write(
75 0 : unset_header, 0, [this, end_stream, header_proto](absl::StatusOr<size_t> write_result) {
76 0 : absl::MutexLock lock(&mu_);
77 0 : cancel_action_in_flight_ = nullptr;
78 0 : if (!write_result.ok() || write_result.value() != CacheFileFixedBlock::size()) {
79 0 : cancelInsert(writeFailureMessage("empty header block", write_result,
80 0 : CacheFileFixedBlock::size()));
81 0 : return;
82 0 : }
83 0 : auto buf = bufferFromProto(header_proto);
84 0 : auto sz = buf.length();
85 0 : auto queued = file_handle_->write(
86 0 : buf, header_block_.offsetToHeaders(),
87 0 : [this, end_stream, sz](absl::StatusOr<size_t> write_result) {
88 0 : absl::MutexLock lock(&mu_);
89 0 : cancel_action_in_flight_ = nullptr;
90 0 : if (!write_result.ok() || write_result.value() != sz) {
91 0 : cancelInsert(writeFailureMessage("headers", write_result, sz));
92 0 : return;
93 0 : }
94 0 : header_block_.setHeadersSize(sz);
95 0 : if (end_stream) {
96 0 : commit(callback_in_flight_);
97 0 : return;
98 0 : }
99 0 : auto cb = callback_in_flight_;
100 0 : callback_in_flight_ = nullptr;
101 0 : cb(true);
102 0 : });
103 0 : ASSERT(queued.ok(), queued.status().ToString());
104 0 : cancel_action_in_flight_ = queued.value();
105 0 : });
106 0 : ASSERT(queued.ok(), queued.status().ToString());
107 0 : cancel_action_in_flight_ = queued.value();
108 0 : });
109 0 : }
110 :
111 : void FileInsertContext::insertBody(const Buffer::Instance& fragment,
112 0 : InsertCallback ready_for_next_fragment, bool end_stream) {
113 0 : absl::MutexLock lock(&mu_);
114 0 : if (!cleanup_) {
115 : // Already cancelled, do nothing, return failure.
116 0 : ready_for_next_fragment(false);
117 0 : return;
118 0 : }
119 0 : ASSERT(!cancel_action_in_flight_, "should be no actions in flight when receiving new data");
120 0 : callback_in_flight_ = ready_for_next_fragment;
121 0 : size_t sz = fragment.length();
122 0 : Buffer::OwnedImpl consumable_fragment(fragment);
123 0 : auto queued = file_handle_->write(
124 0 : consumable_fragment, header_block_.offsetToBody() + header_block_.bodySize(),
125 0 : [this, sz, end_stream](absl::StatusOr<size_t> write_result) {
126 0 : absl::MutexLock lock(&mu_);
127 0 : cancel_action_in_flight_ = nullptr;
128 0 : if (!write_result.ok() || write_result.value() != sz) {
129 0 : cancelInsert(writeFailureMessage("body chunk", write_result, sz));
130 0 : return;
131 0 : }
132 0 : header_block_.setBodySize(header_block_.bodySize() + sz);
133 0 : if (end_stream) {
134 0 : commit(callback_in_flight_);
135 0 : } else {
136 0 : auto cb = callback_in_flight_;
137 0 : callback_in_flight_ = nullptr;
138 0 : cb(true);
139 0 : }
140 0 : });
141 0 : ASSERT(queued.ok(), queued.status().ToString());
142 0 : cancel_action_in_flight_ = queued.value();
143 0 : }
144 :
145 : void FileInsertContext::insertTrailers(const Http::ResponseTrailerMap& trailers,
146 0 : InsertCallback insert_complete) {
147 0 : absl::MutexLock lock(&mu_);
148 0 : if (!cleanup_) {
149 : // Already cancelled, do nothing, return failure.
150 0 : insert_complete(false);
151 0 : return;
152 0 : }
153 0 : ASSERT(!cancel_action_in_flight_, "should be no actions in flight when receiving new data");
154 0 : callback_in_flight_ = insert_complete;
155 0 : CacheFileTrailer file_trailer = makeCacheFileTrailerProto(trailers);
156 0 : Buffer::OwnedImpl consumable_buffer = bufferFromProto(file_trailer);
157 0 : size_t sz = consumable_buffer.length();
158 0 : auto queued =
159 0 : file_handle_->write(consumable_buffer, header_block_.offsetToTrailers(),
160 0 : [this, sz](absl::StatusOr<size_t> write_result) {
161 0 : absl::MutexLock lock(&mu_);
162 0 : cancel_action_in_flight_ = nullptr;
163 0 : if (!write_result.ok() || write_result.value() != sz) {
164 0 : cancelInsert(writeFailureMessage("trailer chunk", write_result, sz));
165 0 : return;
166 0 : }
167 0 : header_block_.setTrailersSize(sz);
168 0 : commit(callback_in_flight_);
169 0 : });
170 0 : ASSERT(queued.ok(), queued.status().ToString());
171 0 : cancel_action_in_flight_ = queued.value();
172 0 : }
173 :
174 0 : void FileInsertContext::onDestroy() {
175 0 : absl::MutexLock lock(&mu_);
176 0 : cancelInsert("InsertContext destroyed prematurely");
177 0 : }
178 :
179 0 : void FileInsertContext::commit(InsertCallback callback) {
180 0 : mu_.AssertHeld();
181 : // Write the file header block now that we know the sizes of the pieces.
182 0 : Buffer::OwnedImpl block_buffer;
183 0 : callback_in_flight_ = callback;
184 0 : header_block_.serializeToBuffer(block_buffer);
185 0 : auto queued = file_handle_->write(block_buffer, 0, [this](absl::StatusOr<size_t> write_result) {
186 0 : absl::MutexLock lock(&mu_);
187 0 : cancel_action_in_flight_ = nullptr;
188 0 : if (!write_result.ok() || write_result.value() != CacheFileFixedBlock::size()) {
189 0 : cancelInsert(writeFailureMessage("header block", write_result, CacheFileFixedBlock::size()));
190 0 : return;
191 0 : }
192 : // Unlink any existing cache entry with this filename.
193 0 : cancel_action_in_flight_ = cache_->asyncFileManager()->stat(
194 0 : absl::StrCat(cache_->cachePath(), cache_->generateFilename(key_)),
195 0 : [this](absl::StatusOr<struct stat> stat_result) {
196 0 : absl::MutexLock lock(&mu_);
197 0 : cancel_action_in_flight_ = nullptr;
198 0 : size_t file_size = 0;
199 0 : if (stat_result.ok()) {
200 0 : file_size = stat_result.value().st_size;
201 0 : }
202 0 : cancel_action_in_flight_ = cache_->asyncFileManager()->unlink(
203 0 : absl::StrCat(cache_->cachePath(), cache_->generateFilename(key_)),
204 0 : [this, file_size](absl::Status unlink_result) {
205 0 : if (unlink_result.ok()) {
206 0 : cache_->trackFileRemoved(file_size);
207 0 : }
208 : // We can ignore failure of unlink - the file may or may not have previously
209 : // existed.
210 0 : absl::MutexLock lock(&mu_);
211 0 : cancel_action_in_flight_ = nullptr;
212 : // Link the file to its filename.
213 0 : auto queued = file_handle_->createHardLink(
214 0 : absl::StrCat(cache_->cachePath(), cache_->generateFilename(key_)),
215 0 : [this](absl::Status link_result) {
216 0 : absl::MutexLock lock(&mu_);
217 0 : cancel_action_in_flight_ = nullptr;
218 0 : if (!link_result.ok()) {
219 0 : cancelInsert(absl::StrCat("failed to link file (", link_result.ToString(),
220 0 : "): ", cache_->cachePath(),
221 0 : cache_->generateFilename(key_)));
222 0 : return;
223 0 : }
224 0 : ENVOY_LOG(debug, "created cache file {}", cache_->generateFilename(key_));
225 0 : callback_in_flight_(true);
226 0 : callback_in_flight_ = nullptr;
227 0 : uint64_t file_size =
228 0 : header_block_.offsetToTrailers() + header_block_.trailerSize();
229 0 : cache_->trackFileAdded(file_size);
230 : // By clearing cleanup before destructor, we prevent logging an error.
231 0 : cleanup_ = nullptr;
232 0 : });
233 0 : ASSERT(queued.ok(), queued.status().ToString());
234 0 : cancel_action_in_flight_ = queued.value();
235 0 : });
236 0 : });
237 0 : });
238 0 : ASSERT(queued.ok(), queued.status().ToString());
239 0 : cancel_action_in_flight_ = queued.value();
240 0 : }
241 :
242 0 : void FileInsertContext::cancelInsert(absl::string_view error) {
243 0 : mu_.AssertHeld();
244 0 : if (cancel_action_in_flight_) {
245 0 : cancel_action_in_flight_();
246 0 : cancel_action_in_flight_ = nullptr;
247 0 : }
248 0 : if (callback_in_flight_) {
249 0 : callback_in_flight_(false);
250 0 : callback_in_flight_ = nullptr;
251 0 : }
252 0 : if (cleanup_) {
253 0 : cleanup_ = nullptr;
254 0 : if (!error.empty()) {
255 0 : ENVOY_LOG(warn, "FileSystemHttpCache: {}", error);
256 0 : }
257 0 : }
258 0 : if (file_handle_) {
259 0 : auto close_status = file_handle_->close([](absl::Status) {});
260 0 : ASSERT(close_status.ok());
261 0 : file_handle_ = nullptr;
262 0 : }
263 0 : }
264 :
265 : } // namespace FileSystemHttpCache
266 : } // namespace Cache
267 : } // namespace HttpFilters
268 : } // namespace Extensions
269 : } // namespace Envoy
|