LCOV - code coverage report
Current view: top level - source/extensions/http/cache/file_system_http_cache - insert_context.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 221 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 18 0.0 %

          Line data    Source code
       1             : #include "source/extensions/http/cache/file_system_http_cache/insert_context.h"
       2             : 
       3             : #include "source/common/protobuf/utility.h"
       4             : #include "source/extensions/http/cache/file_system_http_cache/cache_file_header_proto_util.h"
       5             : #include "source/extensions/http/cache/file_system_http_cache/file_system_http_cache.h"
       6             : #include "source/extensions/http/cache/file_system_http_cache/lookup_context.h"
       7             : 
       8             : namespace Envoy {
       9             : namespace Extensions {
      10             : namespace HttpFilters {
      11             : namespace Cache {
      12             : namespace FileSystemHttpCache {
      13             : 
      14             : namespace {
      15             : std::string writeFailureMessage(absl::string_view kind, absl::StatusOr<size_t> result,
      16           0 :                                 size_t wanted) {
      17           0 :   if (result.ok()) {
      18           0 :     return fmt::format("incomplete write of {} - wrote {}, expected {}", kind, result.value(),
      19           0 :                        wanted);
      20           0 :   } else {
      21           0 :     return fmt::format("write failed of {}: {}", kind, result.status());
      22           0 :   }
      23           0 : }
      24             : } // namespace
      25             : 
      26             : FileInsertContext::FileInsertContext(std::shared_ptr<FileSystemHttpCache> cache,
      27             :                                      std::unique_ptr<FileLookupContext> lookup_context)
      28             :     : lookup_context_(std::move(lookup_context)), key_(lookup_context_->lookup().key()),
      29           0 :       cache_(std::move(cache)) {}
      30             : 
      31             : void FileInsertContext::insertHeaders(const Http::ResponseHeaderMap& response_headers,
      32             :                                       const ResponseMetadata& metadata,
      33           0 :                                       InsertCallback insert_complete, bool end_stream) {
      34           0 :   absl::MutexLock lock(&mu_);
      35           0 :   callback_in_flight_ = insert_complete;
      36           0 :   const VaryAllowList& vary_allow_list = lookup_context_->lookup().varyAllowList();
      37           0 :   const Http::RequestHeaderMap& request_headers = lookup_context_->lookup().requestHeaders();
      38           0 :   if (VaryHeaderUtils::hasVary(response_headers)) {
      39           0 :     auto vary_header_values = VaryHeaderUtils::getVaryValues(response_headers);
      40           0 :     Key old_key = key_;
      41           0 :     const auto vary_identifier =
      42           0 :         VaryHeaderUtils::createVaryIdentifier(vary_allow_list, vary_header_values, request_headers);
      43           0 :     if (vary_identifier.has_value()) {
      44           0 :       key_.add_custom_fields(vary_identifier.value());
      45           0 :     } else {
      46             :       // No error for this cancel, it's just an entry that's ineligible for insertion.
      47           0 :       cancelInsert();
      48           0 :       return;
      49           0 :     }
      50           0 :     cleanup_ = cache_->setCacheEntryToVary(old_key, response_headers, key_, cleanup_);
      51           0 :   } else {
      52           0 :     cleanup_ = cache_->maybeStartWritingEntry(key_);
      53           0 :   }
      54           0 :   if (!cleanup_) {
      55             :     // No error for this cancel, someone else just got there first.
      56           0 :     cancelInsert();
      57           0 :     return;
      58           0 :   }
      59           0 :   auto header_proto = makeCacheFileHeaderProto(key_, response_headers, metadata);
      60             :   // Open the file.
      61           0 :   cancel_action_in_flight_ = cache_->asyncFileManager()->createAnonymousFile(
      62           0 :       cache_->cachePath(), [this, end_stream, header_proto,
      63           0 :                             insert_complete](absl::StatusOr<AsyncFileHandle> open_result) {
      64           0 :         absl::MutexLock lock(&mu_);
      65           0 :         cancel_action_in_flight_ = nullptr;
      66           0 :         if (!open_result.ok()) {
      67           0 :           cancelInsert("failed to create anonymous file");
      68           0 :           return;
      69           0 :         }
      70           0 :         file_handle_ = std::move(open_result.value());
      71           0 :         Buffer::OwnedImpl unset_header;
      72           0 :         header_block_.serializeToBuffer(unset_header);
      73             :         // Write an empty header block.
      74           0 :         auto queued = file_handle_->write(
      75           0 :             unset_header, 0, [this, end_stream, header_proto](absl::StatusOr<size_t> write_result) {
      76           0 :               absl::MutexLock lock(&mu_);
      77           0 :               cancel_action_in_flight_ = nullptr;
      78           0 :               if (!write_result.ok() || write_result.value() != CacheFileFixedBlock::size()) {
      79           0 :                 cancelInsert(writeFailureMessage("empty header block", write_result,
      80           0 :                                                  CacheFileFixedBlock::size()));
      81           0 :                 return;
      82           0 :               }
      83           0 :               auto buf = bufferFromProto(header_proto);
      84           0 :               auto sz = buf.length();
      85           0 :               auto queued = file_handle_->write(
      86           0 :                   buf, header_block_.offsetToHeaders(),
      87           0 :                   [this, end_stream, sz](absl::StatusOr<size_t> write_result) {
      88           0 :                     absl::MutexLock lock(&mu_);
      89           0 :                     cancel_action_in_flight_ = nullptr;
      90           0 :                     if (!write_result.ok() || write_result.value() != sz) {
      91           0 :                       cancelInsert(writeFailureMessage("headers", write_result, sz));
      92           0 :                       return;
      93           0 :                     }
      94           0 :                     header_block_.setHeadersSize(sz);
      95           0 :                     if (end_stream) {
      96           0 :                       commit(callback_in_flight_);
      97           0 :                       return;
      98           0 :                     }
      99           0 :                     auto cb = callback_in_flight_;
     100           0 :                     callback_in_flight_ = nullptr;
     101           0 :                     cb(true);
     102           0 :                   });
     103           0 :               ASSERT(queued.ok(), queued.status().ToString());
     104           0 :               cancel_action_in_flight_ = queued.value();
     105           0 :             });
     106           0 :         ASSERT(queued.ok(), queued.status().ToString());
     107           0 :         cancel_action_in_flight_ = queued.value();
     108           0 :       });
     109           0 : }
     110             : 
     111             : void FileInsertContext::insertBody(const Buffer::Instance& fragment,
     112           0 :                                    InsertCallback ready_for_next_fragment, bool end_stream) {
     113           0 :   absl::MutexLock lock(&mu_);
     114           0 :   if (!cleanup_) {
     115             :     // Already cancelled, do nothing, return failure.
     116           0 :     ready_for_next_fragment(false);
     117           0 :     return;
     118           0 :   }
     119           0 :   ASSERT(!cancel_action_in_flight_, "should be no actions in flight when receiving new data");
     120           0 :   callback_in_flight_ = ready_for_next_fragment;
     121           0 :   size_t sz = fragment.length();
     122           0 :   Buffer::OwnedImpl consumable_fragment(fragment);
     123           0 :   auto queued = file_handle_->write(
     124           0 :       consumable_fragment, header_block_.offsetToBody() + header_block_.bodySize(),
     125           0 :       [this, sz, end_stream](absl::StatusOr<size_t> write_result) {
     126           0 :         absl::MutexLock lock(&mu_);
     127           0 :         cancel_action_in_flight_ = nullptr;
     128           0 :         if (!write_result.ok() || write_result.value() != sz) {
     129           0 :           cancelInsert(writeFailureMessage("body chunk", write_result, sz));
     130           0 :           return;
     131           0 :         }
     132           0 :         header_block_.setBodySize(header_block_.bodySize() + sz);
     133           0 :         if (end_stream) {
     134           0 :           commit(callback_in_flight_);
     135           0 :         } else {
     136           0 :           auto cb = callback_in_flight_;
     137           0 :           callback_in_flight_ = nullptr;
     138           0 :           cb(true);
     139           0 :         }
     140           0 :       });
     141           0 :   ASSERT(queued.ok(), queued.status().ToString());
     142           0 :   cancel_action_in_flight_ = queued.value();
     143           0 : }
     144             : 
     145             : void FileInsertContext::insertTrailers(const Http::ResponseTrailerMap& trailers,
     146           0 :                                        InsertCallback insert_complete) {
     147           0 :   absl::MutexLock lock(&mu_);
     148           0 :   if (!cleanup_) {
     149             :     // Already cancelled, do nothing, return failure.
     150           0 :     insert_complete(false);
     151           0 :     return;
     152           0 :   }
     153           0 :   ASSERT(!cancel_action_in_flight_, "should be no actions in flight when receiving new data");
     154           0 :   callback_in_flight_ = insert_complete;
     155           0 :   CacheFileTrailer file_trailer = makeCacheFileTrailerProto(trailers);
     156           0 :   Buffer::OwnedImpl consumable_buffer = bufferFromProto(file_trailer);
     157           0 :   size_t sz = consumable_buffer.length();
     158           0 :   auto queued =
     159           0 :       file_handle_->write(consumable_buffer, header_block_.offsetToTrailers(),
     160           0 :                           [this, sz](absl::StatusOr<size_t> write_result) {
     161           0 :                             absl::MutexLock lock(&mu_);
     162           0 :                             cancel_action_in_flight_ = nullptr;
     163           0 :                             if (!write_result.ok() || write_result.value() != sz) {
     164           0 :                               cancelInsert(writeFailureMessage("trailer chunk", write_result, sz));
     165           0 :                               return;
     166           0 :                             }
     167           0 :                             header_block_.setTrailersSize(sz);
     168           0 :                             commit(callback_in_flight_);
     169           0 :                           });
     170           0 :   ASSERT(queued.ok(), queued.status().ToString());
     171           0 :   cancel_action_in_flight_ = queued.value();
     172           0 : }
     173             : 
     174           0 : void FileInsertContext::onDestroy() {
     175           0 :   absl::MutexLock lock(&mu_);
     176           0 :   cancelInsert("InsertContext destroyed prematurely");
     177           0 : }
     178             : 
     179           0 : void FileInsertContext::commit(InsertCallback callback) {
     180           0 :   mu_.AssertHeld();
     181             :   // Write the file header block now that we know the sizes of the pieces.
     182           0 :   Buffer::OwnedImpl block_buffer;
     183           0 :   callback_in_flight_ = callback;
     184           0 :   header_block_.serializeToBuffer(block_buffer);
     185           0 :   auto queued = file_handle_->write(block_buffer, 0, [this](absl::StatusOr<size_t> write_result) {
     186           0 :     absl::MutexLock lock(&mu_);
     187           0 :     cancel_action_in_flight_ = nullptr;
     188           0 :     if (!write_result.ok() || write_result.value() != CacheFileFixedBlock::size()) {
     189           0 :       cancelInsert(writeFailureMessage("header block", write_result, CacheFileFixedBlock::size()));
     190           0 :       return;
     191           0 :     }
     192             :     // Unlink any existing cache entry with this filename.
     193           0 :     cancel_action_in_flight_ = cache_->asyncFileManager()->stat(
     194           0 :         absl::StrCat(cache_->cachePath(), cache_->generateFilename(key_)),
     195           0 :         [this](absl::StatusOr<struct stat> stat_result) {
     196           0 :           absl::MutexLock lock(&mu_);
     197           0 :           cancel_action_in_flight_ = nullptr;
     198           0 :           size_t file_size = 0;
     199           0 :           if (stat_result.ok()) {
     200           0 :             file_size = stat_result.value().st_size;
     201           0 :           }
     202           0 :           cancel_action_in_flight_ = cache_->asyncFileManager()->unlink(
     203           0 :               absl::StrCat(cache_->cachePath(), cache_->generateFilename(key_)),
     204           0 :               [this, file_size](absl::Status unlink_result) {
     205           0 :                 if (unlink_result.ok()) {
     206           0 :                   cache_->trackFileRemoved(file_size);
     207           0 :                 }
     208             :                 // We can ignore failure of unlink - the file may or may not have previously
     209             :                 // existed.
     210           0 :                 absl::MutexLock lock(&mu_);
     211           0 :                 cancel_action_in_flight_ = nullptr;
     212             :                 // Link the file to its filename.
     213           0 :                 auto queued = file_handle_->createHardLink(
     214           0 :                     absl::StrCat(cache_->cachePath(), cache_->generateFilename(key_)),
     215           0 :                     [this](absl::Status link_result) {
     216           0 :                       absl::MutexLock lock(&mu_);
     217           0 :                       cancel_action_in_flight_ = nullptr;
     218           0 :                       if (!link_result.ok()) {
     219           0 :                         cancelInsert(absl::StrCat("failed to link file (", link_result.ToString(),
     220           0 :                                                   "): ", cache_->cachePath(),
     221           0 :                                                   cache_->generateFilename(key_)));
     222           0 :                         return;
     223           0 :                       }
     224           0 :                       ENVOY_LOG(debug, "created cache file {}", cache_->generateFilename(key_));
     225           0 :                       callback_in_flight_(true);
     226           0 :                       callback_in_flight_ = nullptr;
     227           0 :                       uint64_t file_size =
     228           0 :                           header_block_.offsetToTrailers() + header_block_.trailerSize();
     229           0 :                       cache_->trackFileAdded(file_size);
     230             :                       // By clearing cleanup before destructor, we prevent logging an error.
     231           0 :                       cleanup_ = nullptr;
     232           0 :                     });
     233           0 :                 ASSERT(queued.ok(), queued.status().ToString());
     234           0 :                 cancel_action_in_flight_ = queued.value();
     235           0 :               });
     236           0 :         });
     237           0 :   });
     238           0 :   ASSERT(queued.ok(), queued.status().ToString());
     239           0 :   cancel_action_in_flight_ = queued.value();
     240           0 : }
     241             : 
     242           0 : void FileInsertContext::cancelInsert(absl::string_view error) {
     243           0 :   mu_.AssertHeld();
     244           0 :   if (cancel_action_in_flight_) {
     245           0 :     cancel_action_in_flight_();
     246           0 :     cancel_action_in_flight_ = nullptr;
     247           0 :   }
     248           0 :   if (callback_in_flight_) {
     249           0 :     callback_in_flight_(false);
     250           0 :     callback_in_flight_ = nullptr;
     251           0 :   }
     252           0 :   if (cleanup_) {
     253           0 :     cleanup_ = nullptr;
     254           0 :     if (!error.empty()) {
     255           0 :       ENVOY_LOG(warn, "FileSystemHttpCache: {}", error);
     256           0 :     }
     257           0 :   }
     258           0 :   if (file_handle_) {
     259           0 :     auto close_status = file_handle_->close([](absl::Status) {});
     260           0 :     ASSERT(close_status.ok());
     261           0 :     file_handle_ = nullptr;
     262           0 :   }
     263           0 : }
     264             : 
     265             : } // namespace FileSystemHttpCache
     266             : } // namespace Cache
     267             : } // namespace HttpFilters
     268             : } // namespace Extensions
     269             : } // namespace Envoy

Generated by: LCOV version 1.15