Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/util/udt_util.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//
3
//  This source code is licensed under both the GPLv2 (found in the
4
//  COPYING file in the root directory) and Apache 2.0 License
5
//  (found in the LICENSE.Apache file in the root directory).
6
7
#include "util/udt_util.h"
8
9
#include "db/dbformat.h"
10
#include "rocksdb/types.h"
11
#include "util/coding.h"
12
#include "util/write_batch_util.h"
13
14
namespace ROCKSDB_NAMESPACE {
15
namespace {
16
// The action needed to make a key written under a recorded timestamp size
// usable by a column family with its current (running) timestamp size.
enum class RecoveryType {
  // Nothing to do: the recorded and running sizes are compatible as is.
  kNoop,
  // A size was recorded and it differs from the running, non-zero size.
  kUnrecoverable,
  // A size was recorded, but the running column family uses no timestamp.
  kStripTimestamp,
  // No size was recorded, but the running column family uses a timestamp.
  kPadTimestamp,
};

// Classifies the relationship between a column family's running timestamp
// size and the size recorded for it, if any.
//
// `recorded_ts_sz` is empty when nothing was recorded for the column family;
// that is treated as equivalent to a recorded timestamp size of zero.
RecoveryType GetRecoveryType(const size_t running_ts_sz,
                             const std::optional<size_t>& recorded_ts_sz) {
  const bool was_recorded = recorded_ts_sz.has_value();

  // Running column family has user-defined timestamps disabled.
  if (running_ts_sz == 0) {
    return was_recorded ? RecoveryType::kStripTimestamp : RecoveryType::kNoop;
  }

  // Running column family has user-defined timestamps enabled.
  if (!was_recorded) {
    return RecoveryType::kPadTimestamp;
  }
  return running_ts_sz == *recorded_ts_sz ? RecoveryType::kNoop
                                          : RecoveryType::kUnrecoverable;
}
46
47
bool AllRunningColumnFamiliesConsistent(
48
    const UnorderedMap<uint32_t, size_t>& running_ts_sz,
49
1.36M
    const UnorderedMap<uint32_t, size_t>& record_ts_sz) {
50
1.36M
  for (const auto& [cf_id, ts_sz] : running_ts_sz) {
51
1.36M
    auto record_it = record_ts_sz.find(cf_id);
52
1.36M
    RecoveryType recovery_type =
53
1.36M
        GetRecoveryType(ts_sz, record_it != record_ts_sz.end()
54
1.36M
                                   ? std::optional<size_t>(record_it->second)
55
1.36M
                                   : std::nullopt);
56
1.36M
    if (recovery_type != RecoveryType::kNoop) {
57
0
      return false;
58
0
    }
59
1.36M
  }
60
1.36M
  return true;
61
1.36M
}
62
63
// Checks the column families referenced by `batch` for timestamp size
// discrepancies between `running_ts_sz` and `record_ts_sz`.
//
// Returns:
//  - the error from CollectColumnFamilyIdsFromWriteBatch if collecting the
//    column family ids fails;
//  - InvalidArgument on the first discrepancy when `check_mode` is
//    kVerifyConsistency;
//  - InvalidArgument on the first unrecoverable discrepancy otherwise;
//  - OK in all other cases, with `*ts_need_recovery` set to true if at least
//    one referenced column family needs reconciliation. The flag is never
//    reset to false here.
Status CheckWriteBatchTimestampSizeConsistency(
    const WriteBatch* batch,
    const UnorderedMap<uint32_t, size_t>& running_ts_sz,
    const UnorderedMap<uint32_t, size_t>& record_ts_sz,
    TimestampSizeConsistencyMode check_mode, bool* ts_need_recovery) {
  std::vector<uint32_t> referenced_cf_ids;
  Status s = CollectColumnFamilyIdsFromWriteBatch(*batch, &referenced_cf_ids);
  if (!s.ok()) {
    return s;
  }

  for (const uint32_t referenced_cf : referenced_cf_ids) {
    const auto running_pos = running_ts_sz.find(referenced_cf);
    if (running_pos == running_ts_sz.end()) {
      // A dropped column family referred to in a WriteBatch is ignored
      // regardless of its consistency.
      continue;
    }

    std::optional<size_t> recorded_sz;
    const auto recorded_pos = record_ts_sz.find(referenced_cf);
    if (recorded_pos != record_ts_sz.end()) {
      recorded_sz = recorded_pos->second;
    }

    const RecoveryType action =
        GetRecoveryType(running_pos->second, recorded_sz);
    if (action == RecoveryType::kNoop) {
      continue;
    }

    if (check_mode == TimestampSizeConsistencyMode::kVerifyConsistency) {
      return Status::InvalidArgument(
          "WriteBatch contains timestamp size inconsistency.");
    }
    if (action == RecoveryType::kUnrecoverable) {
      return Status::InvalidArgument(
          "WriteBatch contains unrecoverable timestamp size inconsistency.");
    }

    // One column family needing reconciliation is enough to mark the whole
    // WriteBatch as needing recovery and a rebuild.
    *ts_need_recovery = true;
  }
  return Status::OK();
}
104
105
enum class ToggleUDT {
106
  kUnchanged,
107
  kEnableUDT,
108
  kDisableUDT,
109
  kInvalidChange,
110
};
111
112
ToggleUDT CompareComparator(const Comparator* new_comparator,
113
4.51k
                            const std::string& old_comparator_name) {
114
4.51k
  static const char* kUDTSuffix = ".u64ts";
115
4.51k
  static const Slice kSuffixSlice = kUDTSuffix;
116
4.51k
  static const size_t kSuffixSize = 6;
117
4.51k
  size_t ts_sz = new_comparator->timestamp_size();
118
4.51k
  (void)ts_sz;
119
4.51k
  Slice new_ucmp_name(new_comparator->Name());
120
4.51k
  Slice old_ucmp_name(old_comparator_name);
121
4.51k
  if (new_ucmp_name.compare(old_ucmp_name) == 0) {
122
4.51k
    return ToggleUDT::kUnchanged;
123
4.51k
  }
124
0
  if (new_ucmp_name.size() == old_ucmp_name.size() + kSuffixSize &&
125
0
      new_ucmp_name.starts_with(old_ucmp_name) &&
126
0
      new_ucmp_name.ends_with(kSuffixSlice)) {
127
0
    assert(ts_sz == 8);
128
0
    return ToggleUDT::kEnableUDT;
129
0
  }
130
0
  if (old_ucmp_name.size() == new_ucmp_name.size() + kSuffixSize &&
131
0
      old_ucmp_name.starts_with(new_ucmp_name) &&
132
0
      old_ucmp_name.ends_with(kSuffixSlice)) {
133
0
    assert(ts_sz == 0);
134
0
    return ToggleUDT::kDisableUDT;
135
0
  }
136
0
  return ToggleUDT::kInvalidChange;
137
0
}
138
}  // namespace
139
140
// Builds a handler that reconciles keys against the given running and
// recorded timestamp size maps. The handler starts out valid, with an empty
// WriteBatch to collect the reconciled entries and with no difference from
// the original batch noted yet.
TimestampRecoveryHandler::TimestampRecoveryHandler(
    const UnorderedMap<uint32_t, size_t>& running_ts_sz,
    const UnorderedMap<uint32_t, size_t>& record_ts_sz)
    : running_ts_sz_(running_ts_sz),
      record_ts_sz_(record_ts_sz),
      // Destination batch that the *CF callbacks append to.
      new_batch_(new WriteBatch()),
      handler_valid_(true),
      // Flipped to true once a key is actually stripped or padded.
      new_batch_diff_from_orig_batch_(false) {}
148
149
// Re-adds a Put to the new batch using the reconciled form of `key`.
// Returns the reconciliation error, if any, without touching the new batch.
Status TimestampRecoveryHandler::PutCF(uint32_t cf, const Slice& key,
                                       const Slice& value) {
  // Owns the bytes of the reconciled key when a timestamp has to be padded.
  std::string key_storage;
  Slice reconciled_key;
  Status s =
      ReconcileTimestampDiscrepancy(cf, key, &key_storage, &reconciled_key);
  if (s.ok()) {
    return WriteBatchInternal::Put(new_batch_.get(), cf, reconciled_key,
                                   value);
  }
  return s;
}
160
161
// Re-adds a PutEntity to the new batch using the reconciled form of `key`.
// The serialized `entity` is deserialized into wide columns first; if that
// fails, Corruption is returned with the hex form of `entity` attached.
Status TimestampRecoveryHandler::PutEntityCF(uint32_t cf, const Slice& key,
                                             const Slice& entity) {
  // Owns the bytes of the reconciled key when a timestamp has to be padded.
  std::string key_storage;
  Slice reconciled_key;
  Status s =
      ReconcileTimestampDiscrepancy(cf, key, &key_storage, &reconciled_key);
  if (!s.ok()) {
    return s;
  }

  // Deserialize takes a mutable Slice, so work on a copy and keep `entity`
  // intact for the error message.
  Slice serialized = entity;
  WideColumns columns;
  const bool deserialized =
      WideColumnSerialization::Deserialize(serialized, columns).ok();
  if (!deserialized) {
    return Status::Corruption("Unable to deserialize entity",
                              entity.ToString(/* hex */ true));
  }

  return WriteBatchInternal::PutEntity(new_batch_.get(), cf, reconciled_key,
                                       columns);
}
179
180
// Re-adds a TimedPut to the new batch using the reconciled form of `key`;
// `value` and `write_time` are forwarded unchanged.
Status TimestampRecoveryHandler::TimedPutCF(uint32_t cf, const Slice& key,
                                            const Slice& value,
                                            uint64_t write_time) {
  // Owns the bytes of the reconciled key when a timestamp has to be padded.
  std::string key_storage;
  Slice reconciled_key;
  Status s =
      ReconcileTimestampDiscrepancy(cf, key, &key_storage, &reconciled_key);
  if (s.ok()) {
    return WriteBatchInternal::TimedPut(new_batch_.get(), cf, reconciled_key,
                                        value, write_time);
  }
  return s;
}
193
194
0
// Re-adds a Delete to the new batch using the reconciled form of `key`.
Status TimestampRecoveryHandler::DeleteCF(uint32_t cf, const Slice& key) {
  // Owns the bytes of the reconciled key when a timestamp has to be padded.
  std::string key_storage;
  Slice reconciled_key;
  Status s =
      ReconcileTimestampDiscrepancy(cf, key, &key_storage, &reconciled_key);
  if (s.ok()) {
    return WriteBatchInternal::Delete(new_batch_.get(), cf, reconciled_key);
  }
  return s;
}
204
205
0
// Re-adds a SingleDelete to the new batch using the reconciled form of `key`.
Status TimestampRecoveryHandler::SingleDeleteCF(uint32_t cf, const Slice& key) {
  // Owns the bytes of the reconciled key when a timestamp has to be padded.
  std::string key_storage;
  Slice reconciled_key;
  Status s =
      ReconcileTimestampDiscrepancy(cf, key, &key_storage, &reconciled_key);
  if (s.ok()) {
    return WriteBatchInternal::SingleDelete(new_batch_.get(), cf,
                                            reconciled_key);
  }
  return s;
}
215
216
// Re-adds a DeleteRange to the new batch. Both range boundaries are
// reconciled independently, begin key first; the first failure is returned
// and nothing is added to the new batch in that case.
Status TimestampRecoveryHandler::DeleteRangeCF(uint32_t cf,
                                               const Slice& begin_key,
                                               const Slice& end_key) {
  // Each boundary gets its own backing buffer so both reconciled slices stay
  // valid until the DeleteRange call below.
  std::string begin_storage;
  std::string end_storage;
  Slice reconciled_begin;
  Slice reconciled_end;

  Status s = ReconcileTimestampDiscrepancy(cf, begin_key, &begin_storage,
                                           &reconciled_begin);
  if (!s.ok()) {
    return s;
  }

  s = ReconcileTimestampDiscrepancy(cf, end_key, &end_storage,
                                    &reconciled_end);
  if (!s.ok()) {
    return s;
  }

  return WriteBatchInternal::DeleteRange(new_batch_.get(), cf,
                                         reconciled_begin, reconciled_end);
}
236
237
// Re-adds a Merge to the new batch using the reconciled form of `key`.
Status TimestampRecoveryHandler::MergeCF(uint32_t cf, const Slice& key,
                                         const Slice& value) {
  // Owns the bytes of the reconciled key when a timestamp has to be padded.
  std::string key_storage;
  Slice reconciled_key;
  Status s =
      ReconcileTimestampDiscrepancy(cf, key, &key_storage, &reconciled_key);
  if (s.ok()) {
    return WriteBatchInternal::Merge(new_batch_.get(), cf, reconciled_key,
                                     value);
  }
  return s;
}
248
249
// Re-adds a PutBlobIndex to the new batch using the reconciled form of `key`.
Status TimestampRecoveryHandler::PutBlobIndexCF(uint32_t cf, const Slice& key,
                                                const Slice& value) {
  // Owns the bytes of the reconciled key when a timestamp has to be padded.
  std::string key_storage;
  Slice reconciled_key;
  Status s =
      ReconcileTimestampDiscrepancy(cf, key, &key_storage, &reconciled_key);
  if (s.ok()) {
    return WriteBatchInternal::PutBlobIndex(new_batch_.get(), cf,
                                            reconciled_key, value);
  }
  return s;
}
260
261
// Produces in `*new_key` the form of `key` that matches the running timestamp
// size of column family `cf`.
//
//  - If `cf` is not among the running column families, or no reconciliation
//    is needed, `*new_key` refers to the same bytes as `key`.
//  - If a timestamp has to be stripped, `*new_key` is what
//    StripTimestampFromUserKey returns for `key` and the recorded size.
//  - If a timestamp has to be padded, the padded key is written into
//    `*new_key_buf` and `*new_key` refers to that buffer, so the buffer must
//    outlive any use of `*new_key`.
//
// Stripping or padding also marks the new batch as different from the
// original one.
//
// Returns Corruption if a timestamp must be stripped from a key that is
// shorter than the recorded timestamp size, InvalidArgument if the recorded
// and running timestamp sizes cannot be reconciled, and OK otherwise.
Status TimestampRecoveryHandler::ReconcileTimestampDiscrepancy(
    uint32_t cf, const Slice& key, std::string* new_key_buf, Slice* new_key) {
  assert(handler_valid_);
  auto running_iter = running_ts_sz_.find(cf);
  if (running_iter == running_ts_sz_.end()) {
    // The column family referred to by the WriteBatch is no longer running.
    // Copy over the entry as is to the new WriteBatch.
    *new_key = key;
    return Status::OK();
  }
  size_t running_ts_sz = running_iter->second;
  auto record_iter = record_ts_sz_.find(cf);
  std::optional<size_t> record_ts_sz =
      record_iter != record_ts_sz_.end()
          ? std::optional<size_t>(record_iter->second)
          : std::nullopt;
  RecoveryType recovery_type = GetRecoveryType(running_ts_sz, record_ts_sz);

  switch (recovery_type) {
    case RecoveryType::kNoop:
      *new_key = key;
      break;
    case RecoveryType::kStripTimestamp:
      assert(record_ts_sz.has_value());
      // A key shorter than the timestamp to remove cannot be valid input.
      // StripTimestampFromUserKey returns a Slice and so cannot report this;
      // reject it here with a proper error instead.
      // NOTE(review): presumably the helper only guards this with an assert;
      // confirm against its definition.
      if (key.size() < *record_ts_sz) {
        return Status::Corruption(
            "Key is shorter than the recorded timestamp size and cannot have "
            "its timestamp stripped by TimestampRecoveryHandler.");
      }
      *new_key = StripTimestampFromUserKey(key, *record_ts_sz);
      new_batch_diff_from_orig_batch_ = true;
      break;
    case RecoveryType::kPadTimestamp:
      AppendKeyWithMinTimestamp(new_key_buf, key, running_ts_sz);
      *new_key = *new_key_buf;
      new_batch_diff_from_orig_batch_ = true;
      break;
    case RecoveryType::kUnrecoverable:
      return Status::InvalidArgument(
          "Unrecoverable timestamp size inconsistency encountered by "
          "TimestampRecoveryHandler.");
    default:
      assert(false);
  }
  return Status::OK();
}
302
303
// Detects and, where needed, repairs timestamp size differences between
// `record_ts_sz` and `running_ts_sz` for the entries of `batch`.
//
// When every running column family is already consistent, or when the batch
// needs no recovery, OK is returned and `*new_batch` is left untouched.
// Otherwise the batch is replayed through a TimestampRecoveryHandler and the
// rebuilt batch, carrying the sequence number of `batch`, is stored in
// `*new_batch`. `new_batch` must not be null in that case.
//
// Errors from the consistency check or from replaying the batch are returned
// as is.
Status HandleWriteBatchTimestampSizeDifference(
    const WriteBatch* batch,
    const UnorderedMap<uint32_t, size_t>& running_ts_sz,
    const UnorderedMap<uint32_t, size_t>& record_ts_sz,
    TimestampSizeConsistencyMode check_mode,
    std::unique_ptr<WriteBatch>* new_batch) {
  // Fast path: nothing to inspect in the WriteBatch itself.
  if (AllRunningColumnFamiliesConsistent(running_ts_sz, record_ts_sz)) {
    return Status::OK();
  }

  bool rebuild_required = false;
  Status s = CheckWriteBatchTimestampSizeConsistency(
      batch, running_ts_sz, record_ts_sz, check_mode, &rebuild_required);
  if (!s.ok()) {
    return s;
  }
  if (!rebuild_required) {
    return Status::OK();
  }

  assert(new_batch);
  // Remember the sequence number so the rebuilt batch can carry the same one.
  const SequenceNumber original_sequence = WriteBatchInternal::Sequence(batch);
  TimestampRecoveryHandler rebuilder(running_ts_sz, record_ts_sz);
  s = batch->Iterate(&rebuilder);
  if (!s.ok()) {
    return s;
  }

  *new_batch = rebuilder.TransferNewBatch();
  WriteBatchInternal::SetSequence(new_batch->get(), original_sequence);
  return Status::OK();
}
332
333
// Validates a change of user-defined timestamp (UDT) settings, given the new
// comparator, the name of the old comparator, and the new and old values of
// the persist_user_defined_timestamps flag.
//
//  - Same comparator name: OK if the flag is unchanged or the new comparator
//    has a zero timestamp size; InvalidArgument otherwise.
//  - UDT being enabled: OK only if the new flag is false, in which case
//    `*mark_sst_files_has_no_udt` is set to true.
//  - UDT being disabled: OK only if the old flag is false.
//  - Any other comparator change: InvalidArgument.
//
// `*mark_sst_files_has_no_udt` is not written on any other path.
Status ValidateUserDefinedTimestampsOptions(
    const Comparator* new_comparator, const std::string& old_comparator_name,
    bool new_persist_udt, bool old_persist_udt,
    bool* mark_sst_files_has_no_udt) {
  const size_t new_ts_sz = new_comparator->timestamp_size();
  const ToggleUDT toggle =
      CompareComparator(new_comparator, old_comparator_name);

  switch (toggle) {
    case ToggleUDT::kUnchanged: {
      const bool flag_toggled = old_persist_udt != new_persist_udt;
      // Toggling the flag only matters when the feature is actually in use.
      if (flag_toggled && new_ts_sz != 0) {
        return Status::InvalidArgument(
            "Cannot toggle the persist_user_defined_timestamps flag for a "
            "column family with user-defined timestamps feature enabled.");
      }
      return Status::OK();
    }
    case ToggleUDT::kEnableUDT:
      if (new_persist_udt) {
        return Status::InvalidArgument(
            "Cannot open a column family and enable user-defined timestamps "
            "feature without setting persist_user_defined_timestamps flag to "
            "false.");
      }
      *mark_sst_files_has_no_udt = true;
      return Status::OK();
    case ToggleUDT::kDisableUDT:
      if (old_persist_udt) {
        return Status::InvalidArgument(
            "Cannot open a column family and disable user-defined timestamps "
            "feature if its existing persist_user_defined_timestamps flag is "
            "not false.");
      }
      return Status::OK();
    case ToggleUDT::kInvalidChange:
      return Status::InvalidArgument(
          new_comparator->Name(),
          "does not match existing comparator " + old_comparator_name);
    default:
      break;
  }
  return Status::InvalidArgument(
      "Unsupported user defined timestamps settings change.");
}
377
378
void GetFullHistoryTsLowFromU64CutoffTs(Slice* cutoff_ts,
379
0
                                        std::string* full_history_ts_low) {
380
0
  uint64_t cutoff_udt_ts = 0;
381
0
  [[maybe_unused]] bool format_res = GetFixed64(cutoff_ts, &cutoff_udt_ts);
382
0
  assert(format_res);
383
0
  PutFixed64(full_history_ts_low, cutoff_udt_ts + 1);
384
0
}
385
386
std::tuple<std::optional<Slice>, std::optional<Slice>>
387
MaybeAddTimestampsToRange(const Slice* start, const Slice* end, size_t ts_sz,
388
                          std::string* start_with_ts, std::string* end_with_ts,
389
0
                          bool exclusive_end) {
390
0
  std::optional<Slice> ret_start, ret_end;
391
0
  if (start) {
392
0
    if (ts_sz == 0) {
393
0
      ret_start = *start;
394
0
    } else {
395
      // Maximum timestamp means including all keys with any timestamp for start
396
0
      AppendKeyWithMaxTimestamp(start_with_ts, *start, ts_sz);
397
0
      ret_start = Slice(*start_with_ts);
398
0
    }
399
0
  }
400
0
  if (end) {
401
0
    if (ts_sz == 0) {
402
0
      ret_end = *end;
403
0
    } else {
404
0
      if (exclusive_end) {
405
        // Append a maximum timestamp as the range limit is exclusive:
406
        // [start, end)
407
0
        AppendKeyWithMaxTimestamp(end_with_ts, *end, ts_sz);
408
0
      } else {
409
        // Append a minimum timestamp to end so the range limit is inclusive:
410
        // [start, end]
411
0
        AppendKeyWithMinTimestamp(end_with_ts, *end, ts_sz);
412
0
      }
413
0
      ret_end = Slice(*end_with_ts);
414
0
    }
415
0
  }
416
0
  return std::make_tuple(ret_start, ret_end);
417
0
}
418
}  // namespace ROCKSDB_NAMESPACE