1
#include "source/extensions/http/cache_v2/simple_http_cache/simple_http_cache.h"
2

            
3
#include "envoy/extensions/http/cache_v2/simple_http_cache/v3/config.pb.h"
4
#include "envoy/registry/registry.h"
5

            
6
#include "source/common/buffer/buffer_impl.h"
7
#include "source/common/http/header_map_impl.h"
8
#include "source/extensions/filters/http/cache_v2/cache_sessions.h"
9

            
10
namespace Envoy {
11
namespace Extensions {
12
namespace HttpFilters {
13
namespace CacheV2 {
14
namespace {
15

            
16
// Name of this cache implementation. It is returned by
// SimpleHttpCacheFactory::name() and reported through SimpleHttpCache::cacheInfo().
constexpr absl::string_view Name = "envoy.extensions.http.cache_v2.simple";
17

            
18
// Length, in bytes, of each body range requested from the upstream source
// while an insert is in progress (512 KiB).
constexpr uint64_t InsertReadChunkSize = 512 * 1024;
19

            
20
class InsertContext {
21
public:
22
  static void start(std::shared_ptr<SimpleHttpCache::Entry> entry,
23
                    std::shared_ptr<CacheProgressReceiver> progress_receiver, HttpSourcePtr source);
24

            
25
private:
26
  InsertContext(std::shared_ptr<SimpleHttpCache::Entry> entry,
27
                std::shared_ptr<CacheProgressReceiver> progress_receiver, HttpSourcePtr source);
28
  void onBody(AdjustedByteRange range, Buffer::InstancePtr buffer, EndStream end_stream);
29
  void onTrailers(Http::ResponseTrailerMapPtr trailers, EndStream end_stream);
30
  std::shared_ptr<SimpleHttpCache::Entry> entry_;
31
  std::shared_ptr<CacheProgressReceiver> progress_receiver_;
32
  HttpSourcePtr source_;
33
};
34

            
35
class SimpleHttpCacheReader : public CacheReader {
36
public:
37
105
  SimpleHttpCacheReader(std::shared_ptr<SimpleHttpCache::Entry> entry) : entry_(std::move(entry)) {}
38
  void getBody(Event::Dispatcher& dispatcher, AdjustedByteRange range,
39
               GetBodyCallback&& cb) override;
40

            
41
private:
42
  std::shared_ptr<SimpleHttpCache::Entry> entry_;
43
};
44

            
45
void SimpleHttpCacheReader::getBody(Event::Dispatcher&, AdjustedByteRange range,
46
165
                                    GetBodyCallback&& cb) {
47
165
  cb(entry_->body(std::move(range)), EndStream::More);
48
165
}
49

            
50
void InsertContext::start(std::shared_ptr<SimpleHttpCache::Entry> entry,
51
                          std::shared_ptr<CacheProgressReceiver> progress_receiver,
52
99
                          HttpSourcePtr source) {
53
99
  auto ctx = new InsertContext(std::move(entry), std::move(progress_receiver), std::move(source));
54
99
  ctx->source_->getBody(AdjustedByteRange(0, InsertReadChunkSize), [ctx](Buffer::InstancePtr buffer,
55
99
                                                                         EndStream end_stream) {
56
99
    ctx->onBody(AdjustedByteRange(0, InsertReadChunkSize), std::move(buffer), end_stream);
57
99
  });
58
99
}
59

            
60
InsertContext::InsertContext(std::shared_ptr<SimpleHttpCache::Entry> entry,
61
                             std::shared_ptr<CacheProgressReceiver> progress_receiver,
62
                             HttpSourcePtr source)
63
99
    : entry_(std::move(entry)), progress_receiver_(std::move(progress_receiver)),
64
99
      source_(std::move(source)) {}
65

            
66
void InsertContext::onBody(AdjustedByteRange range, Buffer::InstancePtr buffer,
67
164
                           EndStream end_stream) {
68
164
  if (end_stream == EndStream::Reset) {
69
1
    progress_receiver_->onInsertFailed(absl::UnavailableError("upstream reset"));
70
1
    delete this;
71
1
    return;
72
1
  }
73
163
  if (end_stream == EndStream::End) {
74
83
    entry_->setEndStreamAfterBody();
75
83
  }
76
163
  if (buffer) {
77
117
    ASSERT(range.length() >= buffer->length());
78
117
    range = AdjustedByteRange(range.begin(), range.begin() + buffer->length());
79
117
    entry_->appendBody(std::move(buffer));
80
117
  } else if (end_stream == EndStream::More) {
81
    // Neither buffer nor EndStream::End means we want trailers.
82
15
    return source_->getTrailers([this](Http::ResponseTrailerMapPtr trailers, EndStream end_stream) {
83
15
      onTrailers(std::move(trailers), end_stream);
84
15
    });
85
33
  } else {
86
31
    range = AdjustedByteRange(0, entry_->bodySize());
87
31
  }
88
148
  progress_receiver_->onBodyInserted(std::move(range), end_stream == EndStream::End);
89
148
  if (end_stream != EndStream::End) {
90
65
    AdjustedByteRange next_range(range.end(), range.end() + InsertReadChunkSize);
91
65
    return source_->getBody(next_range,
92
65
                            [this, next_range](Buffer::InstancePtr buffer, EndStream end_stream) {
93
65
                              onBody(next_range, std::move(buffer), end_stream);
94
65
                            });
95
65
  }
96
83
  delete this;
97
83
}
98

            
99
15
// Handles the result of the trailers read. On success the trailers are stored
// in the entry and a copy is reported to the progress receiver; on reset the
// insert is reported as failed. Either way this context is finished and
// destroys itself.
void InsertContext::onTrailers(Http::ResponseTrailerMapPtr trailers, EndStream end_stream) {
  const bool was_reset = (end_stream == EndStream::Reset);
  if (!was_reset) {
    entry_->setTrailers(std::move(trailers));
    progress_receiver_->onTrailersInserted(entry_->copyTrailers());
  } else {
    progress_receiver_->onInsertFailed(absl::UnavailableError("upstream reset during trailers"));
  }
  delete this;
}
108

            
109
} // namespace
110

            
111
165
// Returns a newly allocated buffer holding a copy of the stored body bytes
// covered by `range`. Takes the entry's reader lock while copying.
Buffer::InstancePtr SimpleHttpCache::Entry::body(AdjustedByteRange range) const {
  absl::ReaderMutexLock guard(mu_);
  const absl::string_view slice = absl::string_view{body_}.substr(range.begin(), range.length());
  return std::make_unique<Buffer::OwnedImpl>(slice);
}
116

            
117
117
// Appends the contents of `buf` to the stored body under the writer lock.
void SimpleHttpCache::Entry::appendBody(Buffer::InstancePtr buf) {
  absl::WriterMutexLock guard(mu_);
  body_.append(buf->toString());
}
121

            
122
37
uint64_t SimpleHttpCache::Entry::bodySize() const {
123
37
  absl::ReaderMutexLock lock(mu_);
124
37
  return body_.size();
125
37
}
126

            
127
6
// Returns a fresh copy of the stored response headers, made under the reader lock.
Http::ResponseHeaderMapPtr SimpleHttpCache::Entry::copyHeaders() const {
  absl::ReaderMutexLock guard(mu_);
  const auto& stored_headers = *response_headers_;
  return Http::createHeaderMap<Http::ResponseHeaderMapImpl>(stored_headers);
}
131

            
132
20
// Returns a fresh copy of the stored trailers, or nullptr when the entry has
// no trailers. Reads under the reader lock.
Http::ResponseTrailerMapPtr SimpleHttpCache::Entry::copyTrailers() const {
  absl::ReaderMutexLock guard(mu_);
  if (trailers_) {
    return Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*trailers_);
  }
  return nullptr;
}
139

            
140
6
// Returns a copy of the stored response metadata, taken under the reader lock.
ResponseMetadata SimpleHttpCache::Entry::metadata() const {
  absl::ReaderMutexLock guard(mu_);
  return metadata_;
}
144

            
145
void SimpleHttpCache::Entry::updateHeadersAndMetadata(Http::ResponseHeaderMapPtr response_headers,
146
19
                                                      ResponseMetadata metadata) {
147
19
  absl::WriterMutexLock lock(mu_);
148
19
  response_headers_ = std::move(response_headers);
149
19
  metadata_ = std::move(metadata);
150
19
}
151

            
152
14
// Stores `trailers` on the entry under the writer lock, replacing any
// previously stored trailers.
void SimpleHttpCache::Entry::setTrailers(Http::ResponseTrailerMapPtr trailers) {
  absl::WriterMutexLock guard(mu_);
  trailers_ = std::move(trailers);
}
156

            
157
83
void SimpleHttpCache::Entry::setEndStreamAfterBody() {
158
83
  absl::WriterMutexLock lock(mu_);
159
83
  end_stream_after_body_ = true;
160
83
}
161

            
162
93
// Describes this cache implementation; only the name field is filled in here.
CacheInfo SimpleHttpCache::cacheInfo() const {
  CacheInfo info;
  info.name_ = Name;
  return info;
}
167

            
168
100
// Looks up the entry for `request.key()` and invokes `callback` with the
// result. On a hit the result carries a reader for the entry plus copies of
// its headers, metadata and trailers and the current body size; on a miss the
// result is left default-constructed. The callback runs after the cache lock
// has been released.
void SimpleHttpCache::lookup(LookupRequest&& request, LookupCallback&& callback) {
  LookupResult found;
  {
    absl::ReaderMutexLock guard(mu_);
    const auto match = entries_.find(request.key());
    if (match != entries_.end()) {
      const auto& entry = match->second;
      found.cache_reader_ = std::make_unique<SimpleHttpCacheReader>(entry);
      found.response_headers_ = entry->copyHeaders();
      found.response_metadata_ = entry->metadata();
      found.response_trailers_ = entry->copyTrailers();
      found.body_length_ = entry->bodySize();
    }
  }
  callback(std::move(found));
}
183

            
184
12
// Removes the entry for `key` from the cache, if there is one. The dispatcher
// is unused. Readers that already hold the entry keep it alive through their
// own shared_ptr.
void SimpleHttpCache::evict(Event::Dispatcher&, const Key& key) {
  absl::WriterMutexLock guard(mu_);
  entries_.erase(key);
}
188

            
189
// Replaces the headers and metadata of the entry stored under `key` with
// copies of the supplied values. Does nothing when no such entry exists. The
// dispatcher is unused.
void SimpleHttpCache::updateHeaders(Event::Dispatcher&, const Key& key,
                                    const Http::ResponseHeaderMap& updated_headers,
                                    const ResponseMetadata& updated_metadata) {
  absl::WriterMutexLock guard(mu_);
  if (auto found = entries_.find(key); found != entries_.end()) {
    found->second->updateHeadersAndMetadata(
        Http::createHeaderMap<Http::ResponseHeaderMapImpl>(updated_headers), updated_metadata);
  }
}
200

            
201
void SimpleHttpCache::insert(Event::Dispatcher&, Key key, Http::ResponseHeaderMapPtr headers,
202
                             ResponseMetadata metadata, HttpSourcePtr source,
203
101
                             std::shared_ptr<CacheProgressReceiver> progress) {
204
101
  auto entry = std::make_shared<Entry>(Http::createHeaderMap<Http::ResponseHeaderMapImpl>(*headers),
205
101
                                       std::move(metadata));
206
101
  {
207
101
    absl::WriterMutexLock lock(mu_);
208
101
    entries_.emplace(key, entry);
209
101
  }
210
101
  if (source) {
211
99
    progress->onHeadersInserted(std::make_unique<SimpleHttpCacheReader>(entry), std::move(headers),
212
99
                                false);
213
99
    InsertContext::start(entry, std::move(progress), std::move(source));
214
99
  } else {
215
2
    progress->onHeadersInserted(nullptr, std::move(headers), true);
216
2
  }
217
101
}
218

            
219
// Declares the singleton-manager name that SimpleHttpCacheFactory::getCache()
// uses to fetch or create the shared CacheSessions instance.
SINGLETON_MANAGER_REGISTRATION(simple_http_cache_v2_singleton);
220

            
221
class SimpleHttpCacheFactory : public HttpCacheFactory {
222
public:
223
  // From UntypedFactory
224
100
  std::string name() const override { return std::string(Name); }
225
  // From TypedFactory
226
8
  ProtobufTypes::MessagePtr createEmptyConfigProto() override {
227
8
    return std::make_unique<
228
8
        envoy::extensions::http::cache_v2::simple_http_cache::v3::SimpleHttpCacheV2Config>();
229
8
  }
230
  // From HttpCacheFactory
231
  absl::StatusOr<std::shared_ptr<CacheSessions>>
232
  getCache(const envoy::extensions::filters::http::cache_v2::v3::CacheV2Config&,
233
92
           Server::Configuration::FactoryContext& context) override {
234
92
    return context.serverFactoryContext().singletonManager().getTyped<CacheSessions>(
235
92
        SINGLETON_MANAGER_REGISTERED_NAME(simple_http_cache_v2_singleton), [&context]() {
236
92
          return CacheSessions::create(context, std::make_unique<SimpleHttpCache>());
237
92
        });
238
92
  }
239

            
240
private:
241
};
242

            
243
// File-local Registry::RegisterFactory instance tying SimpleHttpCacheFactory to
// the HttpCacheFactory category; presumably this performs the registration
// during static initialization -- confirm against envoy/registry/registry.h.
static Registry::RegisterFactory<SimpleHttpCacheFactory, HttpCacheFactory> register_;
244

            
245
} // namespace CacheV2
246
} // namespace HttpFilters
247
} // namespace Extensions
248
} // namespace Envoy