/src/rocksdb/db/blob/blob_source.cc
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "db/blob/blob_source.h" |
7 | | |
8 | | #include <cassert> |
9 | | #include <string> |
10 | | |
11 | | #include "cache/cache_reservation_manager.h" |
12 | | #include "cache/charged_cache.h" |
13 | | #include "db/blob/blob_contents.h" |
14 | | #include "db/blob/blob_file_reader.h" |
15 | | #include "db/blob/blob_log_format.h" |
16 | | #include "monitoring/statistics_impl.h" |
17 | | #include "options/cf_options.h" |
18 | | #include "table/get_context.h" |
19 | | #include "table/multiget_context.h" |
20 | | |
21 | | namespace ROCKSDB_NAMESPACE { |
22 | | |
23 | | namespace { |
24 | | |
25 | | Status AppendBlobRefreshRetryFailure(const Status& stale_status, |
26 | 0 | const Status& retry_status) { |
27 | 0 | assert(stale_status.IsCorruption()); |
28 | 0 | assert(!retry_status.ok()); |
29 | 0 | if (retry_status.IsCorruption()) { |
30 | 0 | return retry_status; |
31 | 0 | } |
32 | 0 | return Status::CopyAppendMessage( |
33 | 0 | stale_status, "; refresh retry failed: ", retry_status.ToString()); |
34 | 0 | } |
35 | | |
36 | | } // namespace |
37 | | |
38 | | BlobSource::BlobSource(const ImmutableOptions& immutable_options, |
39 | | const MutableCFOptions& mutable_cf_options, |
40 | | const std::string& db_id, |
41 | | const std::string& db_session_id, |
42 | | BlobFileCache* blob_file_cache) |
43 | 83.7k | : db_id_(db_id), |
44 | 83.7k | db_session_id_(db_session_id), |
45 | 83.7k | statistics_(immutable_options.statistics.get()), |
46 | 83.7k | blob_file_cache_(blob_file_cache), |
47 | 83.7k | blob_cache_(immutable_options.blob_cache), |
48 | 83.7k | lowest_used_cache_tier_(immutable_options.lowest_used_cache_tier) { |
49 | 83.7k | auto bbto = |
50 | 83.7k | mutable_cf_options.table_factory->GetOptions<BlockBasedTableOptions>(); |
51 | 83.7k | if (bbto && |
52 | 83.7k | bbto->cache_usage_options.options_overrides.at(CacheEntryRole::kBlobCache) |
53 | 83.7k | .charged == CacheEntryRoleOptions::Decision::kEnabled) { |
54 | 0 | blob_cache_ = SharedCacheInterface{std::make_shared<ChargedCache>( |
55 | 0 | immutable_options.blob_cache, bbto->block_cache)}; |
56 | 0 | } |
57 | 83.7k | } |
58 | | |
59 | 83.7k | BlobSource::~BlobSource() = default; |
60 | | |
61 | | Status BlobSource::GetBlobFromCache( |
62 | 0 | const Slice& cache_key, CacheHandleGuard<BlobContents>* cached_blob) const { |
63 | 0 | assert(blob_cache_); |
64 | 0 | assert(!cache_key.empty()); |
65 | 0 | assert(cached_blob); |
66 | 0 | assert(cached_blob->IsEmpty()); |
67 | |
|
68 | 0 | Cache::Handle* cache_handle = nullptr; |
69 | 0 | cache_handle = GetEntryFromCache(cache_key); |
70 | 0 | if (cache_handle != nullptr) { |
71 | 0 | *cached_blob = |
72 | 0 | CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle); |
73 | |
|
74 | 0 | assert(cached_blob->GetValue()); |
75 | |
|
76 | 0 | PERF_COUNTER_ADD(blob_cache_hit_count, 1); |
77 | 0 | RecordTick(statistics_, BLOB_DB_CACHE_HIT); |
78 | 0 | RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ, |
79 | 0 | cached_blob->GetValue()->size()); |
80 | |
|
81 | 0 | return Status::OK(); |
82 | 0 | } |
83 | | |
84 | 0 | RecordTick(statistics_, BLOB_DB_CACHE_MISS); |
85 | |
|
86 | 0 | return Status::NotFound("Blob not found in cache"); |
87 | 0 | } |
88 | | |
89 | | Status BlobSource::PutBlobIntoCache( |
90 | | const Slice& cache_key, std::unique_ptr<BlobContents>* blob, |
91 | 0 | CacheHandleGuard<BlobContents>* cached_blob) const { |
92 | 0 | assert(blob_cache_); |
93 | 0 | assert(!cache_key.empty()); |
94 | 0 | assert(blob); |
95 | 0 | assert(*blob); |
96 | 0 | assert(cached_blob); |
97 | 0 | assert(cached_blob->IsEmpty()); |
98 | |
|
99 | 0 | TypedHandle* cache_handle = nullptr; |
100 | 0 | const Status s = InsertEntryIntoCache(cache_key, blob->get(), &cache_handle, |
101 | 0 | Cache::Priority::BOTTOM); |
102 | 0 | if (s.ok()) { |
103 | 0 | blob->release(); |
104 | |
|
105 | 0 | assert(cache_handle != nullptr); |
106 | 0 | *cached_blob = |
107 | 0 | CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle); |
108 | |
|
109 | 0 | assert(cached_blob->GetValue()); |
110 | |
|
111 | 0 | RecordTick(statistics_, BLOB_DB_CACHE_ADD); |
112 | 0 | RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE, |
113 | 0 | cached_blob->GetValue()->size()); |
114 | |
|
115 | 0 | } else { |
116 | 0 | RecordTick(statistics_, BLOB_DB_CACHE_ADD_FAILURES); |
117 | 0 | } |
118 | |
|
119 | 0 | return s; |
120 | 0 | } |
121 | | |
// Looks up `key` in the blob cache at BOTTOM priority, honoring the
// configured lowest usable cache tier; returns the handle on hit, nullptr on
// miss.
BlobSource::TypedHandle* BlobSource::GetEntryFromCache(const Slice& key) const {
  return blob_cache_.LookupFull(key, nullptr /* context */,
                                Cache::Priority::BOTTOM, statistics_,
                                lowest_used_cache_tier_);
}
127 | | |
128 | | void BlobSource::PinCachedBlob(CacheHandleGuard<BlobContents>* cached_blob, |
129 | 0 | PinnableSlice* value) { |
130 | 0 | assert(cached_blob); |
131 | 0 | assert(cached_blob->GetValue()); |
132 | 0 | assert(value); |
133 | | |
134 | | // To avoid copying the cached blob into the buffer provided by the |
135 | | // application, we can simply transfer ownership of the cache handle to |
136 | | // the target PinnableSlice. This has the potential to save a lot of |
137 | | // CPU, especially with large blob values. |
138 | |
|
139 | 0 | value->Reset(); |
140 | |
|
141 | 0 | constexpr Cleanable* cleanable = nullptr; |
142 | 0 | value->PinSlice(cached_blob->GetValue()->data(), cleanable); |
143 | |
|
144 | 0 | cached_blob->TransferTo(value); |
145 | 0 | } |
146 | | |
147 | | void BlobSource::PinOwnedBlob(std::unique_ptr<BlobContents>* owned_blob, |
148 | 0 | PinnableSlice* value) { |
149 | 0 | assert(owned_blob); |
150 | 0 | assert(*owned_blob); |
151 | 0 | assert(value); |
152 | |
|
153 | 0 | BlobContents* const blob = owned_blob->release(); |
154 | 0 | assert(blob); |
155 | |
|
156 | 0 | value->Reset(); |
157 | 0 | value->PinSlice( |
158 | 0 | blob->data(), |
159 | 0 | [](void* arg1, void* /* arg2 */) { |
160 | 0 | delete static_cast<BlobContents*>(arg1); |
161 | 0 | }, |
162 | 0 | blob, nullptr); |
163 | 0 | } |
164 | | |
// Inserts `value` into the blob cache under `key`, charging its approximate
// memory usage and honoring the configured lowest usable cache tier. On
// success, `*cache_handle` references the inserted entry.
Status BlobSource::InsertEntryIntoCache(const Slice& key, BlobContents* value,
                                        TypedHandle** cache_handle,
                                        Cache::Priority priority) const {
  return blob_cache_.InsertFull(key, value, value->ApproximateMemoryUsage(),
                                cache_handle, priority,
                                lowest_used_cache_tier_);
}
172 | | |
// Reads a single blob, serving it from the blob cache when possible and
// otherwise from the blob file. On success, `value` pins the blob and, if
// `bytes_read` is non-null, it is set to the size of the on-disk (possibly
// compressed) blob record. If the initial file read reports corruption, the
// cached file reader is assumed possibly stale: it is evicted, a fresh
// uncached reader is opened, and the read is retried once.
Status BlobSource::GetBlob(const ReadOptions& read_options,
                           const Slice& user_key, uint64_t file_number,
                           uint64_t offset, uint64_t file_size,
                           uint64_t value_size,
                           CompressionType compression_type,
                           FilePrefetchBuffer* prefetch_buffer,
                           PinnableSlice* value, uint64_t* bytes_read) {
  assert(value);

  Status s;

  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);

  CacheHandleGuard<BlobContents> blob_handle;

  // First, try to get the blob from the cache
  //
  // If blob cache is enabled, we'll try to read from it.
  if (blob_cache_) {
    Slice key = cache_key.AsSlice();
    s = GetBlobFromCache(key, &blob_handle);
    if (s.ok()) {
      PinCachedBlob(&blob_handle, value);

      // For consistency, the size of on-disk (possibly compressed) blob record
      // is assigned to bytes_read.
      uint64_t adjustment =
          read_options.verify_checksums
              ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
                    user_key.size())
              : 0;
      assert(offset >= adjustment);

      uint64_t record_size = value_size + adjustment;
      if (bytes_read) {
        *bytes_read = record_size;
      }
      return s;
    }
  }

  assert(blob_handle.IsEmpty());

  const bool no_io = read_options.read_tier == kBlockCacheTier;
  if (no_io) {
    s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
    return s;
  }

  // Can't find the blob from the cache. Since I/O is allowed, read from the
  // file.
  std::unique_ptr<BlobContents> blob_contents;

  {
    CacheHandleGuard<BlobFileReader> blob_file_reader;
    s = blob_file_cache_->GetBlobFileReader(read_options, file_number,
                                            &blob_file_reader);
    if (!s.ok()) {
      return s;
    }

    assert(blob_file_reader.GetValue());

    if (compression_type != blob_file_reader.GetValue()->GetCompressionType()) {
      return Status::Corruption("Compression type mismatch when reading blob");
    }

    // Use the cache's allocator only when the blob may be inserted into the
    // cache afterwards, so the insertion can adopt the buffer.
    MemoryAllocator* const allocator =
        (blob_cache_ && read_options.fill_cache)
            ? blob_cache_.get()->memory_allocator()
            : nullptr;

    uint64_t read_size = 0;
    s = blob_file_reader.GetValue()->GetBlob(
        read_options, user_key, offset, value_size, compression_type,
        prefetch_buffer, allocator, &blob_contents, &read_size);
    if (s.IsCorruption()) {
      // The cached reader may be stale. Evict it, open a fresh uncached
      // reader, and retry the read once against the fresh reader.
      const Status stale_status = s;
      blob_file_reader.Reset();
      blob_file_cache_->Evict(file_number);

      std::unique_ptr<BlobFileReader> fresh_reader;
      s = blob_file_cache_->OpenBlobFileReaderUncached(
          read_options, file_number, &fresh_reader,
          /*allow_footer_skip_retry=*/false);
      if (!s.ok()) {
        return AppendBlobRefreshRetryFailure(stale_status, s);
      }

      if (compression_type != fresh_reader->GetCompressionType()) {
        return Status::Corruption(
            "Compression type mismatch when reading blob");
      }

      blob_contents.reset();
      read_size = 0;
      s = fresh_reader->GetBlob(read_options, user_key, offset, value_size,
                                compression_type, prefetch_buffer, allocator,
                                &blob_contents, &read_size);
      if (!s.ok()) {
        s = AppendBlobRefreshRetryFailure(stale_status, s);
      } else {
        // The retry succeeded; install the fresh reader into the reader cache
        // so later reads of this file do not hit the stale one.
        CacheHandleGuard<BlobFileReader> ignored_reader;
        blob_file_cache_
            ->RefreshBlobFileReader(file_number, &fresh_reader, &ignored_reader)
            .PermitUncheckedError();
      }
    }
    if (!s.ok()) {
      return s;
    }
    if (bytes_read) {
      *bytes_read = read_size;
    }
  }

  if (blob_cache_ && read_options.fill_cache) {
    // If filling cache is allowed and a cache is configured, try to put the
    // blob to the cache.
    Slice key = cache_key.AsSlice();
    s = PutBlobIntoCache(key, &blob_contents, &blob_handle);
    if (!s.ok()) {
      return s;
    }

    PinCachedBlob(&blob_handle, value);
  } else {
    PinOwnedBlob(&blob_contents, value);
  }

  assert(s.ok());
  return s;
}
306 | | |
307 | | void BlobSource::MultiGetBlob(const ReadOptions& read_options, |
308 | | autovector<BlobFileReadRequests>& blob_reqs, |
309 | 0 | uint64_t* bytes_read) { |
310 | 0 | assert(blob_reqs.size() > 0); |
311 | |
|
312 | 0 | uint64_t total_bytes_read = 0; |
313 | 0 | uint64_t bytes_read_in_file = 0; |
314 | |
|
315 | 0 | for (auto& [file_number, file_size, blob_reqs_in_file] : blob_reqs) { |
316 | | // sort blob_reqs_in_file by file offset. |
317 | 0 | std::sort( |
318 | 0 | blob_reqs_in_file.begin(), blob_reqs_in_file.end(), |
319 | 0 | [](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool { |
320 | 0 | return lhs.offset < rhs.offset; |
321 | 0 | }); |
322 | |
|
323 | 0 | MultiGetBlobFromOneFile(read_options, file_number, file_size, |
324 | 0 | blob_reqs_in_file, &bytes_read_in_file); |
325 | |
|
326 | 0 | total_bytes_read += bytes_read_in_file; |
327 | 0 | } |
328 | |
|
329 | 0 | if (bytes_read) { |
330 | 0 | *bytes_read = total_bytes_read; |
331 | 0 | } |
332 | 0 | } |
333 | | |
// Reads multiple blobs that all reside in the same blob file; requests are
// expected to be sorted by offset. As many requests as possible are served
// from the blob cache; the remainder are read from the file in one batch. If
// that batch read reports corruption for any request, the cached file reader
// is assumed possibly stale: it is evicted, a fresh uncached reader is
// opened, and only the corrupted requests are retried against it. Per-request
// outcomes are reported through each request's `status`; `bytes_read` (if
// non-null) receives the total on-disk bytes for this file. NOTE: on the
// no-I/O and reader-open-failure early returns, `bytes_read` is left
// untouched — callers must not rely on it being written on those paths.
void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
                                         uint64_t file_number,
                                         uint64_t /*file_size*/,
                                         autovector<BlobReadRequest>& blob_reqs,
                                         uint64_t* bytes_read) {
  const size_t num_blobs = blob_reqs.size();
  assert(num_blobs > 0);
  assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);

#ifndef NDEBUG
  for (size_t i = 0; i < num_blobs - 1; ++i) {
    assert(blob_reqs[i].offset <= blob_reqs[i + 1].offset);
  }
#endif  // !NDEBUG

  // One bit per request; a set bit means the request was served from cache.
  // MAX_BATCH_SIZE is asserted above, so the batch fits in the 64-bit mask.
  using Mask = uint64_t;
  Mask cache_hit_mask = 0;

  uint64_t total_bytes = 0;
  const OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number);

  if (blob_cache_) {
    size_t cached_blob_count = 0;
    for (size_t i = 0; i < num_blobs; ++i) {
      auto& req = blob_reqs[i];

      CacheHandleGuard<BlobContents> blob_handle;
      const CacheKey cache_key = base_cache_key.WithOffset(req.offset);
      const Slice key = cache_key.AsSlice();

      const Status s = GetBlobFromCache(key, &blob_handle);

      if (s.ok()) {
        assert(req.status);
        *req.status = s;

        PinCachedBlob(&blob_handle, req.result);

        // Update the counter for the number of valid blobs read from the cache.
        ++cached_blob_count;

        // For consistency, the size of each on-disk (possibly compressed) blob
        // record is accumulated to total_bytes.
        uint64_t adjustment =
            read_options.verify_checksums
                ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
                      req.user_key->size())
                : 0;
        assert(req.offset >= adjustment);
        total_bytes += req.len + adjustment;
        cache_hit_mask |= (Mask{1} << i);  // cache hit
      }
    }

    // All blobs were read from the cache.
    if (cached_blob_count == num_blobs) {
      if (bytes_read) {
        *bytes_read = total_bytes;
      }
      return;
    }
  }

  const bool no_io = read_options.read_tier == kBlockCacheTier;
  if (no_io) {
    // I/O is disallowed: fail every request that was not served from cache.
    for (size_t i = 0; i < num_blobs; ++i) {
      if (!(cache_hit_mask & (Mask{1} << i))) {
        BlobReadRequest& req = blob_reqs[i];
        assert(req.status);

        *req.status =
            Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
      }
    }
    return;
  }

  {
    // Find the rest of blobs from the file since I/O is allowed.
    autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
        _blob_reqs;
    uint64_t _bytes_read = 0;

    for (size_t i = 0; i < num_blobs; ++i) {
      if (!(cache_hit_mask & (Mask{1} << i))) {
        _blob_reqs.emplace_back(&blob_reqs[i], std::unique_ptr<BlobContents>());
      }
    }

    CacheHandleGuard<BlobFileReader> blob_file_reader;
    Status s = blob_file_cache_->GetBlobFileReader(read_options, file_number,
                                                   &blob_file_reader);
    if (!s.ok()) {
      // Could not open a reader: propagate the failure to every pending
      // request for this file.
      for (size_t i = 0; i < _blob_reqs.size(); ++i) {
        BlobReadRequest* const req = _blob_reqs[i].first;
        assert(req);
        assert(req->status);

        *req->status = s;
      }
      return;
    }

    assert(blob_file_reader.GetValue());

    // Use the cache's allocator only when blobs may be inserted into the
    // cache afterwards, so insertions can adopt the buffers.
    MemoryAllocator* const allocator =
        (blob_cache_ && read_options.fill_cache)
            ? blob_cache_.get()->memory_allocator()
            : nullptr;

    blob_file_reader.GetValue()->MultiGetBlob(read_options, allocator,
                                              _blob_reqs, &_bytes_read);

    // Any corruption result may indicate the cached reader was stale; if so,
    // reopen the file and retry just the corrupted requests below.
    bool needs_reader_refresh = false;
    for (const auto& blob_req : _blob_reqs) {
      BlobReadRequest* const req = blob_req.first;
      assert(req != nullptr);
      assert(req->status != nullptr);
      if (req->status->IsCorruption()) {
        needs_reader_refresh = true;
        break;
      }
    }

    if (needs_reader_refresh) {
      blob_file_reader.Reset();
      blob_file_cache_->Evict(file_number);

      std::unique_ptr<BlobFileReader> fresh_reader;
      s = blob_file_cache_->OpenBlobFileReaderUncached(
          read_options, file_number, &fresh_reader,
          /*allow_footer_skip_retry=*/false);
      if (!s.ok()) {
        // Could not reopen the file: append the retry failure to each
        // corrupted request's status so callers see both causes.
        for (const auto& blob_req : _blob_reqs) {
          BlobReadRequest* const req = blob_req.first;
          assert(req != nullptr);
          assert(req->status != nullptr);
          if (req->status->IsCorruption()) {
            *req->status = AppendBlobRefreshRetryFailure(*req->status, s);
          }
        }
        return;
      }

      // Collect the corrupted requests for a retry against the fresh reader,
      // remembering the original (stale) statuses for error reporting; each
      // retried request's status is reset to OK before the retry.
      autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
          retry_blob_reqs;
      autovector<Status> stale_statuses;
      for (auto& blob_req : _blob_reqs) {
        BlobReadRequest* const req = blob_req.first;
        assert(req != nullptr);
        assert(req->status != nullptr);
        if (!req->status->IsCorruption()) {
          continue;
        }

        stale_statuses.emplace_back(*req->status);
        *req->status = Status::OK();
        blob_req.second.reset();
        retry_blob_reqs.emplace_back(req, std::unique_ptr<BlobContents>());
      }

      uint64_t refreshed_bytes_read = 0;
      fresh_reader->MultiGetBlob(read_options, allocator, retry_blob_reqs,
                                 &refreshed_bytes_read);
      _bytes_read += refreshed_bytes_read;

      // Move the retried results back into the main request list. The fresh
      // reader is installed into the reader cache only if at least one
      // retried request succeeded; otherwise the combined stale/retry failure
      // is reported per request.
      bool install_fresh_reader = false;
      for (size_t i = 0; i < retry_blob_reqs.size(); ++i) {
        auto& retried_blob_req = retry_blob_reqs[i];
        BlobReadRequest* const retried_req = retried_blob_req.first;
        assert(retried_req != nullptr);
        if (retried_req->status->ok()) {
          install_fresh_reader = true;
        } else {
          *retried_req->status = AppendBlobRefreshRetryFailure(
              stale_statuses[i], *retried_req->status);
        }

        for (auto& blob_req : _blob_reqs) {
          if (blob_req.first != retried_req) {
            continue;
          }

          blob_req.second = std::move(retried_blob_req.second);
          break;
        }
      }

      if (install_fresh_reader) {
        CacheHandleGuard<BlobFileReader> ignored_reader;
        blob_file_cache_
            ->RefreshBlobFileReader(file_number, &fresh_reader, &ignored_reader)
            .PermitUncheckedError();
      }
    }

    if (blob_cache_ && read_options.fill_cache) {
      // If filling cache is allowed and a cache is configured, try to put
      // the blob(s) to the cache.
      for (auto& [req, blob_contents] : _blob_reqs) {
        assert(req);

        if (req->status->ok()) {
          CacheHandleGuard<BlobContents> blob_handle;
          const CacheKey cache_key = base_cache_key.WithOffset(req->offset);
          const Slice key = cache_key.AsSlice();
          s = PutBlobIntoCache(key, &blob_contents, &blob_handle);
          if (!s.ok()) {
            *req->status = s;
          } else {
            PinCachedBlob(&blob_handle, req->result);
          }
        }
      }
    } else {
      // No cache fill: hand ownership of each successfully read blob directly
      // to the request's result slice.
      for (auto& [req, blob_contents] : _blob_reqs) {
        assert(req);

        if (req->status->ok()) {
          PinOwnedBlob(&blob_contents, req->result);
        }
      }
    }

    total_bytes += _bytes_read;
    if (bytes_read) {
      *bytes_read = total_bytes;
    }
  }
}
564 | | |
565 | | bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size, |
566 | 0 | uint64_t offset, size_t* charge) const { |
567 | 0 | const CacheKey cache_key = GetCacheKey(file_number, file_size, offset); |
568 | 0 | const Slice key = cache_key.AsSlice(); |
569 | |
|
570 | 0 | CacheHandleGuard<BlobContents> blob_handle; |
571 | 0 | const Status s = GetBlobFromCache(key, &blob_handle); |
572 | |
|
573 | 0 | if (s.ok() && blob_handle.GetValue() != nullptr) { |
574 | 0 | if (charge) { |
575 | 0 | const Cache* const cache = blob_handle.GetCache(); |
576 | 0 | assert(cache); |
577 | |
|
578 | 0 | Cache::Handle* const handle = blob_handle.GetCacheHandle(); |
579 | 0 | assert(handle); |
580 | |
|
581 | 0 | *charge = cache->GetUsage(handle); |
582 | 0 | } |
583 | |
|
584 | 0 | return true; |
585 | 0 | } |
586 | | |
587 | 0 | return false; |
588 | 0 | } |
589 | | |
590 | | } // namespace ROCKSDB_NAMESPACE |