1
#include "source/extensions/http/cache/file_system_http_cache/lookup_context.h"
2

            
3
#include "source/extensions/http/cache/file_system_http_cache/cache_file_fixed_block.h"
4
#include "source/extensions/http/cache/file_system_http_cache/cache_file_header.pb.h"
5
#include "source/extensions/http/cache/file_system_http_cache/cache_file_header_proto_util.h"
6
#include "source/extensions/http/cache/file_system_http_cache/file_system_http_cache.h"
7

            
8
namespace Envoy {
9
namespace Extensions {
10
namespace HttpFilters {
11
namespace Cache {
12
namespace FileSystemHttpCache {
13

            
14
84
std::string FileLookupContext::filepath() {
15
84
  return absl::StrCat(cache_.cachePath(), cache_.generateFilename(key_));
16
84
}
17

            
18
35
// Asks the cache whether work is in progress for this context's key.
bool FileLookupContext::workInProgress() const {
  return cache_.workInProgress(key());
}
19

            
20
67
// Entry point of a header lookup. Takes ownership of `cb`, which is later invoked with the
// lookup result, and begins the lookup by trying to open the cache file.
void FileLookupContext::getHeaders(LookupHeadersCallback&& cb) {
  // The callback is kept as a member because the result is delivered from an
  // asynchronous file-operation completion handler.
  lookup_headers_callback_ = std::move(cb);
  tryOpenCacheFile();
}
24

            
25
74
void FileLookupContext::tryOpenCacheFile() {
26
74
  cancel_action_in_flight_ = cache_.asyncFileManager()->openExistingFile(
27
74
      dispatcher(), filepath(), Common::AsyncFiles::AsyncFileManager::Mode::ReadOnly,
28
74
      [this](absl::StatusOr<AsyncFileHandle> open_result) {
29
72
        cancel_action_in_flight_ = nullptr;
30
72
        if (!open_result.ok()) {
31
27
          return doCacheMiss();
32
27
        }
33
45
        ASSERT(!file_handle_);
34
45
        file_handle_ = std::move(open_result.value());
35
45
        getHeaderBlockFromFile();
36
45
      });
37
74
}
38

            
39
31
void FileLookupContext::doCacheMiss() {
40
31
  cache_.stats().cache_miss_.inc();
41
31
  std::move(lookup_headers_callback_)(LookupResult{}, /* end_stream (ignored) = */ false);
42
31
  lookup_headers_callback_ = nullptr;
43
31
}
44

            
45
3
void FileLookupContext::doCacheEntryInvalid() {
46
3
  invalidateCacheEntry();
47
3
  doCacheMiss();
48
3
}
49

            
50
45
void FileLookupContext::getHeaderBlockFromFile() {
51
45
  ASSERT(dispatcher()->isThreadSafe());
52
45
  auto queued = file_handle_->read(
53
45
      dispatcher(), 0, CacheFileFixedBlock::size(),
54
45
      [this](absl::StatusOr<Buffer::InstancePtr> read_result) {
55
45
        ASSERT(dispatcher()->isThreadSafe());
56
45
        cancel_action_in_flight_ = nullptr;
57
45
        if (!read_result.ok() || read_result.value()->length() != CacheFileFixedBlock::size()) {
58
1
          return doCacheEntryInvalid();
59
1
        }
60
44
        header_block_.populateFromStringView(read_result.value()->toString());
61
44
        if (!header_block_.isValid()) {
62
1
          return doCacheEntryInvalid();
63
1
        }
64
43
        getHeadersFromFile();
65
43
      });
66
45
  ASSERT(queued.ok(), queued.status().ToString());
67
45
  cancel_action_in_flight_ = std::move(queued.value());
68
45
}
69

            
70
43
void FileLookupContext::getHeadersFromFile() {
71
43
  ASSERT(dispatcher()->isThreadSafe());
72
43
  auto queued = file_handle_->read(
73
43
      dispatcher(), header_block_.offsetToHeaders(), header_block_.headerSize(),
74
43
      [this](absl::StatusOr<Buffer::InstancePtr> read_result) {
75
43
        ASSERT(dispatcher()->isThreadSafe());
76
43
        cancel_action_in_flight_ = nullptr;
77
43
        if (!read_result.ok() || read_result.value()->length() != header_block_.headerSize()) {
78
1
          return doCacheEntryInvalid();
79
1
        }
80
42
        auto header_proto = makeCacheFileHeaderProto(*read_result.value());
81
42
        if (header_proto.headers_size() == 1 && header_proto.headers().at(0).key() == "vary") {
82
8
          auto maybe_vary_key = cache_.makeVaryKey(
83
8
              key_, lookup().varyAllowList(),
84
8
              absl::StrSplit(header_proto.headers().at(0).value(), ','), lookup().requestHeaders());
85
8
          if (!maybe_vary_key.has_value()) {
86
1
            return doCacheMiss();
87
1
          }
88
7
          key_ = maybe_vary_key.value();
89
7
          return closeFileAndGetHeadersAgainWithNewVaryKey();
90
8
        }
91
34
        cache_.stats().cache_hit_.inc();
92
34
        std::move(lookup_headers_callback_)(
93
34
            lookup().makeLookupResult(headersFromHeaderProto(header_proto),
94
34
                                      metadataFromHeaderProto(header_proto),
95
34
                                      header_block_.bodySize()),
96
34
            /* end_stream = */ header_block_.trailerSize() == 0 && header_block_.bodySize() == 0);
97
34
      });
98
43
  ASSERT(queued.ok(), queued.status().ToString());
99
43
  cancel_action_in_flight_ = std::move(queued.value());
100
43
}
101

            
102
7
void FileLookupContext::closeFileAndGetHeadersAgainWithNewVaryKey() {
103
7
  ASSERT(dispatcher()->isThreadSafe());
104
7
  auto queued = file_handle_->close(dispatcher(), [this](absl::Status) {
105
7
    ASSERT(dispatcher()->isThreadSafe());
106
7
    file_handle_ = nullptr;
107
    // Restart with the new key.
108
7
    return tryOpenCacheFile();
109
7
  });
110
7
  ASSERT(queued.ok(), queued.status().ToString());
111
7
  cancel_action_in_flight_ = std::move(queued.value());
112
7
}
113

            
114
5
void FileLookupContext::invalidateCacheEntry() {
115
5
  ASSERT(dispatcher()->isThreadSafe());
116
  // We don't capture the cancel action here because we want these operations to continue even
117
  // if the filter was destroyed in the meantime. For the same reason, we must not capture 'this'.
118
5
  cache_.asyncFileManager()->stat(
119
5
      dispatcher(), filepath(),
120
5
      [file = filepath(), cache = cache_.shared_from_this(),
121
5
       dispatcher = dispatcher()](absl::StatusOr<struct stat> stat_result) {
122
5
        ASSERT(dispatcher->isThreadSafe());
123
5
        size_t file_size = 0;
124
5
        if (stat_result.ok()) {
125
1
          file_size = stat_result.value().st_size;
126
1
        }
127
5
        cache->asyncFileManager()->unlink(dispatcher, file,
128
5
                                          [cache, file_size](absl::Status unlink_result) {
129
5
                                            if (unlink_result.ok()) {
130
1
                                              cache->trackFileRemoved(file_size);
131
1
                                            }
132
5
                                          });
133
5
      });
134
5
}
135

            
136
18
void FileLookupContext::getBody(const AdjustedByteRange& range, LookupBodyCallback&& cb) {
137
18
  ASSERT(dispatcher()->isThreadSafe());
138
18
  ASSERT(cb);
139
18
  ASSERT(!cancel_action_in_flight_);
140
18
  ASSERT(file_handle_);
141
18
  auto queued = file_handle_->read(
142
18
      dispatcher(), header_block_.offsetToBody() + range.begin(), range.length(),
143
18
      [this, cb = std::move(cb), range](absl::StatusOr<Buffer::InstancePtr> read_result) mutable {
144
17
        ASSERT(dispatcher()->isThreadSafe());
145
17
        cancel_action_in_flight_ = nullptr;
146
17
        if (!read_result.ok() || read_result.value()->length() != range.length()) {
147
1
          invalidateCacheEntry();
148
          // Calling callback with nullptr fails the request.
149
1
          std::move(cb)(nullptr, /* end_stream (ignored) = */ false);
150
1
          return;
151
1
        }
152
16
        std::move(cb)(std::move(read_result.value()),
153
16
                      /* end_stream = */ range.end() == header_block_.bodySize() &&
154
16
                          header_block_.trailerSize() == 0);
155
16
      });
156
18
  ASSERT(queued.ok(), queued.status().ToString());
157
18
  cancel_action_in_flight_ = std::move(queued.value());
158
18
}
159

            
160
6
// Queues a read of the serialized trailers, using the offset and size recorded in
// header_block_, and delivers the decoded trailers to `cb`. Requires an open file and no
// other operation in flight.
//
// If the read fails, returns the wrong number of bytes, or the bytes do not parse as a
// CacheFileTrailer, the cache entry is invalidated and `cb` receives an empty trailer map.
void FileLookupContext::getTrailers(LookupTrailersCallback&& cb) {
  ASSERT(dispatcher()->isThreadSafe());
  ASSERT(cb);
  ASSERT(!cancel_action_in_flight_);
  ASSERT(file_handle_);
  auto queued = file_handle_->read(
      dispatcher(), header_block_.offsetToTrailers(), header_block_.trailerSize(),
      [this, cb = std::move(cb)](absl::StatusOr<Buffer::InstancePtr> read_result) mutable {
        ASSERT(dispatcher()->isThreadSafe());
        cancel_action_in_flight_ = nullptr;
        CacheFileTrailer trailer;
        // Short-circuit evaluation ensures the buffer is only dereferenced after a successful
        // read, and parsing is only attempted on a read of the expected length. The parse
        // result is checked so a corrupt trailer block is not passed on as if it were valid.
        const bool usable = read_result.ok() &&
                            read_result.value()->length() == header_block_.trailerSize() &&
                            trailer.ParseFromString(read_result.value()->toString());
        if (!usable) {
          invalidateCacheEntry();
          // There is no failure response for getTrailers, so we just
          // say there were no trailers in the event of this failure.
          std::move(cb)(Http::ResponseTrailerMapImpl::create());
          return;
        }
        std::move(cb)(trailersFromTrailerProto(trailer));
      });
  ASSERT(queued.ok(), queued.status().ToString());
  cancel_action_in_flight_ = std::move(queued.value());
}
184

            
185
157
void FileLookupContext::onDestroy() {
186
157
  if (cancel_action_in_flight_) {
187
4
    std::move(cancel_action_in_flight_)();
188
4
    cancel_action_in_flight_ = nullptr;
189
4
  }
190
157
  if (file_handle_) {
191
38
    auto status = file_handle_->close(nullptr, [](absl::Status) {});
192
38
    ASSERT(status.ok(), status.status().ToString());
193
38
    file_handle_ = nullptr;
194
38
  }
195
157
}
196

            
197
} // namespace FileSystemHttpCache
198
} // namespace Cache
199
} // namespace HttpFilters
200
} // namespace Extensions
201
} // namespace Envoy