Coverage Report

Created: 2026-02-14 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/seqno_to_time_mapping.cc
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//
3
//  This source code is licensed under both the GPLv2 (found in the
4
//  COPYING file in the root directory) and Apache 2.0 License
5
//  (found in the LICENSE.Apache file in the root directory).
6
7
#include "db/seqno_to_time_mapping.h"
8
9
#include <algorithm>
10
#include <cassert>
11
#include <cstdint>
12
#include <deque>
13
#include <functional>
14
#include <queue>
15
#include <vector>
16
17
#include "db/version_edit.h"
18
#include "util/string_util.h"
19
20
namespace ROCKSDB_NAMESPACE {
21
22
// Returns an iterator to the first entry whose time is strictly greater than
// `time`, or cend() when no such entry exists. Requires a sorted mapping.
SeqnoToTimeMapping::pair_const_iterator SeqnoToTimeMapping::FindGreaterTime(
    uint64_t time) const {
  assert(enforced_);
  // Only the time field participates in TimeLess, so the seqno is a dummy.
  const SeqnoTimePair probe{0, time};
  return std::upper_bound(pairs_.cbegin(), pairs_.cend(), probe,
                          SeqnoTimePair::TimeLess);
}
28
29
// Returns an iterator to the first entry whose seqno is >= `seqno`, or cend()
// when every entry has a smaller seqno. Requires a sorted mapping.
SeqnoToTimeMapping::pair_const_iterator SeqnoToTimeMapping::FindGreaterEqSeqno(
    SequenceNumber seqno) const {
  assert(enforced_);
  // Only the seqno field participates in SeqnoLess, so the time is a dummy.
  const SeqnoTimePair probe{seqno, 0};
  return std::lower_bound(pairs_.cbegin(), pairs_.cend(), probe,
                          SeqnoTimePair::SeqnoLess);
}
35
36
// Returns an iterator to the first entry whose seqno is strictly greater than
// `seqno`, or cend() when none is. Requires a sorted mapping.
SeqnoToTimeMapping::pair_const_iterator SeqnoToTimeMapping::FindGreaterSeqno(
    SequenceNumber seqno) const {
  assert(enforced_);
  // Only the seqno field participates in SeqnoLess, so the time is a dummy.
  const SeqnoTimePair probe{seqno, 0};
  return std::upper_bound(pairs_.cbegin(), pairs_.cend(), probe,
                          SeqnoTimePair::SeqnoLess);
}
42
43
uint64_t SeqnoToTimeMapping::GetProximalTimeBeforeSeqno(
44
0
    SequenceNumber seqno) const {
45
0
  assert(enforced_);
46
  // Find the last entry with a seqno strictly less than the given seqno.
47
  // First, find the first entry >= the given seqno (or end)
48
0
  auto it = FindGreaterEqSeqno(seqno);
49
0
  if (it == pairs_.cbegin()) {
50
0
    return kUnknownTimeBeforeAll;
51
0
  }
52
  // Then return data from previous.
53
0
  it--;
54
0
  return it->time;
55
0
}
56
57
// Returns the seqno of the last entry whose time is <= `time`, or
// kUnknownSeqnoBeforeAll when every entry is newer than `time`.
SequenceNumber SeqnoToTimeMapping::GetProximalSeqnoBeforeTime(
    uint64_t time) const {
  assert(enforced_);
  // The wanted entry is the predecessor of the first entry with a time
  // strictly greater than `time` (which may be end()).
  const auto first_newer = FindGreaterTime(time);
  if (first_newer != pairs_.cbegin()) {
    return (first_newer - 1)->seqno;
  }
  return kUnknownSeqnoBeforeAll;
}
71
72
void SeqnoToTimeMapping::GetCurrentTieringCutoffSeqnos(
73
    uint64_t current_time, uint64_t preserve_internal_time_seconds,
74
    uint64_t preclude_last_level_data_seconds,
75
    SequenceNumber* preserve_time_min_seqno,
76
0
    SequenceNumber* preclude_last_level_min_seqno) const {
77
0
  uint64_t preserve_time_duration = std::max(preserve_internal_time_seconds,
78
0
                                             preclude_last_level_data_seconds);
79
0
  if (preserve_time_duration <= 0) {
80
0
    return;
81
0
  }
82
0
  uint64_t preserve_time = current_time > preserve_time_duration
83
0
                               ? current_time - preserve_time_duration
84
0
                               : 0;
85
  // GetProximalSeqnoBeforeTime tells us the last seqno known to have been
86
  // written at or before the given time. + 1 to get the minimum we should
87
  // preserve without excluding anything that might have been written on or
88
  // after the given time.
89
0
  if (preserve_time_min_seqno) {
90
0
    *preserve_time_min_seqno = GetProximalSeqnoBeforeTime(preserve_time) + 1;
91
0
  }
92
0
  if (preclude_last_level_data_seconds > 0 && preclude_last_level_min_seqno) {
93
0
    uint64_t preclude_last_level_time =
94
0
        current_time > preclude_last_level_data_seconds
95
0
            ? current_time - preclude_last_level_data_seconds
96
0
            : 0;
97
0
    *preclude_last_level_min_seqno =
98
0
        GetProximalSeqnoBeforeTime(preclude_last_level_time) + 1;
99
0
  }
100
0
}
101
102
19.7k
void SeqnoToTimeMapping::EnforceMaxTimeSpan(uint64_t now) {
103
19.7k
  assert(enforced_);  // at least sorted
104
19.7k
  uint64_t cutoff_time;
105
19.7k
  if (pairs_.size() <= 1) {
106
19.7k
    return;
107
19.7k
  }
108
0
  if (now > 0) {
109
0
    if (now < max_time_span_) {
110
      // Nothing eligible to prune / avoid underflow
111
0
      return;
112
0
    }
113
0
    cutoff_time = now - max_time_span_;
114
0
  } else {
115
0
    const auto& last = pairs_.back();
116
0
    if (last.time < max_time_span_) {
117
      // Nothing eligible to prune / avoid underflow
118
0
      return;
119
0
    }
120
0
    cutoff_time = last.time - max_time_span_;
121
0
  }
122
  // Keep one entry <= cutoff_time
123
0
  while (pairs_.size() >= 2 && pairs_[0].time <= cutoff_time &&
124
0
         pairs_[1].time <= cutoff_time) {
125
0
    pairs_.pop_front();
126
0
  }
127
0
}
128
129
20.3k
void SeqnoToTimeMapping::EnforceCapacity(bool strict) {
130
20.3k
  assert(enforced_);  // at least sorted
131
20.3k
  uint64_t strict_cap = capacity_;
132
20.3k
  if (strict_cap == 0) {
133
19.7k
    pairs_.clear();
134
19.7k
    return;
135
19.7k
  }
136
  // Treat cap of 1 as 2 to work with the below algorithm (etc.)
137
686
  if (strict_cap == 1) {
138
0
    strict_cap = 2;
139
0
  }
140
  // When !strict, allow being over nominal capacity by a modest fraction.
141
686
  uint64_t effective_cap = strict_cap + (strict ? 0 : strict_cap / 8);
142
686
  if (effective_cap < strict_cap) {
143
    // Correct overflow
144
0
    effective_cap = UINT64_MAX;
145
0
  }
146
686
  if (pairs_.size() <= effective_cap) {
147
686
    return;
148
686
  }
149
  // The below algorithm expects at least one removal candidate between first
150
  // and last.
151
686
  assert(pairs_.size() >= 3);
152
0
  size_t to_remove_count = pairs_.size() - strict_cap;
153
154
0
  struct RemovalCandidate {
155
0
    uint64_t new_time_gap;
156
0
    std::deque<SeqnoTimePair>::iterator it;
157
0
    RemovalCandidate(uint64_t _new_time_gap,
158
0
                     std::deque<SeqnoTimePair>::iterator _it)
159
0
        : new_time_gap(_new_time_gap), it(_it) {}
160
0
    bool operator>(const RemovalCandidate& other) const {
161
0
      if (new_time_gap == other.new_time_gap) {
162
        // If same gap, treat the newer entry as less attractive
163
        // for removal (like larger gap)
164
0
        return it->seqno > other.it->seqno;
165
0
      }
166
0
      return new_time_gap > other.new_time_gap;
167
0
    }
168
0
  };
169
170
  // A priority queue of best removal candidates (smallest time gap remaining
171
  // after removal)
172
0
  using RC = RemovalCandidate;
173
0
  using PQ = std::priority_queue<RC, std::vector<RC>, std::greater<RC>>;
174
0
  PQ pq;
175
176
  // Add all the candidates (not including first and last)
177
0
  {
178
0
    auto it = pairs_.begin();
179
0
    assert(it->time != kUnknownTimeBeforeAll);
180
0
    uint64_t prev_prev_time = it->time;
181
0
    ++it;
182
0
    assert(it->time != kUnknownTimeBeforeAll);
183
0
    auto prev_it = it;
184
0
    ++it;
185
0
    while (it != pairs_.end()) {
186
0
      assert(it->time != kUnknownTimeBeforeAll);
187
0
      uint64_t gap = it->time - prev_prev_time;
188
0
      pq.emplace(gap, prev_it);
189
0
      prev_prev_time = prev_it->time;
190
0
      prev_it = it;
191
0
      ++it;
192
0
    }
193
0
  }
194
195
  // Greedily remove the best candidate, iteratively
196
0
  while (to_remove_count > 0) {
197
0
    assert(!pq.empty());
198
    // Remove the candidate with smallest gap
199
0
    auto rc = pq.top();
200
0
    pq.pop();
201
202
    // NOTE: priority_queue does not support updating an existing element,
203
    // but we can work around that because the gap tracked in pq is only
204
    // going to be better than actuality, and we can detect and adjust
205
    // when a better-than-actual gap is found.
206
207
    // Determine actual time gap if this entry is removed (zero entries are
208
    // marked for deletion)
209
0
    auto it = rc.it + 1;
210
0
    uint64_t after_time = it->time;
211
0
    while (after_time == kUnknownTimeBeforeAll) {
212
0
      assert(it != pairs_.end());
213
0
      ++it;
214
0
      after_time = it->time;
215
0
    }
216
0
    it = rc.it - 1;
217
0
    uint64_t before_time = it->time;
218
0
    while (before_time == kUnknownTimeBeforeAll) {
219
0
      assert(it != pairs_.begin());
220
0
      --it;
221
0
      before_time = it->time;
222
0
    }
223
    // Check whether the gap is still valid (or needs to be recomputed)
224
0
    if (rc.new_time_gap == after_time - before_time) {
225
      // Mark the entry as removed
226
0
      rc.it->time = kUnknownTimeBeforeAll;
227
0
      --to_remove_count;
228
0
    } else {
229
      // Insert a replacement up-to-date removal candidate
230
0
      pq.emplace(after_time - before_time, rc.it);
231
0
    }
232
0
  }
233
234
  // Collapse away entries marked for deletion
235
0
  auto from_it = pairs_.begin();
236
0
  auto to_it = from_it;
237
238
0
  for (; from_it != pairs_.end(); ++from_it) {
239
0
    if (from_it->time != kUnknownTimeBeforeAll) {
240
0
      if (from_it != to_it) {
241
0
        *to_it = *from_it;
242
0
      }
243
0
      ++to_it;
244
0
    }
245
0
  }
246
247
  // Erase slots freed up
248
0
  pairs_.erase(to_it, pairs_.end());
249
0
  assert(pairs_.size() == strict_cap);
250
0
}
251
252
0
bool SeqnoToTimeMapping::SeqnoTimePair::Merge(const SeqnoTimePair& other) {
253
0
  assert(seqno <= other.seqno);
254
0
  if (seqno == other.seqno) {
255
    // Favoring GetProximalSeqnoBeforeTime over GetProximalTimeBeforeSeqno
256
    // by keeping the older time. For example, consider nothing has been
257
    // written to the DB in some time.
258
0
    time = std::min(time, other.time);
259
0
    return true;
260
0
  } else if (time == other.time) {
261
    // Favoring GetProximalSeqnoBeforeTime over GetProximalTimeBeforeSeqno
262
    // by keeping the newer seqno. For example, when a burst of writes ages
263
    // out, we want the cutoff to be the newest seqno from that burst.
264
0
    seqno = std::max(seqno, other.seqno);
265
0
    return true;
266
0
  } else if (time > other.time) {
267
0
    assert(seqno < other.seqno);
268
    // Need to resolve an inconsistency (clock drift? very rough time?).
269
    // Given the direction that entries are supposed to err, trust the earlier
270
    // time entry as more reliable, and this choice ensures we don't
271
    // accidentally throw out an entry within our time span.
272
0
    *this = other;
273
0
    return true;
274
0
  } else {
275
    // Not merged
276
0
    return false;
277
0
  }
278
0
}
279
280
0
void SeqnoToTimeMapping::SortAndMerge() {
281
0
  assert(!enforced_);
282
0
  if (!pairs_.empty()) {
283
0
    std::sort(pairs_.begin(), pairs_.end());
284
285
0
    auto from_it = pairs_.begin();
286
0
    auto to_it = from_it;
287
0
    for (++from_it; from_it != pairs_.end(); ++from_it) {
288
0
      if (to_it->Merge(*from_it)) {
289
        // Merged with last entry
290
0
      } else {
291
        // Copy into next entry
292
0
        *++to_it = *from_it;
293
0
      }
294
0
    }
295
    // Erase slots freed up from merging
296
0
    pairs_.erase(to_it + 1, pairs_.end());
297
0
  }
298
  // Mark as "at least sorted"
299
0
  enforced_ = true;
300
0
}
301
302
19.7k
// Sets the maximum time span to retain. When the mapping is already sorted the
// new limit is applied immediately; otherwise it takes effect the next time
// the time span is enforced. Returns *this for chaining.
SeqnoToTimeMapping& SeqnoToTimeMapping::SetMaxTimeSpan(uint64_t max_time_span) {
  max_time_span_ = max_time_span;
  if (!enforced_) {
    return *this;
  }
  EnforceMaxTimeSpan();
  return *this;
}
309
310
20.3k
// Sets the maximum number of entries. When the mapping is already sorted the
// new capacity is applied immediately (strictly); otherwise it takes effect
// the next time capacity is enforced. Returns *this for chaining.
SeqnoToTimeMapping& SeqnoToTimeMapping::SetCapacity(uint64_t capacity) {
  capacity_ = capacity;
  if (!enforced_) {
    return *this;
  }
  EnforceCapacity(/*strict=*/true);
  return *this;
}
317
318
0
// Brings the mapping into its fully enforced state: sorted and merged, pruned
// to the max time span, and trimmed to strict capacity. Returns *this.
SeqnoToTimeMapping& SeqnoToTimeMapping::Enforce(uint64_t now) {
  const bool was_sorted = enforced_;
  if (!was_sorted) {
    SortAndMerge();
    assert(enforced_);
  }
  // A freshly sorted mapping always gets its time span enforced; an already
  // sorted one is only re-pruned when a reference time is supplied.
  if (!was_sorted || now > 0) {
    EnforceMaxTimeSpan(now);
  }
  EnforceCapacity(/*strict=*/true);
  return *this;
}
329
330
0
// Appends an entry without sorting/merging, marking the mapping as needing
// enforcement. Entries with seqno 0 are ignored.
void SeqnoToTimeMapping::AddUnenforced(SequenceNumber seqno, uint64_t time) {
  if (seqno != 0) {
    enforced_ = false;
    pairs_.emplace_back(seqno, time);
  }
}
337
338
// The encoded format is:
339
//  [num_of_entries][[seqno][time],[seqno][time],...]
340
//      ^                                 ^
341
//    var_int                      delta_encoded (var_int)
342
// Exception: an empty mapping is encoded as the empty string. This means the encoding
343
// doesn't fully form a prefix code, but that is OK for applications like
344
// TableProperties.
345
7.93k
void SeqnoToTimeMapping::EncodeTo(std::string& dest) const {
346
7.93k
  assert(enforced_);
347
  // Can use empty string for empty mapping
348
7.93k
  if (pairs_.empty()) {
349
7.93k
    return;
350
7.93k
  }
351
  // Encode number of entries
352
0
  PutVarint64(&dest, pairs_.size());
353
0
  SeqnoTimePair base;
354
0
  for (auto& cur : pairs_) {
355
0
    assert(base < cur);
356
    // Delta encode each entry
357
0
    SeqnoTimePair val = cur.ComputeDelta(base);
358
0
    base = cur;
359
0
    val.Encode(dest);
360
0
  }
361
0
}
362
363
namespace {
364
// Decodes the format produced by SeqnoToTimeMapping::EncodeTo(), appending the
// entries to `pairs`. Empty input is an empty mapping. On failure, entries
// decoded so far remain appended; the caller is responsible for rollback.
Status DecodeImpl(Slice& input,
                  std::deque<SeqnoToTimeMapping::SeqnoTimePair>& pairs) {
  if (input.empty()) {
    return Status::OK();
  }
  uint64_t count;
  if (!GetVarint64(&input, &count)) {
    return Status::Corruption("Invalid sequence number time size");
  }

  SeqnoToTimeMapping::SeqnoTimePair prev;
  for (uint64_t remaining = count; remaining > 0; --remaining) {
    SeqnoToTimeMapping::SeqnoTimePair entry;
    Status s = entry.Decode(input);
    if (!s.ok()) {
      return s;
    }
    // Each entry is stored as a delta from its predecessor.
    entry.ApplyDelta(prev);
    pairs.emplace_back(entry);
    prev = entry;
  }

  if (input.empty()) {
    return Status::OK();
  }
  return Status::Corruption(
      "Extra bytes at end of sequence number time mapping");
}
392
}  // namespace
393
394
34.8k
// Decodes `pairs_str` and appends its entries to this mapping. On corruption,
// the mapping is restored to its previous contents. On success, the mapping
// is marked as needing enforcement if it was non-empty or has any limits.
Status SeqnoToTimeMapping::DecodeFrom(const std::string& pairs_str) {
  const size_t size_before = pairs_.size();
  Slice input(pairs_str);
  Status s = DecodeImpl(input, pairs_);
  if (!s.ok()) {
    // Roll back in case of corrupted data
    pairs_.resize(size_before);
    return s;
  }
  const bool has_limits =
      max_time_span_ < UINT64_MAX || capacity_ < UINT64_MAX;
  if (size_before > 0 || has_limits) {
    enforced_ = false;
  }
  return s;
}
408
409
0
void SeqnoToTimeMapping::SeqnoTimePair::Encode(std::string& dest) const {
410
0
  PutVarint64Varint64(&dest, seqno, time);
411
0
}
412
413
0
// Reads a seqno varint then a time varint from `input` (mirroring Encode()),
// advancing `input`. Returns Corruption naming the field that failed.
Status SeqnoToTimeMapping::SeqnoTimePair::Decode(Slice& input) {
  if (!GetVarint64(&input, &seqno)) {
    return Status::Corruption("Invalid sequence number");
  }
  const bool time_ok = GetVarint64(&input, &time);
  return time_ok ? Status::OK() : Status::Corruption("Invalid time");
}
422
423
void SeqnoToTimeMapping::CopyFromSeqnoRange(const SeqnoToTimeMapping& src,
424
                                            SequenceNumber from_seqno,
425
686
                                            SequenceNumber to_seqno) {
426
686
  bool orig_empty = Empty();
427
686
  auto src_it = src.FindGreaterEqSeqno(from_seqno);
428
  // Allow nonsensical ranges like [1000, 0] which might show up e.g. for
429
  // an SST file with no entries.
430
686
  auto src_it_end =
431
686
      to_seqno < from_seqno ? src_it : src.FindGreaterSeqno(to_seqno);
432
  // To best answer GetProximalTimeBeforeSeqno(from_seqno) we need an entry
433
  // with a seqno before that (if available)
434
686
  if (src_it != src.pairs_.begin()) {
435
0
    --src_it;
436
0
  }
437
686
  assert(src_it <= src_it_end);
438
686
  std::copy(src_it, src_it_end, std::back_inserter(pairs_));
439
440
686
  if (!orig_empty || max_time_span_ < UINT64_MAX || capacity_ < UINT64_MAX) {
441
0
    enforced_ = false;
442
0
  }
443
686
}
444
445
0
// Adds (seqno, time) to the mapping, merging with the newest entry when
// possible, then re-establishes sort order and applies the time-span and
// (non-strict) capacity limits. Returns true only if a new entry slot was
// added; returns false when capacity is 0, seqno is 0, or the pair was merged.
bool SeqnoToTimeMapping::Append(SequenceNumber seqno, uint64_t time) {
  if (capacity_ == 0) {
    return false;
  }
  bool added = false;
  if (seqno == 0) {
    // skip seq number 0, which may have special meaning, like zeroed out data
    // TODO: consider changing?
  } else if (pairs_.empty()) {
    enforced_ = true;
    pairs_.emplace_back(seqno, time);
    // skip normal enforced check below
    return true;
  } else {
    auto& last = pairs_.back();
    // We can attempt to merge with the last entry if the new entry sorts with
    // it.
    if (last.seqno <= seqno) {
      // Merge() absorbs the new pair when it shares last's seqno or time, or
      // when its time is older than last's (e.g. clock reset). It declines
      // only when the new pair is strictly newer in BOTH seqno and time, so
      // that is the sole case needing a new slot.
      if (!last.Merge({seqno, time})) {
        assert(seqno > last.seqno && time > last.time);
        pairs_.emplace_back(seqno, time);
        added = true;
      }
    } else if (!enforced_) {
      // Treat like AddUnenforced and fix up below
      pairs_.emplace_back(seqno, time);
      added = true;
    } else {
      // Out of order append attempted
      assert(false);
    }
  }
  // Similar to Enforce() but not quite
  if (!enforced_) {
    SortAndMerge();
    assert(enforced_);
  }
  EnforceMaxTimeSpan();
  EnforceCapacity(/*strict=*/false);
  return added;
}
492
493
void SeqnoToTimeMapping::PrePopulate(SequenceNumber from_seqno,
494
                                     SequenceNumber to_seqno,
495
0
                                     uint64_t from_time, uint64_t to_time) {
496
0
  assert(Empty());
497
0
  assert(from_seqno > 0);
498
0
  assert(to_seqno > from_seqno);
499
0
  assert(from_time > kUnknownTimeBeforeAll);
500
0
  assert(to_time >= from_time);
501
502
  // TODO: smartly limit this to max_capacity_ representative samples
503
0
  for (auto i = from_seqno; i <= to_seqno; i++) {
504
0
    uint64_t t = from_time + (to_time - from_time) * (i - from_seqno) /
505
0
                                 (to_seqno - from_seqno);
506
0
    pairs_.emplace_back(i, t);
507
0
  }
508
0
}
509
510
0
std::string SeqnoToTimeMapping::ToHumanString() const {
511
0
  std::string ret;
512
0
  for (const auto& seq_time : pairs_) {
513
0
    AppendNumberTo(&ret, seq_time.seqno);
514
0
    ret.append("->");
515
0
    AppendNumberTo(&ret, seq_time.time);
516
0
    ret.append(",");
517
0
  }
518
0
  return ret;
519
0
}
520
521
// Writes `value` followed by `unix_write_time` as a fixed 64-bit suffix into
// *buf (replacing its contents) and returns a Slice over *buf.
Slice PackValueAndWriteTime(const Slice& value, uint64_t unix_write_time,
                            std::string* buf) {
  std::string& packed = *buf;
  // assign() is used (not clear()+append()) so `value` may alias *buf.
  packed.assign(value.data(), value.size());
  PutFixed64(&packed, unix_write_time);
  return Slice(packed);
}
527
528
// Writes `value` followed by `seqno` as a fixed 64-bit suffix into *buf
// (replacing its contents) and returns a Slice over *buf.
Slice PackValueAndSeqno(const Slice& value, SequenceNumber seqno,
                        std::string* buf) {
  std::string& packed = *buf;
  // assign() is used (not clear()+append()) so `value` may alias *buf.
  packed.assign(value.data(), value.size());
  PutFixed64(&packed, seqno);
  return Slice(packed);
}
534
535
0
uint64_t ParsePackedValueForWriteTime(const Slice& value) {
536
0
  assert(value.size() >= sizeof(uint64_t));
537
0
  Slice write_time_slice(value.data() + value.size() - sizeof(uint64_t),
538
0
                         sizeof(uint64_t));
539
0
  uint64_t write_time;
540
0
  [[maybe_unused]] auto res = GetFixed64(&write_time_slice, &write_time);
541
0
  assert(res);
542
0
  return write_time;
543
0
}
544
545
0
std::tuple<Slice, uint64_t> ParsePackedValueWithWriteTime(const Slice& value) {
546
0
  return std::make_tuple(Slice(value.data(), value.size() - sizeof(uint64_t)),
547
0
                         ParsePackedValueForWriteTime(value));
548
0
}
549
550
0
// Extracts the seqno stored in the trailing 8 bytes of a value packed by
// PackValueAndSeqno(). `value` must be at least 8 bytes long.
SequenceNumber ParsePackedValueForSeqno(const Slice& value) {
  assert(value.size() >= sizeof(SequenceNumber));
  const size_t suffix_offset = value.size() - sizeof(uint64_t);
  Slice suffix(value.data() + suffix_offset, sizeof(uint64_t));
  SequenceNumber seqno;
  [[maybe_unused]] const bool decoded = GetFixed64(&suffix, &seqno);
  assert(decoded);
  return seqno;
}
559
560
std::tuple<Slice, SequenceNumber> ParsePackedValueWithSeqno(
561
0
    const Slice& value) {
562
0
  return std::make_tuple(
563
0
      Slice(value.data(), value.size() - sizeof(SequenceNumber)),
564
0
      ParsePackedValueForSeqno(value));
565
0
}
566
567
0
// Returns the original value portion of a packed value, i.e. everything but
// the trailing 8-byte suffix. `value` must be at least 8 bytes long.
Slice ParsePackedValueForValue(const Slice& value) {
  assert(value.size() >= sizeof(uint64_t));
  const size_t value_len = value.size() - sizeof(uint64_t);
  return Slice(value.data(), value_len);
}
571
}  // namespace ROCKSDB_NAMESPACE