Coverage Report

Created: 2025-07-23 07:17

/src/rocksdb/db/column_family.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "db/column_family.h"
11
12
#include <algorithm>
13
#include <cinttypes>
14
#include <limits>
15
#include <sstream>
16
#include <string>
17
#include <vector>
18
19
#include "db/blob/blob_file_cache.h"
20
#include "db/blob/blob_source.h"
21
#include "db/compaction/compaction_picker.h"
22
#include "db/compaction/compaction_picker_fifo.h"
23
#include "db/compaction/compaction_picker_level.h"
24
#include "db/compaction/compaction_picker_universal.h"
25
#include "db/db_impl/db_impl.h"
26
#include "db/internal_stats.h"
27
#include "db/job_context.h"
28
#include "db/range_del_aggregator.h"
29
#include "db/table_properties_collector.h"
30
#include "db/version_set.h"
31
#include "db/write_controller.h"
32
#include "file/sst_file_manager_impl.h"
33
#include "logging/logging.h"
34
#include "monitoring/thread_status_util.h"
35
#include "options/options_helper.h"
36
#include "port/port.h"
37
#include "rocksdb/convenience.h"
38
#include "rocksdb/table.h"
39
#include "table/merging_iterator.h"
40
#include "util/autovector.h"
41
#include "util/cast_util.h"
42
#include "util/compression.h"
43
44
namespace ROCKSDB_NAMESPACE {
45
46
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
47
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
48
219k
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
49
219k
  if (cfd_ != nullptr) {
50
160k
    cfd_->Ref();
51
160k
  }
52
219k
}
53
54
219k
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
55
219k
  if (cfd_ != nullptr) {
56
160k
    for (auto& listener : cfd_->ioptions().listeners) {
57
0
      listener->OnColumnFamilyHandleDeletionStarted(this);
58
0
    }
59
    // Job id == 0 means that this is not our background process, but rather
60
    // a user thread.
61
    // Need to hold some shared pointers owned by the initial_cf_options
62
    // before the final cleanup finishes.
63
160k
    ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
64
160k
    JobContext job_context(0);
65
160k
    mutex_->Lock();
66
160k
    bool dropped = cfd_->IsDropped();
67
160k
    if (cfd_->UnrefAndTryDelete()) {
68
21.9k
      if (dropped) {
69
21.9k
        db_->FindObsoleteFiles(&job_context, false, true);
70
21.9k
      }
71
21.9k
    }
72
160k
    mutex_->Unlock();
73
160k
    if (job_context.HaveSomethingToDelete()) {
74
0
      bool defer_purge =
75
0
          db_->immutable_db_options().avoid_unnecessary_blocking_io;
76
0
      db_->PurgeObsoleteFiles(job_context, defer_purge);
77
0
    }
78
160k
    job_context.Clean();
79
160k
  }
80
219k
}
81
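The handle destructor above follows a recurring RocksDB shape: take a reference in the constructor, drop it in the destructor, gather cleanup work while the DB mutex is held, and run the expensive purge only after unlocking. Below is a minimal, self-contained sketch of that shape; SharedState, Handle, and the purge loop are hypothetical stand-ins, not the RocksDB API.

#include <atomic>
#include <cassert>
#include <mutex>
#include <vector>

struct SharedState {
  std::atomic<int> refs{0};
  std::vector<int> obsolete;  // stand-in for JobContext's obsolete-file list
};

class Handle {
 public:
  Handle(SharedState* s, std::mutex* mu) : s_(s), mu_(mu) {
    s_->refs.fetch_add(1);
  }
  ~Handle() {
    std::vector<int> to_purge;
    {
      std::lock_guard<std::mutex> l(*mu_);  // mutate shared state under lock
      if (s_->refs.fetch_sub(1) == 1) {
        to_purge.swap(s_->obsolete);  // gather work while still locked
      }
    }
    // Purge outside the lock, mirroring PurgeObsoleteFiles after Unlock().
    for (int f : to_purge) {
      (void)f;
    }
  }

 private:
  SharedState* s_;
  std::mutex* mu_;
};

int main() {
  SharedState state;
  std::mutex mu;
  state.obsolete = {1, 2, 3};
  { Handle h(&state, &mu); }  // last handle performs the deferred purge
  assert(state.refs.load() == 0 && state.obsolete.empty());
}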
82
1.02M
uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
83
84
0
const std::string& ColumnFamilyHandleImpl::GetName() const {
85
0
  return cfd()->GetName();
86
0
}
87
88
0
Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
89
  // accessing mutable cf-options requires db mutex.
90
0
  InstrumentedMutexLock l(mutex_);
91
0
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
92
0
  return Status::OK();
93
0
}
94
95
2.12M
const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
96
2.12M
  return cfd()->user_comparator();
97
2.12M
}
98
99
void GetInternalTblPropCollFactory(
100
    const ImmutableCFOptions& ioptions,
101
177k
    InternalTblPropCollFactories* internal_tbl_prop_coll_factories) {
102
177k
  assert(internal_tbl_prop_coll_factories);
103
104
177k
  auto& collector_factories = ioptions.table_properties_collector_factories;
105
177k
  for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
106
177k
       ++i) {
107
0
    assert(collector_factories[i]);
108
0
    internal_tbl_prop_coll_factories->emplace_back(
109
0
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
110
0
  }
111
177k
}
112
113
Status CheckCompressionSupportedWithManager(
114
102k
    CompressionType type, UnownedPtr<CompressionManager> mgr) {
115
102k
  if (mgr) {
116
0
    if (!mgr->SupportsCompressionType(type)) {
117
0
      return Status::NotSupported("Compression type " +
118
0
                                  CompressionTypeToString(type) +
119
0
                                  " is not recognized/supported by this "
120
0
                                  "version of CompressionManager " +
121
0
                                  mgr->GetId());
122
0
    }
123
102k
  } else {
124
102k
    if (!CompressionTypeSupported(type)) {
125
0
      if (type <= kLastBuiltinCompression) {
126
0
        return Status::InvalidArgument("Compression type " +
127
0
                                       CompressionTypeToString(type) +
128
0
                                       " is not linked with the binary.");
129
0
      } else {
130
0
        return Status::NotSupported(
131
0
            "Compression type " + CompressionTypeToString(type) +
132
0
            " is not recognized/supported by built-in CompressionManager.");
133
0
      }
134
0
    }
135
102k
  }
136
102k
  return Status::OK();
137
102k
}
138
139
102k
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
140
102k
  if (!cf_options.compression_per_level.empty()) {
141
0
    for (size_t level = 0; level < cf_options.compression_per_level.size();
142
0
         ++level) {
143
0
      Status s = CheckCompressionSupportedWithManager(
144
0
          cf_options.compression_per_level[level],
145
0
          cf_options.compression_manager.get());
146
0
      if (!s.ok()) {
147
0
        return s;
148
0
      }
149
0
    }
150
102k
  } else {
151
102k
    Status s = CheckCompressionSupportedWithManager(
152
102k
        cf_options.compression, cf_options.compression_manager.get());
153
102k
    if (!s.ok()) {
154
0
      return s;
155
0
    }
156
102k
  }
157
102k
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
158
0
    if (cf_options.compression_opts.use_zstd_dict_trainer) {
159
0
      if (!ZSTD_TrainDictionarySupported()) {
160
0
        return Status::InvalidArgument(
161
0
            "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
162
0
            "is not linked with the binary.");
163
0
      }
164
0
    } else if (!ZSTD_FinalizeDictionarySupported()) {
165
0
      return Status::InvalidArgument(
166
0
          "zstd finalizeDictionary cannot be used because ZSTD 1.4.5+ "
167
0
          "is not linked with the binary.");
168
0
    }
169
0
    if (cf_options.compression_opts.max_dict_bytes == 0) {
170
0
      return Status::InvalidArgument(
171
0
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
172
0
          "should be nonzero if we're using zstd's dictionary generator.");
173
0
    }
174
0
  }
175
176
102k
  if (!CompressionTypeSupported(cf_options.blob_compression_type)) {
177
0
    std::ostringstream oss;
178
0
    oss << "The specified blob compression type "
179
0
        << CompressionTypeToString(cf_options.blob_compression_type)
180
0
        << " is not available.";
181
182
0
    return Status::InvalidArgument(oss.str());
183
0
  }
184
185
102k
  return Status::OK();
186
102k
}
187
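CheckCompressionSupported above validates either every entry of a per-level override list or, when that list is empty, the single default compression type. A hedged sketch of that validation shape, using simplified stand-in types (not RocksDB's CompressionType or Status):

#include <optional>
#include <string>
#include <vector>

enum class Codec { kNoCompression, kSnappy, kZSTD };

// Pretend only kNoCompression and kSnappy were compiled in.
std::optional<std::string> CheckOne(Codec c) {
  if (c == Codec::kZSTD) {
    return "Compression type ZSTD is not linked with the binary.";
  }
  return std::nullopt;  // supported
}

std::optional<std::string> CheckAll(const std::vector<Codec>& per_level,
                                    Codec default_codec) {
  if (!per_level.empty()) {
    for (Codec c : per_level) {
      if (auto err = CheckOne(c)) {
        return err;  // first unsupported level wins
      }
    }
    return std::nullopt;
  }
  return CheckOne(default_codec);  // no per-level list: check the default
}

int main() {
  auto ok = CheckAll({}, Codec::kSnappy);
  auto bad = CheckAll({Codec::kSnappy, Codec::kZSTD}, Codec::kNoCompression);
  return (!ok && bad) ? 0 : 1;
}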
188
102k
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
189
102k
  if (cf_options.inplace_update_support) {
190
0
    return Status::InvalidArgument(
191
0
        "In-place memtable updates (inplace_update_support) is not compatible "
192
0
        "with concurrent writes (allow_concurrent_memtable_write)");
193
0
  }
194
102k
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
195
0
    return Status::InvalidArgument(
196
0
        "Memtable doesn't allow concurrent writes "
197
0
        "(allow_concurrent_memtable_write)");
198
0
  }
199
102k
  return Status::OK();
200
102k
}
201
202
Status CheckCFPathsSupported(const DBOptions& db_options,
203
102k
                             const ColumnFamilyOptions& cf_options) {
204
  // Multiple cf_paths entries are supported only in the universal
205
  // and level compaction styles. This function also checks the case
206
  // in which cf_paths is not specified, which results in db_paths
207
  // being used.
208
102k
  if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
209
102k
      (cf_options.compaction_style != kCompactionStyleLevel)) {
210
0
    if (cf_options.cf_paths.size() > 1) {
211
0
      return Status::NotSupported(
212
0
          "More than one CF paths are only supported in "
213
0
          "universal and level compaction styles. ");
214
0
    } else if (cf_options.cf_paths.empty() && db_options.db_paths.size() > 1) {
215
0
      return Status::NotSupported(
216
0
          "More than one DB paths are only supported in "
217
0
          "universal and level compaction styles. ");
218
0
    }
219
0
  }
220
102k
  return Status::OK();
221
102k
}
222
223
namespace {
224
const uint64_t kDefaultTtl = 0xfffffffffffffffe;
225
const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
226
}  // anonymous namespace
227
228
ColumnFamilyOptions SanitizeCfOptions(const ImmutableDBOptions& db_options,
229
                                      bool read_only,
230
187k
                                      const ColumnFamilyOptions& src) {
231
187k
  ColumnFamilyOptions result = src;
232
187k
  size_t clamp_max = std::conditional<
233
187k
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
234
187k
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
235
187k
  ClipToRange(&result.write_buffer_size, (static_cast<size_t>(64)) << 10,
236
187k
              clamp_max);
237
  // If the user sets arena_block_size, we trust them to use this value.
238
  // Otherwise, calculate a proper value from write_buffer_size.
239
187k
  if (result.arena_block_size <= 0) {
240
187k
    result.arena_block_size =
241
187k
        std::min(size_t{1024 * 1024}, result.write_buffer_size / 8);
242
243
    // Align up to 4k
244
187k
    const size_t align = 4 * 1024;
245
187k
    result.arena_block_size =
246
187k
        ((result.arena_block_size + align - 1) / align) * align;
247
187k
  }
248
187k
  result.min_write_buffer_number_to_merge =
249
187k
      std::min(result.min_write_buffer_number_to_merge,
250
187k
               result.max_write_buffer_number - 1);
251
187k
  if (result.min_write_buffer_number_to_merge < 1) {
252
0
    result.min_write_buffer_number_to_merge = 1;
253
0
  }
254
255
187k
  if (db_options.atomic_flush && result.min_write_buffer_number_to_merge > 1) {
256
0
    ROCKS_LOG_WARN(
257
0
        db_options.logger,
258
0
        "Currently, if atomic_flush is true, then triggering flush for any "
259
0
        "column family internally (non-manual flush) will trigger flushing "
260
0
        "all column families even if the number of memtables is smaller "
261
0
        "min_write_buffer_number_to_merge. Therefore, configuring "
262
0
        "min_write_buffer_number_to_merge > 1 is not compatible and should "
263
0
        "be satinized to 1. Not doing so will lead to data loss and "
264
0
        "inconsistent state across multiple column families when WAL is "
265
0
        "disabled, which is a common setting for atomic flush");
266
267
0
    result.min_write_buffer_number_to_merge = 1;
268
0
  }
269
187k
  if (result.disallow_memtable_writes) {
270
    // A simple memtable that enforces MarkReadOnly (unlike skip list)
271
0
    result.memtable_factory = std::make_shared<VectorRepFactory>();
272
0
  }
273
274
187k
  if (result.num_levels < 1) {
275
0
    result.num_levels = 1;
276
0
  }
277
187k
  if (result.compaction_style == kCompactionStyleLevel &&
278
187k
      result.num_levels < 2) {
279
0
    result.num_levels = 2;
280
0
  }
281
282
187k
  if (result.compaction_style == kCompactionStyleUniversal &&
283
187k
      db_options.allow_ingest_behind && result.num_levels < 3) {
284
0
    result.num_levels = 3;
285
0
  }
286
287
187k
  if (result.max_write_buffer_number < 2) {
288
0
    result.max_write_buffer_number = 2;
289
0
  }
290
187k
  if (result.max_write_buffer_size_to_maintain < 0) {
291
0
    result.max_write_buffer_size_to_maintain =
292
0
        result.max_write_buffer_number *
293
0
        static_cast<int64_t>(result.write_buffer_size);
294
0
  }
295
  // bloom filter size shouldn't exceed 1/4 of memtable size.
296
187k
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
297
0
    result.memtable_prefix_bloom_size_ratio = 0.25;
298
187k
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
299
0
    result.memtable_prefix_bloom_size_ratio = 0;
300
0
  }
301
302
187k
  if (!result.prefix_extractor) {
303
187k
    assert(result.memtable_factory);
304
187k
    Slice name = result.memtable_factory->Name();
305
187k
    if (name.compare("HashSkipListRepFactory") == 0 ||
306
187k
        name.compare("HashLinkListRepFactory") == 0) {
307
0
      result.memtable_factory = std::make_shared<SkipListFactory>();
308
0
    }
309
187k
  }
310
311
187k
  if (result.compaction_style == kCompactionStyleFIFO) {
312
    // since we delete level0 files in FIFO compaction when there are too many
313
    // of them, these options don't really mean anything
314
0
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
315
0
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
316
0
  }
317
318
187k
  if (result.max_bytes_for_level_multiplier <= 0) {
319
0
    result.max_bytes_for_level_multiplier = 1;
320
0
  }
321
322
187k
  if (result.level0_file_num_compaction_trigger == 0) {
323
0
    ROCKS_LOG_WARN(db_options.logger,
324
0
                   "level0_file_num_compaction_trigger cannot be 0");
325
0
    result.level0_file_num_compaction_trigger = 1;
326
0
  }
327
328
187k
  if (result.level0_stop_writes_trigger <
329
187k
          result.level0_slowdown_writes_trigger ||
330
187k
      result.level0_slowdown_writes_trigger <
331
187k
          result.level0_file_num_compaction_trigger) {
332
0
    ROCKS_LOG_WARN(db_options.logger,
333
0
                   "This condition must be satisfied: "
334
0
                   "level0_stop_writes_trigger(%d) >= "
335
0
                   "level0_slowdown_writes_trigger(%d) >= "
336
0
                   "level0_file_num_compaction_trigger(%d)",
337
0
                   result.level0_stop_writes_trigger,
338
0
                   result.level0_slowdown_writes_trigger,
339
0
                   result.level0_file_num_compaction_trigger);
340
0
    if (result.level0_slowdown_writes_trigger <
341
0
        result.level0_file_num_compaction_trigger) {
342
0
      result.level0_slowdown_writes_trigger =
343
0
          result.level0_file_num_compaction_trigger;
344
0
    }
345
0
    if (result.level0_stop_writes_trigger <
346
0
        result.level0_slowdown_writes_trigger) {
347
0
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
348
0
    }
349
0
    ROCKS_LOG_WARN(db_options.logger,
350
0
                   "Adjust the value to "
351
0
                   "level0_stop_writes_trigger(%d) "
352
0
                   "level0_slowdown_writes_trigger(%d) "
353
0
                   "level0_file_num_compaction_trigger(%d)",
354
0
                   result.level0_stop_writes_trigger,
355
0
                   result.level0_slowdown_writes_trigger,
356
0
                   result.level0_file_num_compaction_trigger);
357
0
  }
358
359
187k
  if (result.soft_pending_compaction_bytes_limit == 0) {
360
0
    result.soft_pending_compaction_bytes_limit =
361
0
        result.hard_pending_compaction_bytes_limit;
362
187k
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
363
187k
             result.soft_pending_compaction_bytes_limit >
364
187k
                 result.hard_pending_compaction_bytes_limit) {
365
0
    result.soft_pending_compaction_bytes_limit =
366
0
        result.hard_pending_compaction_bytes_limit;
367
0
  }
368
369
  // When the DB is stopped, it's possible that there are some .trash files that
370
  // were not deleted yet. When we open the DB we will find these .trash files
371
  // and schedule them to be deleted (or delete immediately if SstFileManager
372
  // was not used)
373
187k
  auto sfm =
374
187k
      static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
375
187k
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
376
0
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
377
0
                                      result.cf_paths[i].path)
378
0
        .PermitUncheckedError();
379
0
  }
380
381
187k
  if (result.cf_paths.empty()) {
382
187k
    result.cf_paths = db_options.db_paths;
383
187k
  }
384
385
187k
  if (result.level_compaction_dynamic_level_bytes) {
386
187k
    if (result.compaction_style != kCompactionStyleLevel) {
387
0
      ROCKS_LOG_INFO(db_options.info_log.get(),
388
0
                     "level_compaction_dynamic_level_bytes only makes sense "
389
0
                     "for level-based compaction");
390
0
      result.level_compaction_dynamic_level_bytes = false;
391
187k
    } else if (result.cf_paths.size() > 1U) {
392
      // we don't yet know how to make this feature and multiple
393
      // DB paths work together.
394
0
      ROCKS_LOG_WARN(db_options.info_log.get(),
395
0
                     "multiple cf_paths/db_paths and "
396
0
                     "level_compaction_dynamic_level_bytes "
397
0
                     "can't be used together");
398
0
      result.level_compaction_dynamic_level_bytes = false;
399
0
    }
400
187k
  }
401
402
187k
  if (result.max_compaction_bytes == 0) {
403
187k
    result.max_compaction_bytes = result.target_file_size_base * 25;
404
187k
  }
405
406
187k
  bool is_block_based_table = (result.table_factory->IsInstanceOf(
407
187k
      TableFactory::kBlockBasedTableName()));
408
409
187k
  const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
410
187k
  if (result.ttl == kDefaultTtl) {
411
187k
    if (is_block_based_table) {
412
      // FIFO also requires max_open_files=-1, which is checked in
413
      // ValidateOptions().
414
187k
      result.ttl = kAdjustedTtl;
415
187k
    } else {
416
0
      result.ttl = 0;
417
0
    }
418
187k
  }
419
420
187k
  const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;
421
187k
  if (result.compaction_style == kCompactionStyleLevel) {
422
187k
    if ((result.compaction_filter != nullptr ||
423
187k
         result.compaction_filter_factory != nullptr) &&
424
187k
        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
425
187k
        is_block_based_table) {
426
0
      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
427
0
    }
428
187k
  } else if (result.compaction_style == kCompactionStyleUniversal) {
429
0
    if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
430
0
        is_block_based_table) {
431
0
      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
432
0
    }
433
0
  } else if (result.compaction_style == kCompactionStyleFIFO) {
434
0
    if (result.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
435
0
      ROCKS_LOG_WARN(
436
0
          db_options.info_log.get(),
437
0
          "periodic_compaction_seconds does not support FIFO compaction. You"
438
0
          "may want to set option TTL instead.");
439
0
    }
440
0
    if (result.last_level_temperature != Temperature::kUnknown) {
441
0
      ROCKS_LOG_WARN(
442
0
          db_options.info_log.get(),
443
0
          "last_level_temperature is ignored with FIFO compaction. Consider "
444
0
          "CompactionOptionsFIFO::file_temperature_age_thresholds.");
445
0
      result.last_level_temperature = Temperature::kUnknown;
446
0
    }
447
0
  }
448
449
  // For universal compaction, `ttl` and `periodic_compaction_seconds` mean the
450
  // same thing; take the stricter value.
451
187k
  if (result.compaction_style == kCompactionStyleUniversal) {
452
0
    if (result.periodic_compaction_seconds == 0) {
453
0
      result.periodic_compaction_seconds = result.ttl;
454
0
    } else if (result.ttl != 0) {
455
0
      result.periodic_compaction_seconds =
456
0
          std::min(result.ttl, result.periodic_compaction_seconds);
457
0
    }
458
0
  }
459
460
187k
  if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
461
187k
    result.periodic_compaction_seconds = 0;
462
187k
  }
463
464
187k
  if (read_only && (result.preserve_internal_time_seconds > 0 ||
465
58.3k
                    result.preclude_last_level_data_seconds > 0)) {
466
    // With no writes coming in, we don't need periodic SeqnoToTime entries.
467
    // Existing SST files may or may not have that info associated with them.
468
0
    ROCKS_LOG_WARN(
469
0
        db_options.info_log.get(),
470
0
        "preserve_internal_time_seconds and preclude_last_level_data_seconds "
471
0
        "are ignored in read-only DB");
472
0
    result.preserve_internal_time_seconds = 0;
473
0
    result.preclude_last_level_data_seconds = 0;
474
0
  }
475
476
187k
  if (read_only) {
477
58.3k
    if (result.memtable_op_scan_flush_trigger) {
478
0
      ROCKS_LOG_WARN(db_options.info_log.get(),
479
0
                     "option memtable_op_scan_flush_trigger is sanitized to "
480
0
                     "0(disabled) for read only DB.");
481
0
      result.memtable_op_scan_flush_trigger = 0;
482
0
    }
483
58.3k
    if (result.memtable_avg_op_scan_flush_trigger) {
484
0
      ROCKS_LOG_WARN(
485
0
          db_options.info_log.get(),
486
0
          "option memtable_avg_op_scan_flush_trigger is sanitized to "
487
0
          "0(disabled) for read only DB.");
488
0
      result.memtable_avg_op_scan_flush_trigger = 0;
489
0
    }
490
58.3k
  }
491
187k
  return result;
492
187k
}
493
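Within SanitizeCfOptions above, the arena_block_size fallback is min(1 MiB, write_buffer_size / 8), rounded up to a 4 KiB boundary. A small self-contained restatement of that arithmetic with two worked examples (function name hypothetical):

#include <algorithm>
#include <cstddef>
#include <cstdio>

size_t FallbackArenaBlockSize(size_t write_buffer_size) {
  size_t v = std::min(size_t{1024 * 1024}, write_buffer_size / 8);
  const size_t align = 4 * 1024;
  return ((v + align - 1) / align) * align;  // round up to 4 KiB
}

int main() {
  // 64 MiB write buffer: 8 MiB / 8 exceeds the 1 MiB cap -> 1 MiB.
  std::printf("%zu\n", FallbackArenaBlockSize(size_t{64} << 20));
  // 100 KiB write buffer: 12800 bytes, rounded up to 16384 (16 KiB).
  std::printf("%zu\n", FallbackArenaBlockSize(size_t{100} << 10));
}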
494
int SuperVersion::dummy = 0;
495
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
496
void* const SuperVersion::kSVObsolete = nullptr;
497
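kSVInUse above is the address of a dummy member, a pointer value that can never alias a real SuperVersion, while kSVObsolete (nullptr) marks a stale slot. A much-simplified, single-slot sketch of how such sentinels can drive a check-out/check-in protocol; this is hypothetical code, far simpler than RocksDB's per-thread SuperVersion cache:

#include <atomic>
#include <cassert>

struct SV { int version; };

static int dummy = 0;
static void* const kInUse = &dummy;      // "slot checked out by a thread"
static void* const kObsolete = nullptr;  // "stale; refetch from the source"

static std::atomic<void*> slot{kObsolete};

SV* Acquire(SV* current) {
  void* p = slot.exchange(kInUse);  // claim the slot
  if (p == kInUse || p == kObsolete) {
    return current;  // nothing cached; fall back to the current SV
  }
  return static_cast<SV*>(p);  // cached value is still usable
}

void Release(SV* sv) {
  void* expected = kInUse;
  // Publish back only if nobody invalidated the slot in the meantime.
  if (!slot.compare_exchange_strong(expected, sv)) {
    // Slot was marked obsolete; a real caller would drop its reference here.
  }
}

int main() {
  SV current{1};
  SV* sv = Acquire(&current);  // first acquire falls back to the current SV
  Release(sv);
  assert(Acquire(&current) == &current);  // now served from the cached slot
}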
498
118k
SuperVersion::~SuperVersion() {
499
118k
  for (auto td : to_delete) {
500
2.23k
    delete td;
501
2.23k
  }
502
118k
}
503
504
65.9k
SuperVersion* SuperVersion::Ref() {
505
65.9k
  refs.fetch_add(1, std::memory_order_relaxed);
506
65.9k
  return this;
507
65.9k
}
508
509
179k
bool SuperVersion::Unref() {
510
  // fetch_sub returns the previous value of refs
511
179k
  uint32_t previous_refs = refs.fetch_sub(1);
512
179k
  assert(previous_refs > 0);
513
179k
  return previous_refs == 1;
514
179k
}
515
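Ref and Unref above rely on fetch_add/fetch_sub returning the value held before the update, so a previous value of 1 means this call released the last reference. A compact demonstration of that contract:

#include <atomic>
#include <cassert>
#include <cstdint>

struct RefCounted {
  std::atomic<uint32_t> refs{1};
  RefCounted* Ref() {
    refs.fetch_add(1, std::memory_order_relaxed);
    return this;
  }
  bool Unref() {
    uint32_t previous = refs.fetch_sub(1);  // value before the decrement
    assert(previous > 0);
    return previous == 1;  // true iff this was the last reference
  }
};

int main() {
  RefCounted rc;
  rc.Ref();
  assert(!rc.Unref());  // another reference is still outstanding
  assert(rc.Unref());   // last one; the caller may now clean up
}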
516
113k
void SuperVersion::Cleanup() {
517
113k
  assert(refs.load(std::memory_order_relaxed) == 0);
518
  // Since this SuperVersion object is being deleted,
519
  // decrement reference to the immutable MemtableList
520
  // this SV object was pointing to.
521
113k
  imm->Unref(&to_delete);
522
113k
  ReadOnlyMemTable* m = mem->Unref();
523
113k
  if (m != nullptr) {
524
0
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
525
0
    assert(*memory_usage >= m->ApproximateMemoryUsage());
526
0
    *memory_usage -= m->ApproximateMemoryUsage();
527
0
    to_delete.push_back(m);
528
0
  }
529
113k
  current->Unref();
530
113k
  cfd->UnrefAndTryDelete();
531
113k
}
532
533
void SuperVersion::Init(
534
    ColumnFamilyData* new_cfd, MemTable* new_mem, MemTableListVersion* new_imm,
535
    Version* new_current,
536
113k
    std::shared_ptr<const SeqnoToTimeMapping> new_seqno_to_time_mapping) {
537
113k
  cfd = new_cfd;
538
113k
  mem = new_mem;
539
113k
  imm = new_imm;
540
113k
  current = new_current;
541
113k
  full_history_ts_low = cfd->GetFullHistoryTsLow();
542
113k
  seqno_to_time_mapping = std::move(new_seqno_to_time_mapping);
543
113k
  cfd->Ref();
544
113k
  mem->Ref();
545
113k
  imm->Ref();
546
113k
  current->Ref();
547
113k
  refs.store(1, std::memory_order_relaxed);
548
549
  // There should be at least one mapping entry iff time tracking is enabled.
550
#ifndef NDEBUG
551
  MinAndMaxPreserveSeconds preserve_info{mutable_cf_options};
552
  if (preserve_info.IsEnabled()) {
553
    assert(seqno_to_time_mapping);
554
    assert(!seqno_to_time_mapping->Empty());
555
  } else {
556
    assert(seqno_to_time_mapping == nullptr);
557
  }
558
#endif  // NDEBUG
559
113k
}
560
561
namespace {
562
32.6k
void SuperVersionUnrefHandle(void* ptr) {
563
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
564
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
565
  // When the latter happens, only super_version_ holds a reference
566
  // to ColumnFamilyData, so no further queries are possible.
567
32.6k
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
568
32.6k
  bool was_last_ref __attribute__((__unused__));
569
32.6k
  was_last_ref = sv->Unref();
570
  // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
571
  // This is important because we can't do SuperVersion cleanup here.
572
  // That would require locking DB mutex, which would deadlock because
573
  // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
574
32.6k
  assert(!was_last_ref);
575
32.6k
}
576
}  // anonymous namespace
577
578
237k
std::vector<std::string> ColumnFamilyData::GetDbPaths() const {
579
237k
  std::vector<std::string> paths;
580
237k
  paths.reserve(ioptions_.cf_paths.size());
581
237k
  for (const DbPath& db_path : ioptions_.cf_paths) {
582
237k
    paths.emplace_back(db_path.path);
583
237k
  }
584
237k
  return paths;
585
237k
}
586
587
const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId =
588
    std::numeric_limits<uint32_t>::max();
589
590
ColumnFamilyData::ColumnFamilyData(
591
    uint32_t id, const std::string& name, Version* _dummy_versions,
592
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
593
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
594
    const FileOptions* file_options, ColumnFamilySet* column_family_set,
595
    BlockCacheTracer* const block_cache_tracer,
596
    const std::shared_ptr<IOTracer>& io_tracer, const std::string& db_id,
597
    const std::string& db_session_id, bool read_only)
598
177k
    : id_(id),
599
177k
      name_(name),
600
177k
      dummy_versions_(_dummy_versions),
601
177k
      current_(nullptr),
602
177k
      refs_(0),
603
177k
      initialized_(false),
604
177k
      dropped_(false),
605
177k
      flush_skip_reschedule_(false),
606
177k
      internal_comparator_(cf_options.comparator),
607
177k
      initial_cf_options_(SanitizeCfOptions(db_options, read_only, cf_options)),
608
177k
      ioptions_(db_options, initial_cf_options_),
609
177k
      mutable_cf_options_(initial_cf_options_),
610
      is_delete_range_supported_(
611
177k
          cf_options.table_factory->IsDeleteRangeSupported()),
612
177k
      write_buffer_manager_(write_buffer_manager),
613
177k
      mem_(nullptr),
614
177k
      imm_(ioptions_.min_write_buffer_number_to_merge,
615
177k
           ioptions_.max_write_buffer_size_to_maintain),
616
177k
      super_version_(nullptr),
617
177k
      super_version_number_(0),
618
177k
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
619
177k
      next_(nullptr),
620
177k
      prev_(nullptr),
621
177k
      log_number_(0),
622
177k
      column_family_set_(column_family_set),
623
177k
      queued_for_flush_(false),
624
177k
      queued_for_compaction_(false),
625
177k
      prev_compaction_needed_bytes_(0),
626
177k
      allow_2pc_(db_options.allow_2pc),
627
177k
      last_memtable_id_(0),
628
177k
      db_paths_registered_(false),
629
177k
      mempurge_used_(false),
630
177k
      next_epoch_number_(1) {
631
177k
  if (id_ != kDummyColumnFamilyDataId) {
632
    // TODO(cc): RegisterDbPaths can be expensive; consider moving it
633
    // outside of this constructor which might be called with db mutex held.
634
    // TODO(cc): consider using ioptions_.fs; currently some tests rely on
635
    // EnvWrapper, which is the main reason we use env here.
636
118k
    Status s = ioptions_.env->RegisterDbPaths(GetDbPaths());
637
118k
    if (s.ok()) {
638
118k
      db_paths_registered_ = true;
639
118k
    } else {
640
0
      ROCKS_LOG_ERROR(
641
0
          ioptions_.logger,
642
0
          "Failed to register data paths of column family (id: %d, name: %s)",
643
0
          id_, name_.c_str());
644
0
    }
645
118k
  }
646
177k
  Ref();
647
648
  // Convert user-defined table properties collector factories to internal ones.
649
177k
  GetInternalTblPropCollFactory(ioptions_, &internal_tbl_prop_coll_factories_);
650
651
  // if _dummy_versions is nullptr, then this is a dummy column family.
652
177k
  if (_dummy_versions != nullptr) {
653
118k
    internal_stats_.reset(
654
118k
        new InternalStats(ioptions_.num_levels, ioptions_.clock, this));
655
118k
    table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
656
118k
                                      block_cache_tracer, io_tracer,
657
118k
                                      db_session_id));
658
118k
    blob_file_cache_.reset(
659
118k
        new BlobFileCache(_table_cache, &ioptions(), soptions(), id_,
660
118k
                          internal_stats_->GetBlobFileReadHist(), io_tracer));
661
118k
    blob_source_.reset(new BlobSource(ioptions_, mutable_cf_options_, db_id,
662
118k
                                      db_session_id, blob_file_cache_.get()));
663
664
118k
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
665
118k
      compaction_picker_.reset(
666
118k
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
667
118k
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
668
0
      compaction_picker_.reset(
669
0
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
670
0
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
671
0
      compaction_picker_.reset(
672
0
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
673
0
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
674
0
      compaction_picker_.reset(
675
0
          new NullCompactionPicker(ioptions_, &internal_comparator_));
676
0
      ROCKS_LOG_WARN(ioptions_.logger,
677
0
                     "Column family %s does not use any background compaction. "
678
0
                     "Compactions can only be done via CompactFiles\n",
679
0
                     GetName().c_str());
680
0
    } else {
681
0
      ROCKS_LOG_ERROR(ioptions_.logger,
682
0
                      "Unable to recognize the specified compaction style %d. "
683
0
                      "Column family %s will use kCompactionStyleLevel.\n",
684
0
                      ioptions_.compaction_style, GetName().c_str());
685
0
      compaction_picker_.reset(
686
0
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
687
0
    }
688
689
118k
    if (column_family_set_->NumberOfColumnFamilies() < 10) {
690
118k
      ROCKS_LOG_INFO(ioptions_.logger,
691
118k
                     "--------------- Options for column family [%s]:\n",
692
118k
                     name.c_str());
693
118k
      initial_cf_options_.Dump(ioptions_.logger);
694
118k
    } else {
695
0
      ROCKS_LOG_INFO(ioptions_.logger, "\t(skipping printing options)\n");
696
0
    }
697
118k
  }
698
699
177k
  RecalculateWriteStallConditions(mutable_cf_options_);
700
701
177k
  if (cf_options.table_factory->IsInstanceOf(
702
177k
          TableFactory::kBlockBasedTableName()) &&
703
177k
      cf_options.table_factory->GetOptions<BlockBasedTableOptions>()) {
704
177k
    const BlockBasedTableOptions* bbto =
705
177k
        cf_options.table_factory->GetOptions<BlockBasedTableOptions>();
706
177k
    const auto& options_overrides = bbto->cache_usage_options.options_overrides;
707
177k
    const auto file_metadata_charged =
708
177k
        options_overrides.at(CacheEntryRole::kFileMetadata).charged;
709
177k
    if (bbto->block_cache &&
710
177k
        file_metadata_charged == CacheEntryRoleOptions::Decision::kEnabled) {
711
      // TODO(hx235): Add a `ConcurrentCacheReservationManager` at DB scope
712
      // responsible for reservation of `ObsoleteFileInfo` so that we can keep
713
      // this `file_metadata_cache_res_mgr_` nonconcurrent
714
0
      file_metadata_cache_res_mgr_.reset(new ConcurrentCacheReservationManager(
715
0
          std::make_shared<
716
0
              CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>>(
717
0
              bbto->block_cache)));
718
0
    }
719
177k
  }
720
177k
}
721
722
// DB mutex held
723
177k
ColumnFamilyData::~ColumnFamilyData() {
724
177k
  assert(refs_.load(std::memory_order_relaxed) == 0);
725
  // remove from linked list
726
177k
  auto prev = prev_;
727
177k
  auto next = next_;
728
177k
  prev->next_ = next;
729
177k
  next->prev_ = prev;
730
731
177k
  if (!dropped_ && column_family_set_ != nullptr) {
732
    // If it's dropped, it's already removed from column family set
733
    // If column_family_set_ == nullptr, this is dummy CFD and not in
734
    // ColumnFamilySet
735
80.3k
    column_family_set_->RemoveColumnFamily(this);
736
80.3k
  }
737
738
177k
  if (current_ != nullptr) {
739
118k
    current_->Unref();
740
118k
  }
741
742
  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
743
  // compaction_queue_ and we destroyed it
744
177k
  assert(!queued_for_flush_);
745
177k
  assert(!queued_for_compaction_);
746
177k
  assert(super_version_ == nullptr);
747
748
177k
  if (dummy_versions_ != nullptr) {
749
    // List must be empty
750
118k
    assert(dummy_versions_->Next() == dummy_versions_);
751
118k
    bool deleted __attribute__((__unused__));
752
118k
    deleted = dummy_versions_->Unref();
753
118k
    assert(deleted);
754
118k
  }
755
756
177k
  if (mem_ != nullptr) {
757
118k
    delete mem_->Unref();
758
118k
  }
759
177k
  autovector<ReadOnlyMemTable*> to_delete;
760
177k
  imm_.current()->Unref(&to_delete);
761
177k
  for (auto* m : to_delete) {
762
0
    delete m;
763
0
  }
764
765
177k
  if (db_paths_registered_) {
766
    // TODO(cc): consider using ioptions_.fs; currently some tests rely on
767
    // EnvWrapper, which is the main reason we use env here.
768
118k
    Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths());
769
118k
    if (!s.ok()) {
770
0
      ROCKS_LOG_ERROR(
771
0
          ioptions_.logger,
772
0
          "Failed to unregister data paths of column family (id: %d, name: %s)",
773
0
          id_, name_.c_str());
774
0
    }
775
118k
  }
776
177k
}
777
778
471k
bool ColumnFamilyData::UnrefAndTryDelete() {
779
471k
  int old_refs = refs_.fetch_sub(1);
780
471k
  assert(old_refs > 0);
781
782
471k
  if (old_refs == 1) {
783
177k
    assert(super_version_ == nullptr);
784
177k
    delete this;
785
177k
    return true;
786
177k
  }
787
788
294k
  if (old_refs == 2 && super_version_ != nullptr) {
789
    // Only the super_version_ holds me
790
102k
    SuperVersion* sv = super_version_;
791
102k
    super_version_ = nullptr;
792
793
    // Release SuperVersion references kept in ThreadLocalPtr.
794
102k
    local_sv_.reset();
795
796
102k
    if (sv->Unref()) {
797
      // Note: sv will delete this ColumnFamilyData during Cleanup()
798
102k
      assert(sv->cfd == this);
799
102k
      sv->Cleanup();
800
102k
      delete sv;
801
102k
      return true;
802
102k
    }
803
102k
  }
804
192k
  return false;
805
294k
}
806
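UnrefAndTryDelete above has to break a deliberate cycle: the CFD owns super_version_ while the SuperVersion holds a reference back to the CFD, so "old_refs == 2 with a live super_version_" means only my own child still keeps me alive. A hedged sketch of that detach-then-unref shape, with hypothetical types:

#include <cassert>

struct Owner;

struct Child {
  Owner* owner = nullptr;  // back-reference that forms the cycle
  int refs = 1;
  bool Unref() { return --refs == 0; }
};

struct Owner {
  int refs = 0;
  Child* child = nullptr;
  bool UnrefAndTryDelete();
};

bool Owner::UnrefAndTryDelete() {
  int old_refs = refs--;
  assert(old_refs > 0);
  if (old_refs == 1) {
    delete this;
    return true;
  }
  if (old_refs == 2 && child != nullptr) {
    // Only my own child holds me: detach it so its unref is the last ref.
    Child* c = child;
    child = nullptr;
    if (c->Unref()) {
      Owner* o = c->owner;
      delete c;
      return o->UnrefAndTryDelete();  // drop the reference the child held
    }
  }
  return false;
}

int main() {
  Owner* o = new Owner;
  o->child = new Child{o, 1};
  o->refs = 2;  // one external handle plus the child's back-reference
  assert(o->UnrefAndTryDelete());  // the external drop deletes both objects
}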
807
38.4k
void ColumnFamilyData::SetDropped() {
808
  // can't drop default CF
809
38.4k
  assert(id_ != 0);
810
38.4k
  dropped_ = true;
811
38.4k
  write_controller_token_.reset();
812
813
  // remove from column_family_set
814
38.4k
  column_family_set_->RemoveColumnFamily(this);
815
38.4k
}
816
817
248k
ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
818
248k
  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
819
248k
}
820
821
0
uint64_t ColumnFamilyData::OldestLogToKeep() {
822
0
  auto current_log = GetLogNumber();
823
824
0
  if (allow_2pc_) {
825
0
    auto imm_prep_log = imm()->PrecomputeMinLogContainingPrepSection();
826
0
    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
827
828
0
    if (imm_prep_log > 0 && imm_prep_log < current_log) {
829
0
      current_log = imm_prep_log;
830
0
    }
831
832
0
    if (mem_prep_log > 0 && mem_prep_log < current_log) {
833
0
      current_log = mem_prep_log;
834
0
    }
835
0
  }
836
837
0
  return current_log;
838
0
}
839
840
const double kIncSlowdownRatio = 0.8;
841
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
842
const double kNearStopSlowdownRatio = 0.6;
843
const double kDelayRecoverSlowdownRatio = 1.4;
844
845
namespace {
846
// If penalize_stop is true, we further reduce the slowdown rate.
847
std::unique_ptr<WriteControllerToken> SetupDelay(
848
    WriteController* write_controller, uint64_t compaction_needed_bytes,
849
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
850
72
    bool auto_compactions_disabled) {
851
72
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.
852
853
72
  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
854
72
  uint64_t write_rate = write_controller->delayed_write_rate();
855
856
72
  if (auto_compactions_disabled) {
857
    // When auto compaction is disabled, always use the value user gave.
858
0
    write_rate = max_write_rate;
859
72
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
860
    // If user gives rate less than kMinWriteRate, don't adjust it.
861
    //
862
    // If already delayed, need to adjust based on previous compaction debt.
863
    // When two or more column families require delay, we always
864
    // increase or reduce write rate based on information for one single
865
    // column family. It is likely to be OK but we can improve if there is a
866
    // problem.
867
    // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
868
    // is only available in level-based compaction
869
    //
870
    // If the compaction debt stays the same as previously, we also further slow
871
    // down. It usually means a mem table is full. It's mainly for the case
872
    // where both of flush and compaction are much slower than the speed we
873
    // insert to mem tables, so we need to actively slow down before we get
874
    // feedback signal from compaction and flushes to avoid the full stop
875
    // because of hitting the max write buffer number.
876
    //
877
    // If the DB just fell into the stop condition, we need to further reduce
878
    // the write rate to avoid the stop condition.
879
0
    if (penalize_stop) {
880
      // Penalize the near stop or stop condition by more aggressive slowdown.
881
      // This is to provide the long term slowdown increase signal.
882
      // The penalty is more than the reward of recovering to the normal
883
      // condition.
884
0
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
885
0
                                         kNearStopSlowdownRatio);
886
0
      if (write_rate < kMinWriteRate) {
887
0
        write_rate = kMinWriteRate;
888
0
      }
889
0
    } else if (prev_compaction_need_bytes > 0 &&
890
0
               prev_compaction_need_bytes <= compaction_needed_bytes) {
891
0
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
892
0
                                         kIncSlowdownRatio);
893
0
      if (write_rate < kMinWriteRate) {
894
0
        write_rate = kMinWriteRate;
895
0
      }
896
0
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
897
      // We speed up by a ratio of kDecSlowdownRatio when we have paid down
898
      // compaction debt. But we never speed up beyond the write rate given
899
      // by users.
900
0
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
901
0
                                         kDecSlowdownRatio);
902
0
      if (write_rate > max_write_rate) {
903
0
        write_rate = max_write_rate;
904
0
      }
905
0
    }
906
0
  }
907
72
  return write_controller->GetDelayToken(write_rate);
908
72
}
909
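SetupDelay above is multiplicative rate control: shrink the delayed write rate near a stop or while compaction debt grows, and expand it again once debt is being paid down. A simplified sketch of the three branches, reusing the constants defined above; the clamping here is collapsed into one line, slightly coarser than the listing's per-branch clamps:

#include <algorithm>
#include <cstdint>
#include <cstdio>

constexpr double kIncSlowdownRatio = 0.8;
constexpr double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
constexpr double kNearStopSlowdownRatio = 0.6;
constexpr uint64_t kMinWriteRate = 16 * 1024;  // 16 KB/s floor

uint64_t AdjustRate(uint64_t rate, uint64_t max_rate, uint64_t prev_debt,
                    uint64_t debt, bool penalize_stop) {
  double r = static_cast<double>(rate);
  if (penalize_stop) {
    r *= kNearStopSlowdownRatio;  // near/at stop: slow down aggressively
  } else if (prev_debt > 0 && prev_debt <= debt) {
    r *= kIncSlowdownRatio;  // debt flat or growing: slow down
  } else if (prev_debt > debt) {
    r *= kDecSlowdownRatio;  // debt shrinking: speed back up
  }
  return std::min(std::max(static_cast<uint64_t>(r), kMinWriteRate), max_rate);
}

int main() {
  // Growing debt at 1 MB/s shrinks the rate by roughly 0.8x per call.
  std::printf("%llu\n", static_cast<unsigned long long>(AdjustRate(
      1'000'000, 16'000'000, /*prev_debt=*/100, /*debt=*/200, false)));
}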
910
int GetL0FileCountForCompactionSpeedup(int level0_file_num_compaction_trigger,
911
113k
                                       int level0_slowdown_writes_trigger) {
912
  // SanitizeOptions() ensures it.
913
113k
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);
914
915
113k
  if (level0_file_num_compaction_trigger < 0) {
916
0
    return std::numeric_limits<int>::max();
917
0
  }
918
919
113k
  const int64_t twice_level0_trigger =
920
113k
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;
921
922
113k
  const int64_t one_fourth_trigger_slowdown =
923
113k
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
924
113k
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
925
113k
       4);
926
927
113k
  assert(twice_level0_trigger >= 0);
928
113k
  assert(one_fourth_trigger_slowdown >= 0);
929
930
  // 1/4 of the way between L0 compaction trigger threshold and slowdown
931
  // condition.
932
  // Or twice the compaction trigger, if that is smaller.
933
113k
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
934
113k
  if (res >= std::numeric_limits<int32_t>::max()) {
935
0
    return std::numeric_limits<int32_t>::max();
936
113k
  } else {
937
    // res fits in int
938
113k
    return static_cast<int>(res);
939
113k
  }
940
113k
}
941
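The helper above computes min(2 * trigger, trigger + (slowdown - trigger) / 4). A tiny worked check of that formula:

#include <algorithm>
#include <cassert>
#include <cstdint>

int SpeedupFileCount(int trigger, int slowdown) {
  int64_t twice = int64_t{trigger} * 2;
  int64_t quarter_way = int64_t{trigger} + (slowdown - trigger) / 4;
  return static_cast<int>(std::min(twice, quarter_way));
}

int main() {
  // trigger=4, slowdown=20: quarter-way is 4 + 4 = 8; twice is 8 -> 8.
  assert(SpeedupFileCount(4, 20) == 8);
  // trigger=2, slowdown=24: quarter-way is 2 + 5 = 7; twice is 4 -> 4.
  assert(SpeedupFileCount(2, 24) == 4);
}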
942
uint64_t GetPendingCompactionBytesForCompactionSpeedup(
943
    const MutableCFOptions& mutable_cf_options,
944
112k
    const VersionStorageInfo* vstorage) {
945
  // Compaction debt relatively large compared to the stable (bottommost) data
946
  // size indicates compaction fell behind.
947
112k
  const uint64_t kBottommostSizeDivisor = 8;
948
  // Meaningful progress toward the slowdown trigger is another good indication.
949
112k
  const uint64_t kSlowdownTriggerDivisor = 4;
950
951
112k
  uint64_t bottommost_files_size = 0;
952
112k
  for (const auto& level_and_file : vstorage->BottommostFiles()) {
953
81.4k
    bottommost_files_size += level_and_file.second->fd.GetFileSize();
954
81.4k
  }
955
956
  // Slowdown trigger might be zero but that means compaction speedup should
957
  // always happen (undocumented/historical), so no special treatment is needed.
958
112k
  uint64_t slowdown_threshold =
959
112k
      mutable_cf_options.soft_pending_compaction_bytes_limit /
960
112k
      kSlowdownTriggerDivisor;
961
962
  // Size of zero, however, should not be used to decide to speed up compaction.
963
112k
  if (bottommost_files_size == 0) {
964
61.9k
    return slowdown_threshold;
965
61.9k
  }
966
967
  // Prevent a small CF from triggering parallel compactions for other CFs.
968
  // Require compaction debt to be more than a full L0 to Lbase compaction.
969
50.4k
  const uint64_t kMinDebtSize = 2 * mutable_cf_options.max_bytes_for_level_base;
970
50.4k
  uint64_t size_threshold =
971
50.4k
      std::max(bottommost_files_size / kBottommostSizeDivisor, kMinDebtSize);
972
50.4k
  return std::min(size_threshold, slowdown_threshold);
973
112k
}
974
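The pending-bytes helper above reduces to: use soft_limit / 4 when there is no bottommost data yet; otherwise take min(max(bottommost / 8, 2 * max_bytes_for_level_base), soft_limit / 4). A condensed restatement under those same constants (function name hypothetical):

#include <algorithm>
#include <cstdint>
#include <cstdio>

uint64_t SpeedupDebtThreshold(uint64_t bottommost_bytes, uint64_t soft_limit,
                              uint64_t max_bytes_for_level_base) {
  const uint64_t slowdown_threshold = soft_limit / 4;
  if (bottommost_bytes == 0) {
    return slowdown_threshold;  // no stable data yet: use the trigger fraction
  }
  // Require debt beyond a full L0->Lbase compaction so a small CF does not
  // trigger parallel compactions for everyone else.
  const uint64_t min_debt = 2 * max_bytes_for_level_base;
  const uint64_t size_threshold =
      std::max(bottommost_bytes / 8, min_debt);
  return std::min(size_threshold, slowdown_threshold);
}

int main() {
  // 1 GiB bottommost, 512 MiB soft limit, 64 MiB level base -> 128 MiB.
  std::printf("%llu\n", static_cast<unsigned long long>(SpeedupDebtThreshold(
      1ull << 30, 512ull << 20, 64ull << 20)));
}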
975
112k
uint64_t GetMarkedFileCountForCompactionSpeedup() {
976
  // When just one file is marked, it is not clear that parallel compaction will
977
  // help the compaction that the user nicely requested to happen sooner. When
978
  // multiple files are marked, however, it is pretty clearly helpful, except
979
  // for the rare case in which a single compaction grabs all the marked files.
980
112k
  return 2;
981
112k
}
982
}  // anonymous namespace
983
984
std::pair<WriteStallCondition, WriteStallCause>
985
ColumnFamilyData::GetWriteStallConditionAndCause(
986
    int num_unflushed_memtables, int num_l0_files,
987
    uint64_t num_compaction_needed_bytes,
988
    const MutableCFOptions& mutable_cf_options,
989
113k
    const ImmutableCFOptions& immutable_cf_options) {
990
113k
  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
991
0
    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
992
113k
  } else if (!mutable_cf_options.disable_auto_compactions &&
993
113k
             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
994
0
    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
995
113k
  } else if (!mutable_cf_options.disable_auto_compactions &&
996
113k
             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
997
113k
             num_compaction_needed_bytes >=
998
113k
                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
999
0
    return {WriteStallCondition::kStopped,
1000
0
            WriteStallCause::kPendingCompactionBytes};
1001
113k
  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
1002
113k
             num_unflushed_memtables >=
1003
0
                 mutable_cf_options.max_write_buffer_number - 1 &&
1004
113k
             num_unflushed_memtables - 1 >=
1005
0
                 immutable_cf_options.min_write_buffer_number_to_merge) {
1006
0
    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
1007
113k
  } else if (!mutable_cf_options.disable_auto_compactions &&
1008
113k
             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
1009
113k
             num_l0_files >=
1010
113k
                 mutable_cf_options.level0_slowdown_writes_trigger) {
1011
72
    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
1012
113k
  } else if (!mutable_cf_options.disable_auto_compactions &&
1013
113k
             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
1014
113k
             num_compaction_needed_bytes >=
1015
113k
                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
1016
0
    return {WriteStallCondition::kDelayed,
1017
0
            WriteStallCause::kPendingCompactionBytes};
1018
0
  }
1019
113k
  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
1020
113k
}
1021
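GetWriteStallConditionAndCause above is an ordered priority list: every kStopped cause is tested before any kDelayed cause, so the strictest applicable condition wins. A skeletal restatement of that ordering with simplified inputs; the memtable-delay branch and the disable_auto_compactions guards are omitted here for brevity:

#include <cstdint>
#include <utility>

enum class Cond { kNormal, kDelayed, kStopped };
enum class Cause { kNone, kMemtableLimit, kL0FileCountLimit, kPendingBytes };

std::pair<Cond, Cause> Classify(int memtables, int max_memtables, int l0,
                                int l0_stop, int l0_slowdown, uint64_t debt,
                                uint64_t hard_limit, uint64_t soft_limit) {
  // Stop conditions first...
  if (memtables >= max_memtables) {
    return {Cond::kStopped, Cause::kMemtableLimit};
  }
  if (l0 >= l0_stop) {
    return {Cond::kStopped, Cause::kL0FileCountLimit};
  }
  if (hard_limit > 0 && debt >= hard_limit) {
    return {Cond::kStopped, Cause::kPendingBytes};
  }
  // ...then delay conditions, in the same cause order.
  if (l0 >= l0_slowdown) {
    return {Cond::kDelayed, Cause::kL0FileCountLimit};
  }
  if (soft_limit > 0 && debt >= soft_limit) {
    return {Cond::kDelayed, Cause::kPendingBytes};
  }
  return {Cond::kNormal, Cause::kNone};
}

int main() {
  auto r = Classify(/*memtables=*/1, /*max=*/2, /*l0=*/40, /*l0_stop=*/36,
                    /*l0_slowdown=*/20, /*debt=*/0, /*hard=*/0, /*soft=*/0);
  return r.first == Cond::kStopped ? 0 : 1;  // L0 stop outranks L0 delay
}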
1022
WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
1023
290k
    const MutableCFOptions& mutable_cf_options) {
1024
290k
  auto write_stall_condition = WriteStallCondition::kNormal;
1025
290k
  if (current_ != nullptr) {
1026
113k
    auto* vstorage = current_->storage_info();
1027
113k
    auto write_controller = column_family_set_->write_controller_;
1028
113k
    uint64_t compaction_needed_bytes =
1029
113k
        vstorage->estimated_compaction_needed_bytes();
1030
1031
113k
    auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
1032
113k
        imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
1033
113k
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options,
1034
113k
        ioptions());
1035
113k
    write_stall_condition = write_stall_condition_and_cause.first;
1036
113k
    auto write_stall_cause = write_stall_condition_and_cause.second;
1037
1038
113k
    bool was_stopped = write_controller->IsStopped();
1039
113k
    bool needed_delay = write_controller->NeedsDelay();
1040
1041
113k
    if (write_stall_condition == WriteStallCondition::kStopped &&
1042
113k
        write_stall_cause == WriteStallCause::kMemtableLimit) {
1043
0
      write_controller_token_ = write_controller->GetStopToken();
1044
0
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
1045
0
      ROCKS_LOG_WARN(
1046
0
          ioptions_.logger,
1047
0
          "[%s] Stopping writes because we have %d immutable memtables "
1048
0
          "(waiting for flush), max_write_buffer_number is set to %d",
1049
0
          name_.c_str(), imm()->NumNotFlushed(),
1050
0
          mutable_cf_options.max_write_buffer_number);
1051
113k
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
1052
113k
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
1053
0
      write_controller_token_ = write_controller->GetStopToken();
1054
0
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
1055
0
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
1056
0
        internal_stats_->AddCFStats(
1057
0
            InternalStats::L0_FILE_COUNT_LIMIT_STOPS_WITH_ONGOING_COMPACTION,
1058
0
            1);
1059
0
      }
1060
0
      ROCKS_LOG_WARN(ioptions_.logger,
1061
0
                     "[%s] Stopping writes because we have %d level-0 files",
1062
0
                     name_.c_str(), vstorage->l0_delay_trigger_count());
1063
113k
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
1064
113k
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
1065
0
      write_controller_token_ = write_controller->GetStopToken();
1066
0
      internal_stats_->AddCFStats(
1067
0
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
1068
0
      ROCKS_LOG_WARN(
1069
0
          ioptions_.logger,
1070
0
          "[%s] Stopping writes because of estimated pending compaction "
1071
0
          "bytes %" PRIu64,
1072
0
          name_.c_str(), compaction_needed_bytes);
1073
113k
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
1074
113k
               write_stall_cause == WriteStallCause::kMemtableLimit) {
1075
0
      write_controller_token_ =
1076
0
          SetupDelay(write_controller, compaction_needed_bytes,
1077
0
                     prev_compaction_needed_bytes_, was_stopped,
1078
0
                     mutable_cf_options.disable_auto_compactions);
1079
0
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_DELAYS, 1);
1080
0
      ROCKS_LOG_WARN(
1081
0
          ioptions_.logger,
1082
0
          "[%s] Stalling writes because we have %d immutable memtables "
1083
0
          "(waiting for flush), max_write_buffer_number is set to %d "
1084
0
          "rate %" PRIu64,
1085
0
          name_.c_str(), imm()->NumNotFlushed(),
1086
0
          mutable_cf_options.max_write_buffer_number,
1087
0
          write_controller->delayed_write_rate());
1088
113k
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
1089
113k
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
1090
      // L0 is within two files of the stop trigger.
1091
72
      bool near_stop = vstorage->l0_delay_trigger_count() >=
1092
72
                       mutable_cf_options.level0_stop_writes_trigger - 2;
1093
72
      write_controller_token_ =
1094
72
          SetupDelay(write_controller, compaction_needed_bytes,
1095
72
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
1096
72
                     mutable_cf_options.disable_auto_compactions);
1097
72
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_DELAYS, 1);
1098
72
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
1099
0
        internal_stats_->AddCFStats(
1100
0
            InternalStats::L0_FILE_COUNT_LIMIT_DELAYS_WITH_ONGOING_COMPACTION,
1101
0
            1);
1102
0
      }
1103
72
      ROCKS_LOG_WARN(ioptions_.logger,
1104
72
                     "[%s] Stalling writes because we have %d level-0 files "
1105
72
                     "rate %" PRIu64,
1106
72
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
1107
72
                     write_controller->delayed_write_rate());
1108
113k
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
1109
113k
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
1110
      // If the distance to the hard limit is less than 1/4 of the gap between
1111
      // the soft and hard bytes limits, we consider it near-stop and speed up
1112
      // the slowdown.
1113
0
      bool near_stop =
1114
0
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
1115
0
          (compaction_needed_bytes -
1116
0
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
1117
0
              3 *
1118
0
                  (mutable_cf_options.hard_pending_compaction_bytes_limit -
1119
0
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
1120
0
                  4;
1121
1122
0
      write_controller_token_ =
1123
0
          SetupDelay(write_controller, compaction_needed_bytes,
1124
0
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
1125
0
                     mutable_cf_options.disable_auto_compactions);
1126
0
      internal_stats_->AddCFStats(
1127
0
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_DELAYS, 1);
1128
0
      ROCKS_LOG_WARN(
1129
0
          ioptions_.logger,
1130
0
          "[%s] Stalling writes because of estimated pending compaction "
1131
0
          "bytes %" PRIu64 " rate %" PRIu64,
1132
0
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
1133
0
          write_controller->delayed_write_rate());
1134
113k
    } else {
1135
113k
      assert(write_stall_condition == WriteStallCondition::kNormal);
1136
113k
      if (vstorage->l0_delay_trigger_count() >=
1137
113k
          GetL0FileCountForCompactionSpeedup(
1138
113k
              mutable_cf_options.level0_file_num_compaction_trigger,
1139
113k
              mutable_cf_options.level0_slowdown_writes_trigger)) {
1140
706
        write_controller_token_ =
1141
706
            write_controller->GetCompactionPressureToken();
1142
706
        ROCKS_LOG_INFO(
1143
706
            ioptions_.logger,
1144
706
            "[%s] Increasing compaction threads because we have %d level-0 "
1145
706
            "files ",
1146
706
            name_.c_str(), vstorage->l0_delay_trigger_count());
1147
112k
      } else if (mutable_cf_options.soft_pending_compaction_bytes_limit == 0) {
1148
        // If soft pending compaction byte limit is not set, always speed up
1149
        // compaction.
1150
0
        write_controller_token_ =
1151
0
            write_controller->GetCompactionPressureToken();
1152
112k
      } else if (vstorage->estimated_compaction_needed_bytes() >=
1153
112k
                 GetPendingCompactionBytesForCompactionSpeedup(
1154
112k
                     mutable_cf_options, vstorage)) {
1155
0
        write_controller_token_ =
1156
0
            write_controller->GetCompactionPressureToken();
1157
0
        ROCKS_LOG_INFO(
1158
0
            ioptions_.logger,
1159
0
            "[%s] Increasing compaction threads because of estimated pending "
1160
0
            "compaction "
1161
0
            "bytes %" PRIu64,
1162
0
            name_.c_str(), vstorage->estimated_compaction_needed_bytes());
1163
112k
      } else if (uint64_t(vstorage->FilesMarkedForCompaction().size()) >=
1164
112k
                 GetMarkedFileCountForCompactionSpeedup()) {
1165
0
        write_controller_token_ =
1166
0
            write_controller->GetCompactionPressureToken();
1167
0
        ROCKS_LOG_INFO(
1168
0
            ioptions_.logger,
1169
0
            "[%s] Increasing compaction threads because we have %" PRIu64
1170
0
            " files marked for compaction",
1171
0
            name_.c_str(),
1172
0
            uint64_t(vstorage->FilesMarkedForCompaction().size()));
1173
112k
      } else {
1174
112k
        write_controller_token_.reset();
1175
112k
      }
1176
      // If the DB recovers from delay conditions, we reward it by raising the
1177
      // delayed write rate (kDelayRecoverSlowdownRatio). This balances the
1178
      // long-term slowdown increase signal.
1179
113k
      if (needed_delay) {
1180
6
        uint64_t write_rate = write_controller->delayed_write_rate();
1181
6
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
1182
6
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
1183
        // Set the low-pri limit to be 1/4 of the delayed write rate.
1184
        // Note we don't reset this value even after the delay condition is
1185
        // released. The low-pri rate will continue to apply if there is
1186
        // compaction pressure.
1187
6
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
1188
6
                                                                    4);
1189
6
      }
1190
113k
    }
1191
113k
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
1192
113k
  }
1193
290k
  return write_stall_condition;
1194
290k
}
1195
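For illustration, here is a minimal standalone sketch of the delay-recovery arithmetic above. It assumes kDelayRecoverSlowdownRatio is 1.4 (the constant is defined earlier in this file; the value here is an assumption for the example), and all other names are local stand-ins rather than RocksDB API:

#include <cstdint>
#include <iostream>

// Stand-in for the constant defined elsewhere in column_family.cc
// (assumed value, for illustration only).
constexpr double kDelayRecoverSlowdownRatio = 1.4;

int main() {
  uint64_t write_rate = 16 << 20;  // 16 MiB/s current delayed write rate
  // On recovery from a delay condition, bump the delayed rate by the ratio.
  uint64_t recovered = static_cast<uint64_t>(
      static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio);
  // The low-pri rate limiter is set to 1/4 of the pre-bump delayed rate.
  uint64_t low_pri = write_rate / 4;
  std::cout << "recovered=" << recovered << " low_pri=" << low_pri << "\n";
  return 0;
}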
1196
118k
const FileOptions* ColumnFamilyData::soptions() const {
1197
118k
  return &(column_family_set_->file_options_);
1198
118k
}
1199
1200
288k
void ColumnFamilyData::SetCurrent(Version* current_version) {
1201
288k
  current_ = current_version;
1202
288k
}
1203
1204
0
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
1205
0
  return VersionSet::GetNumLiveVersions(dummy_versions_);
1206
0
}
1207
1208
0
uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
1209
0
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
1210
0
}
1211
1212
0
uint64_t ColumnFamilyData::GetTotalBlobFileSize() const {
1213
0
  return VersionSet::GetTotalBlobFileSize(dummy_versions_);
1214
0
}
1215
1216
0
uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
1217
0
  return current_->GetSstFilesSize();
1218
0
}
1219
1220
MemTable* ColumnFamilyData::ConstructNewMemtable(
1221
141k
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
1222
141k
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
1223
141k
                      write_buffer_manager_, earliest_seq, id_);
1224
141k
}
1225
1226
138k
void ColumnFamilyData::CreateNewMemtable(SequenceNumber earliest_seq) {
1227
138k
  if (mem_ != nullptr) {
1228
20.0k
    delete mem_->Unref();
1229
20.0k
  }
1230
  // NOTE: db mutex must be locked for SetMemtable, so safe for
1231
  // GetLatestMutableCFOptions
1232
138k
  SetMemtable(ConstructNewMemtable(GetLatestMutableCFOptions(), earliest_seq));
1233
138k
  mem_->Ref();
1234
138k
}
1235
1236
120k
bool ColumnFamilyData::NeedsCompaction() const {
1237
120k
  return !mutable_cf_options_.disable_auto_compactions &&
1238
120k
         compaction_picker_->NeedsCompaction(current_->storage_info());
1239
120k
}
1240
1241
Compaction* ColumnFamilyData::PickCompaction(
1242
    const MutableCFOptions& mutable_options,
1243
    const MutableDBOptions& mutable_db_options,
1244
    const std::vector<SequenceNumber>& existing_snapshots,
1245
    const SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
1246
6.26k
    bool require_max_output_level) {
1247
6.26k
  auto* result = compaction_picker_->PickCompaction(
1248
6.26k
      GetName(), mutable_options, mutable_db_options, existing_snapshots,
1249
6.26k
      snapshot_checker, current_->storage_info(), log_buffer,
1250
6.26k
      require_max_output_level);
1251
6.26k
  if (result != nullptr) {
1252
6.22k
    result->FinalizeInputInfo(current_);
1253
6.22k
  }
1254
6.26k
  return result;
1255
6.26k
}
1256
1257
bool ColumnFamilyData::RangeOverlapWithCompaction(
1258
    const Slice& smallest_user_key, const Slice& largest_user_key,
1259
0
    int level) const {
1260
0
  return compaction_picker_->RangeOverlapWithCompaction(
1261
0
      smallest_user_key, largest_user_key, level);
1262
0
}
1263
1264
Status ColumnFamilyData::RangesOverlapWithMemtables(
1265
    const autovector<UserKeyRange>& ranges, SuperVersion* super_version,
1266
4.50k
    bool allow_data_in_errors, bool* overlap) {
1267
4.50k
  assert(overlap != nullptr);
1268
4.50k
  *overlap = false;
1269
  // Create an InternalIterator over all unflushed memtables
1270
4.50k
  Arena arena;
1271
  // TODO: plumb Env::IOActivity, Env::IOPriority
1272
4.50k
  ReadOptions read_opts;
1273
4.50k
  read_opts.total_order_seek = true;
1274
4.50k
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
1275
4.50k
  merge_iter_builder.AddIterator(super_version->mem->NewIterator(
1276
4.50k
      read_opts, /*seqno_to_time_mapping=*/nullptr, &arena,
1277
4.50k
      /*prefix_extractor=*/nullptr, /*for_flush=*/false));
1278
4.50k
  super_version->imm->AddIterators(read_opts, /*seqno_to_time_mapping=*/nullptr,
1279
4.50k
                                   /*prefix_extractor=*/nullptr,
1280
4.50k
                                   &merge_iter_builder,
1281
4.50k
                                   false /* add_range_tombstone_iter */);
1282
4.50k
  ScopedArenaPtr<InternalIterator> memtable_iter(merge_iter_builder.Finish());
1283
1284
4.50k
  auto read_seq = super_version->current->version_set()->LastSequence();
1285
4.50k
  ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
1286
4.50k
  auto* active_range_del_iter = super_version->mem->NewRangeTombstoneIterator(
1287
4.50k
      read_opts, read_seq, false /* immutable_memtable */);
1288
4.50k
  range_del_agg.AddTombstones(
1289
4.50k
      std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
1290
4.50k
  Status status;
1291
4.50k
  status = super_version->imm->AddRangeTombstoneIterators(
1292
4.50k
      read_opts, nullptr /* arena */, &range_del_agg);
1293
  // AddRangeTombstoneIterators always returns Status::OK.
1294
4.50k
  assert(status.ok());
1295
1296
9.00k
  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
1297
4.50k
    auto* vstorage = super_version->current->storage_info();
1298
4.50k
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
1299
4.50k
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
1300
4.50k
                            kValueTypeForSeek);
1301
4.50k
    memtable_iter->Seek(range_start.Encode());
1302
4.50k
    status = memtable_iter->status();
1303
4.50k
    ParsedInternalKey seek_result;
1304
1305
4.50k
    if (status.ok() && memtable_iter->Valid()) {
1306
2.40k
      status = ParseInternalKey(memtable_iter->key(), &seek_result,
1307
2.40k
                                allow_data_in_errors);
1308
2.40k
    }
1309
1310
4.50k
    if (status.ok()) {
1311
4.50k
      if (memtable_iter->Valid() &&
1312
4.50k
          ucmp->CompareWithoutTimestamp(seek_result.user_key,
1313
2.40k
                                        ranges[i].limit) <= 0) {
1314
2.23k
        *overlap = true;
1315
2.26k
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
1316
2.26k
                                                 ranges[i].limit)) {
1317
0
        *overlap = true;
1318
0
      }
1319
4.50k
    }
1320
4.50k
  }
1321
4.50k
  return status;
1322
4.50k
}
1323
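The per-range check above boils down to: seek to the first internal key at or after the range start, then test whether the hit's user key is within the range limit. A minimal stand-in sketch, using a sorted std::map in place of the memtable iterator (names are illustrative, not RocksDB API):

#include <iostream>
#include <map>
#include <string>

// Toy stand-in for the memtable: sorted user keys only, no sequence numbers.
bool RangeOverlaps(const std::map<std::string, std::string>& mem,
                   const std::string& start, const std::string& limit) {
  // Analog of Seek(InternalKey(start, kMaxSequenceNumber, kValueTypeForSeek)):
  // position at the first entry whose user key is >= start.
  auto it = mem.lower_bound(start);
  // Overlap iff the seek landed on a key no greater than the range limit.
  return it != mem.end() && it->first <= limit;
}

int main() {
  std::map<std::string, std::string> mem{{"b", "1"}, {"e", "2"}};
  std::cout << RangeOverlaps(mem, "a", "c") << "\n";  // 1: "b" is in [a, c]
  std::cout << RangeOverlaps(mem, "f", "g") << "\n";  // 0: nothing in [f, g]
  return 0;
}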
1324
const int ColumnFamilyData::kCompactAllLevels = -1;
1325
const int ColumnFamilyData::kCompactToBaseLevel = -2;
1326
1327
Compaction* ColumnFamilyData::CompactRange(
1328
    const MutableCFOptions& mutable_cf_options,
1329
    const MutableDBOptions& mutable_db_options, int input_level,
1330
    int output_level, const CompactRangeOptions& compact_range_options,
1331
    const InternalKey* begin, const InternalKey* end,
1332
    InternalKey** compaction_end, bool* conflict,
1333
3.26k
    uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
1334
3.26k
  auto* result = compaction_picker_->CompactRange(
1335
3.26k
      GetName(), mutable_cf_options, mutable_db_options,
1336
3.26k
      current_->storage_info(), input_level, output_level,
1337
3.26k
      compact_range_options, begin, end, compaction_end, conflict,
1338
3.26k
      max_file_num_to_ignore, trim_ts);
1339
3.26k
  if (result != nullptr) {
1340
2.75k
    result->FinalizeInputInfo(current_);
1341
2.75k
  }
1342
3.26k
  TEST_SYNC_POINT("ColumnFamilyData::CompactRange:Return");
1343
3.26k
  return result;
1344
3.26k
}
1345
1346
27.2k
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
1347
27.2k
  SuperVersion* sv = GetThreadLocalSuperVersion(db);
1348
27.2k
  sv->Ref();
1349
27.2k
  if (!ReturnThreadLocalSuperVersion(sv)) {
1350
    // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
1351
    // when the thread-local pointer was populated. So, the Ref() earlier in
1352
    // this function still prevents the returned SuperVersion* from being
1353
    // deleted out from under the caller.
1354
0
    sv->Unref();
1355
0
  }
1356
27.2k
  return sv;
1357
27.2k
}
1358
1359
52.0k
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
1360
  // The SuperVersion is cached in thread local storage to avoid acquiring
1361
  // mutex when SuperVersion does not change since the last use. When a new
1362
  // SuperVersion is installed, the compaction or flush thread cleans up
1363
  // cached SuperVersion in all existing thread local storage. To avoid
1364
  // acquiring mutex for this operation, we use atomic Swap() on the thread
1365
  // local pointer to guarantee exclusive access. If the thread local pointer
1366
  // is being used while a new SuperVersion is installed, the cached
1367
  // SuperVersion can become stale. In that case, the background thread would
1368
  // have swapped in kSVObsolete. We re-check the value when returning
1369
  // SuperVersion back to thread local, with an atomic compare and swap.
1370
  // The superversion will need to be released if detected to be stale.
1371
52.0k
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
1372
  // Invariant:
1373
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
1374
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
1375
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
1376
  // (if no Scrape happens).
1377
52.0k
  assert(ptr != SuperVersion::kSVInUse);
1378
52.0k
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
1379
52.0k
  if (sv == SuperVersion::kSVObsolete) {
1380
38.6k
    RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_ACQUIRES);
1381
38.6k
    db->mutex()->Lock();
1382
38.6k
    sv = super_version_->Ref();
1383
38.6k
    db->mutex()->Unlock();
1384
38.6k
  }
1385
52.0k
  assert(sv != nullptr);
1386
52.0k
  return sv;
1387
52.0k
}
1388
1389
52.0k
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
1390
52.0k
  assert(sv != nullptr);
1391
  // Put the SuperVersion back
1392
52.0k
  void* expected = SuperVersion::kSVInUse;
1393
52.0k
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
1394
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
1395
    // storage has not been altered and no Scrape has happened. The
1396
    // SuperVersion is still current.
1397
52.0k
    return true;
1398
52.0k
  } else {
1399
    // ThreadLocal scrape happened in the process of this GetImpl call (after
1400
    // thread local Swap() at the beginning and before CompareAndSwap()).
1401
    // This means the SuperVersion it holds is obsolete.
1402
0
    assert(expected == SuperVersion::kSVObsolete);
1403
0
  }
1404
0
  return false;
1405
52.0k
}
1406
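A minimal sketch of the sentinel protocol that GetThreadLocalSuperVersion() and ReturnThreadLocalSuperVersion() implement, with one std::atomic slot standing in for the per-thread storage and a scrape like ResetThreadLocalSuperVersions(); every name below is an illustrative stand-in, not the RocksDB types:

#include <atomic>
#include <cassert>

struct SV {
  int refs = 1;  // toy stand-in for SuperVersion's reference count
};

// Tag pointers mirroring SuperVersion::kSVInUse / kSVObsolete.
static int in_use_tag, obsolete_tag;
void* const kInUse = &in_use_tag;
void* const kObsolete = &obsolete_tag;

std::atomic<void*> slot{kObsolete};  // one thread-local cache slot

SV* Acquire(SV* shared) {
  void* p = slot.exchange(kInUse);  // Swap(): claim exclusive access
  assert(p != kInUse);              // only this thread installs kInUse
  if (p == kObsolete) {
    ++shared->refs;                 // cache miss: ref the shared SuperVersion
    return shared;
  }
  return static_cast<SV*>(p);       // cache hit: still current
}

bool Return(SV* sv) {
  void* expected = kInUse;
  // Succeeds only if no scrape replaced kInUse with kObsolete meanwhile.
  return slot.compare_exchange_strong(expected, sv);
}

void Scrape() {
  void* p = slot.exchange(kObsolete);  // invalidate the cached pointer
  if (p != kInUse && p != kObsolete) {
    --static_cast<SV*>(p)->refs;       // drop the reference the cache held
  }
}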
1407
void ColumnFamilyData::InstallSuperVersion(
1408
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
1409
    std::optional<std::shared_ptr<SeqnoToTimeMapping>>
1410
113k
        new_seqno_to_time_mapping) {
1411
113k
  db_mutex->AssertHeld();
1412
1413
113k
  SuperVersion* new_superversion = sv_context->new_superversion.release();
1414
113k
  new_superversion->mutable_cf_options = GetLatestMutableCFOptions();
1415
113k
  new_superversion->Init(this, mem_, imm_.current(), current_,
1416
113k
                         new_seqno_to_time_mapping.has_value()
1417
113k
                             ? std::move(new_seqno_to_time_mapping.value())
1418
113k
                         : super_version_
1419
10.8k
                             ? super_version_->ShareSeqnoToTimeMapping()
1420
10.8k
                             : nullptr);
1421
113k
  SuperVersion* old_superversion = super_version_;
1422
113k
  super_version_ = new_superversion;
1423
113k
  if (old_superversion == nullptr || old_superversion->current != current() ||
1424
113k
      old_superversion->mem != mem_ ||
1425
113k
      old_superversion->imm != imm_.current()) {
1426
    // Should not recalculate the slowdown condition if nothing has changed,
1427
    // since RecalculateWriteStallConditions() currently treats an unchanged
1428
    // state as if further slowdown is needed.
1429
113k
    super_version_->write_stall_condition =
1430
113k
        RecalculateWriteStallConditions(new_superversion->mutable_cf_options);
1431
113k
  } else {
1432
0
    super_version_->write_stall_condition =
1433
0
        old_superversion->write_stall_condition;
1434
0
  }
1435
113k
  if (old_superversion != nullptr) {
1436
    // Reset SuperVersions cached in thread local storage.
1437
    // This should be done before old_superversion->Unref(). That's to ensure
1438
    // that local_sv_ never holds the last reference to SuperVersion, since
1439
    // it has no means to safely do SuperVersion cleanup.
1440
10.8k
    ResetThreadLocalSuperVersions();
1441
1442
10.8k
    if (old_superversion->mutable_cf_options.write_buffer_size !=
1443
10.8k
        new_superversion->mutable_cf_options.write_buffer_size) {
1444
0
      mem_->UpdateWriteBufferSize(
1445
0
          new_superversion->mutable_cf_options.write_buffer_size);
1446
0
    }
1447
10.8k
    if (old_superversion->write_stall_condition !=
1448
10.8k
        new_superversion->write_stall_condition) {
1449
5
      sv_context->PushWriteStallNotification(
1450
5
          old_superversion->write_stall_condition,
1451
5
          new_superversion->write_stall_condition, GetName(), &ioptions());
1452
5
    }
1453
10.8k
    if (old_superversion->Unref()) {
1454
10.8k
      old_superversion->Cleanup();
1455
10.8k
      sv_context->superversions_to_free.push_back(old_superversion);
1456
10.8k
    }
1457
10.8k
  }
1458
113k
  ++super_version_number_;
1459
113k
  super_version_->version_number = super_version_number_;
1460
113k
}
1461
1462
10.8k
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
1463
10.8k
  autovector<void*> sv_ptrs;
1464
10.8k
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
1465
10.8k
  for (auto ptr : sv_ptrs) {
1466
5.99k
    assert(ptr);
1467
5.99k
    if (ptr == SuperVersion::kSVInUse) {
1468
0
      continue;
1469
0
    }
1470
5.99k
    auto sv = static_cast<SuperVersion*>(ptr);
1471
5.99k
    bool was_last_ref __attribute__((__unused__));
1472
5.99k
    was_last_ref = sv->Unref();
1473
    // sv couldn't have been the last reference because
1474
    // ResetThreadLocalSuperVersions() is called before
1475
    // unref'ing super_version_.
1476
5.99k
    assert(!was_last_ref);
1477
5.99k
  }
1478
10.8k
}
1479
1480
Status ColumnFamilyData::ValidateOptions(
1481
102k
    const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
1482
102k
  Status s;
1483
102k
  s = CheckCompressionSupported(cf_options);
1484
102k
  if (s.ok() && db_options.allow_concurrent_memtable_write) {
1485
102k
    s = CheckConcurrentWritesSupported(cf_options);
1486
102k
  }
1487
102k
  if (s.ok() && db_options.unordered_write &&
1488
102k
      cf_options.max_successive_merges != 0) {
1489
0
    s = Status::InvalidArgument(
1490
0
        "max_successive_merges > 0 is incompatible with unordered_write");
1491
0
  }
1492
102k
  if (s.ok()) {
1493
102k
    s = CheckCFPathsSupported(db_options, cf_options);
1494
102k
  }
1495
102k
  if (!s.ok()) {
1496
0
    return s;
1497
0
  }
1498
1499
102k
  if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
1500
0
    if (!cf_options.table_factory->IsInstanceOf(
1501
0
            TableFactory::kBlockBasedTableName())) {
1502
0
      return Status::NotSupported(
1503
0
          "TTL is only supported in Block-Based Table format. ");
1504
0
    }
1505
0
  }
1506
1507
102k
  if (cf_options.periodic_compaction_seconds > 0 &&
1508
102k
      cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
1509
0
    if (!cf_options.table_factory->IsInstanceOf(
1510
0
            TableFactory::kBlockBasedTableName())) {
1511
0
      return Status::NotSupported(
1512
0
          "Periodic Compaction is only supported in "
1513
0
          "Block-Based Table format. ");
1514
0
    }
1515
0
  }
1516
1517
102k
  const auto* ucmp = cf_options.comparator;
1518
102k
  assert(ucmp);
1519
102k
  if (ucmp->timestamp_size() > 0 &&
1520
102k
      !cf_options.persist_user_defined_timestamps) {
1521
0
    if (db_options.atomic_flush) {
1522
0
      return Status::NotSupported(
1523
0
          "Not persisting user-defined timestamps feature is not supported"
1524
0
          "in combination with atomic flush.");
1525
0
    }
1526
0
    if (db_options.allow_concurrent_memtable_write) {
1527
0
      return Status::NotSupported(
1528
0
          "Not persisting user-defined timestamps feature is not supported"
1529
0
          " in combination with concurrent memtable write.");
1530
0
    }
1531
0
    const char* comparator_name = cf_options.comparator->Name();
1532
0
    size_t name_size = strlen(comparator_name);
1533
0
    const char* suffix = ".u64ts";
1534
0
    size_t suffix_size = strlen(suffix);
1535
0
    if (name_size <= suffix_size ||
1536
0
        strcmp(comparator_name + name_size - suffix_size, suffix) != 0) {
1537
0
      return Status::NotSupported(
1538
0
          "Not persisting user-defined timestamps"
1539
0
          "feature only support user-defined timestamps formatted as "
1540
0
          "uint64_t.");
1541
0
    }
1542
0
  }
1543
1544
102k
  if (cf_options.enable_blob_garbage_collection) {
1545
0
    if (cf_options.blob_garbage_collection_age_cutoff < 0.0 ||
1546
0
        cf_options.blob_garbage_collection_age_cutoff > 1.0) {
1547
0
      return Status::InvalidArgument(
1548
0
          "The age cutoff for blob garbage collection should be in the range "
1549
0
          "[0.0, 1.0].");
1550
0
    }
1551
0
    if (cf_options.blob_garbage_collection_force_threshold < 0.0 ||
1552
0
        cf_options.blob_garbage_collection_force_threshold > 1.0) {
1553
0
      return Status::InvalidArgument(
1554
0
          "The garbage ratio threshold for forcing blob garbage collection "
1555
0
          "should be in the range [0.0, 1.0].");
1556
0
    }
1557
0
  }
1558
1559
102k
  if (cf_options.compaction_style == kCompactionStyleFIFO &&
1560
102k
      db_options.max_open_files != -1 && cf_options.ttl > 0) {
1561
0
    return Status::NotSupported(
1562
0
        "FIFO compaction only supported with max_open_files = -1.");
1563
0
  }
1564
1565
102k
  std::vector<uint32_t> supported{0, 1, 2, 4, 8};
1566
102k
  if (std::find(supported.begin(), supported.end(),
1567
102k
                cf_options.memtable_protection_bytes_per_key) ==
1568
102k
      supported.end()) {
1569
0
    return Status::NotSupported(
1570
0
        "Memtable per key-value checksum protection only supports 0, 1, 2, 4 "
1571
0
        "or 8 bytes per key.");
1572
0
  }
1573
102k
  if (std::find(supported.begin(), supported.end(),
1574
102k
                cf_options.block_protection_bytes_per_key) == supported.end()) {
1575
0
    return Status::NotSupported(
1576
0
        "Block per key-value checksum protection only supports 0, 1, 2, 4 "
1577
0
        "or 8 bytes per key.");
1578
0
  }
1579
1580
102k
  if (!cf_options.compaction_options_fifo.file_temperature_age_thresholds
1581
102k
           .empty()) {
1582
0
    if (cf_options.compaction_style != kCompactionStyleFIFO) {
1583
0
      return Status::NotSupported(
1584
0
          "Option file_temperature_age_thresholds only supports FIFO "
1585
0
          "compaction.");
1586
0
    } else if (cf_options.num_levels > 1) {
1587
0
      return Status::NotSupported(
1588
0
          "Option file_temperature_age_thresholds is only supported when "
1589
0
          "num_levels = 1.");
1590
0
    } else {
1591
0
      const auto& ages =
1592
0
          cf_options.compaction_options_fifo.file_temperature_age_thresholds;
1593
0
      assert(ages.size() >= 1);
1594
      // check that the age thresholds are sorted in increasing order
1595
0
      for (size_t i = 0; i < ages.size() - 1; ++i) {
1596
0
        if (ages[i].age >= ages[i + 1].age) {
1597
0
          return Status::NotSupported(
1598
0
              "Option file_temperature_age_thresholds requires elements to be "
1599
0
              "sorted in increasing order with respect to `age` field.");
1600
0
        }
1601
0
      }
1602
0
    }
1603
0
  }
1604
1605
102k
  if (cf_options.compaction_style == kCompactionStyleUniversal) {
1606
0
    int max_read_amp = cf_options.compaction_options_universal.max_read_amp;
1607
0
    if (max_read_amp < -1) {
1608
0
      return Status::NotSupported(
1609
0
          "CompactionOptionsUniversal::max_read_amp should be at least -1.");
1610
0
    } else if (0 < max_read_amp &&
1611
0
               max_read_amp < cf_options.level0_file_num_compaction_trigger) {
1612
0
      return Status::NotSupported(
1613
0
          "CompactionOptionsUniversal::max_read_amp limits the number of sorted"
1614
0
          " runs but is smaller than the compaction trigger "
1615
0
          "level0_file_num_compaction_trigger.");
1616
0
    }
1617
0
  }
1618
102k
  return s;
1619
102k
}
1620
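The ".u64ts" comparator-name test above is a plain C suffix check; a standalone equivalent, with a hypothetical helper name, for illustration:

#include <cstddef>
#include <cstring>

// Standalone equivalent of the comparator-name check above: true iff `name`
// is strictly longer than `suffix` and ends with it (e.g. ends with ".u64ts").
bool EndsWith(const char* name, const char* suffix) {
  std::size_t name_size = std::strlen(name);
  std::size_t suffix_size = std::strlen(suffix);
  return name_size > suffix_size &&
         std::strcmp(name + name_size - suffix_size, suffix) == 0;
}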
1621
Status ColumnFamilyData::SetOptions(
1622
    const DBOptions& db_opts,
1623
0
    const std::unordered_map<std::string, std::string>& options_map) {
1624
0
  ColumnFamilyOptions cf_opts =
1625
0
      BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
1626
0
  ConfigOptions config_opts;
1627
0
  config_opts.mutable_options_only = true;
1628
#ifndef NDEBUG
1629
  if (TEST_allowSetOptionsImmutableInMutable) {
1630
    config_opts.mutable_options_only = false;
1631
  }
1632
#endif
1633
0
  Status s = GetColumnFamilyOptionsFromMap(config_opts, cf_opts, options_map,
1634
0
                                           &cf_opts);
1635
0
  if (s.ok()) {
1636
    // FIXME: we should call SanitizeOptions() too or consolidate it with
1637
    // ValidateOptions().
1638
0
    s = ValidateOptions(db_opts, cf_opts);
1639
0
  }
1640
0
  if (s.ok()) {
1641
0
    mutable_cf_options_ = MutableCFOptions(cf_opts);
1642
0
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
1643
0
  }
1644
0
  return s;
1645
0
}
1646
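For context, this method is the backend of the public DB::SetOptions() API; a minimal usage sketch against that public API follows (the database path and option values are illustrative):

#include <cassert>
#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/setoptions_demo", &db);
  assert(s.ok());
  // New values arrive as a string map and are merged via
  // GetColumnFamilyOptionsFromMap() and checked by ValidateOptions() as above.
  s = db->SetOptions(db->DefaultColumnFamily(),
                     {{"write_buffer_size", "131072"},
                      {"level0_slowdown_writes_trigger", "30"}});
  assert(s.ok());
  delete db;
  return 0;
}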
1647
Status ColumnFamilyData::AddDirectories(
1648
102k
    std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) {
1649
102k
  Status s;
1650
102k
  assert(created_dirs != nullptr);
1651
102k
  assert(data_dirs_.empty());
1652
102k
  for (auto& p : ioptions_.cf_paths) {
1653
102k
    auto existing_dir = created_dirs->find(p.path);
1654
1655
102k
    if (existing_dir == created_dirs->end()) {
1656
80.3k
      std::unique_ptr<FSDirectory> path_directory;
1657
80.3k
      s = DBImpl::CreateAndNewDirectory(ioptions_.fs.get(), p.path,
1658
80.3k
                                        &path_directory);
1659
80.3k
      if (!s.ok()) {
1660
0
        return s;
1661
0
      }
1662
80.3k
      assert(path_directory != nullptr);
1663
80.3k
      data_dirs_.emplace_back(path_directory.release());
1664
80.3k
      (*created_dirs)[p.path] = data_dirs_.back();
1665
80.3k
    } else {
1666
21.9k
      data_dirs_.emplace_back(existing_dir->second);
1667
21.9k
    }
1668
102k
  }
1669
102k
  assert(data_dirs_.size() == ioptions_.cf_paths.size());
1670
102k
  return s;
1671
102k
}
1672
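A minimal sketch of the created_dirs deduplication above: column families that share a cf_path reuse one directory handle rather than opening it twice (toy types, not the RocksDB API):

#include <map>
#include <memory>
#include <string>

struct Dir {};  // toy stand-in for FSDirectory

// Return a shared handle for `path`, creating it only on first use; mirrors
// the created_dirs bookkeeping in AddDirectories() above.
std::shared_ptr<Dir> GetOrCreateDir(
    std::map<std::string, std::shared_ptr<Dir>>* created_dirs,
    const std::string& path) {
  auto it = created_dirs->find(path);
  if (it != created_dirs->end()) {
    return it->second;  // reuse the handle another column family opened
  }
  auto dir = std::make_shared<Dir>();  // stands in for CreateAndNewDirectory()
  (*created_dirs)[path] = dir;
  return dir;
}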
1673
12.9k
FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
1674
12.9k
  if (data_dirs_.empty()) {
1675
0
    return nullptr;
1676
0
  }
1677
1678
12.9k
  assert(path_id < data_dirs_.size());
1679
12.9k
  return data_dirs_[path_id].get();
1680
12.9k
}
1681
1682
2.23k
void ColumnFamilyData::SetFlushSkipReschedule() {
1683
2.23k
  const Comparator* ucmp = user_comparator();
1684
2.23k
  const size_t ts_sz = ucmp->timestamp_size();
1685
2.23k
  if (ts_sz == 0 || ioptions_.persist_user_defined_timestamps) {
1686
2.23k
    return;
1687
2.23k
  }
1688
0
  flush_skip_reschedule_.store(true);
1689
0
}
1690
1691
2.23k
bool ColumnFamilyData::GetAndClearFlushSkipReschedule() {
1692
2.23k
  return flush_skip_reschedule_.exchange(false);
1693
2.23k
}
1694
1695
bool ColumnFamilyData::ShouldPostponeFlushToRetainUDT(
1696
2.23k
    uint64_t max_memtable_id) {
1697
2.23k
  const Comparator* ucmp = user_comparator();
1698
2.23k
  const size_t ts_sz = ucmp->timestamp_size();
1699
2.23k
  if (ts_sz == 0 || ioptions_.persist_user_defined_timestamps) {
1700
2.23k
    return false;
1701
2.23k
  }
1702
  // If users set the `persist_user_defined_timestamps` flag to false, they
1703
  // should also set the `full_history_ts_low` flag to indicate the range of
1704
  // user-defined timestamps to retain in memory. Otherwise, we do not
1705
  // explicitly postpone flush to retain UDTs.
1706
0
  const std::string& full_history_ts_low = GetFullHistoryTsLow();
1707
0
  if (full_history_ts_low.empty()) {
1708
0
    return false;
1709
0
  }
1710
0
  for (const Slice& table_newest_udt :
1711
0
       imm()->GetTablesNewestUDT(max_memtable_id)) {
1712
0
    if (table_newest_udt.empty()) {
1713
0
      continue;
1714
0
    }
1715
0
    assert(table_newest_udt.size() == full_history_ts_low.size());
1716
    // Checking the newest UDT contained in MemTable with ascending ID up to
1717
    // `max_memtable_id`. Return immediately on finding the first MemTable that
1718
    // needs postponing.
1719
0
    if (ucmp->CompareTimestamp(table_newest_udt, full_history_ts_low) >= 0) {
1720
0
      return true;
1721
0
    }
1722
0
  }
1723
0
  return false;
1724
0
}
1725
1726
80.3k
void ColumnFamilyData::RecoverEpochNumbers() {
1727
80.3k
  assert(current_);
1728
80.3k
  auto* vstorage = current_->storage_info();
1729
80.3k
  assert(vstorage);
1730
80.3k
  vstorage->RecoverEpochNumbers(this);
1731
80.3k
}
1732
1733
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
1734
                                 const ImmutableDBOptions* db_options,
1735
                                 const FileOptions& file_options,
1736
                                 Cache* table_cache,
1737
                                 WriteBufferManager* _write_buffer_manager,
1738
                                 WriteController* _write_controller,
1739
                                 BlockCacheTracer* const block_cache_tracer,
1740
                                 const std::shared_ptr<IOTracer>& io_tracer,
1741
                                 const std::string& db_id,
1742
                                 const std::string& db_session_id)
1743
58.3k
    : max_column_family_(0),
1744
58.3k
      file_options_(file_options),
1745
58.3k
      dummy_cfd_(new ColumnFamilyData(
1746
58.3k
          ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
1747
58.3k
          nullptr, ColumnFamilyOptions(), *db_options, &file_options_, nullptr,
1748
58.3k
          block_cache_tracer, io_tracer, db_id, db_session_id,
1749
58.3k
          /*read_only*/ true)),
1750
58.3k
      default_cfd_cache_(nullptr),
1751
58.3k
      db_name_(dbname),
1752
58.3k
      db_options_(db_options),
1753
58.3k
      table_cache_(table_cache),
1754
58.3k
      write_buffer_manager_(_write_buffer_manager),
1755
58.3k
      write_controller_(_write_controller),
1756
58.3k
      block_cache_tracer_(block_cache_tracer),
1757
58.3k
      io_tracer_(io_tracer),
1758
58.3k
      db_id_(db_id),
1759
58.3k
      db_session_id_(db_session_id) {
1760
  // initialize linked list
1761
58.3k
  dummy_cfd_->prev_ = dummy_cfd_;
1762
58.3k
  dummy_cfd_->next_ = dummy_cfd_;
1763
58.3k
}
1764
1765
58.3k
ColumnFamilySet::~ColumnFamilySet() {
1766
138k
  while (column_family_data_.size() > 0) {
1767
    // cfd destructor will delete itself from column_family_data_
1768
80.3k
    auto cfd = column_family_data_.begin()->second;
1769
80.3k
    bool last_ref __attribute__((__unused__));
1770
80.3k
    last_ref = cfd->UnrefAndTryDelete();
1771
80.3k
    assert(last_ref);
1772
80.3k
  }
1773
58.3k
  bool dummy_last_ref __attribute__((__unused__));
1774
58.3k
  dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
1775
58.3k
  assert(dummy_last_ref);
1776
58.3k
}
1777
1778
2.21M
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
1779
2.21M
  assert(default_cfd_cache_ != nullptr);
1780
2.21M
  return default_cfd_cache_;
1781
2.21M
}
1782
1783
789k
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
1784
789k
  auto cfd_iter = column_family_data_.find(id);
1785
789k
  if (cfd_iter != column_family_data_.end()) {
1786
770k
    return cfd_iter->second;
1787
770k
  } else {
1788
19.6k
    return nullptr;
1789
19.6k
  }
1790
789k
}
1791
1792
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(
1793
146k
    const std::string& name) const {
1794
146k
  auto cfd_iter = column_families_.find(name);
1795
146k
  if (cfd_iter != column_families_.end()) {
1796
124k
    auto cfd = GetColumnFamily(cfd_iter->second);
1797
124k
    assert(cfd != nullptr);
1798
124k
    return cfd;
1799
124k
  } else {
1800
21.9k
    return nullptr;
1801
21.9k
  }
1802
146k
}
1803
1804
21.9k
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
1805
21.9k
  return ++max_column_family_;
1806
21.9k
}
1807
1808
170k
uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }
1809
1810
58.3k
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
1811
58.3k
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
1812
58.3k
}
1813
1814
121k
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
1815
121k
  return column_families_.size();
1816
121k
}
1817
1818
// under a DB mutex AND write thread
1819
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
1820
    const std::string& name, uint32_t id, Version* dummy_versions,
1821
118k
    const ColumnFamilyOptions& options, bool read_only) {
1822
118k
  assert(column_families_.find(name) == column_families_.end());
1823
118k
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
1824
118k
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
1825
118k
      *db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
1826
118k
      db_id_, db_session_id_, read_only);
1827
118k
  column_families_.insert({name, id});
1828
118k
  column_family_data_.insert({id, new_cfd});
1829
118k
  auto ucmp = new_cfd->user_comparator();
1830
118k
  assert(ucmp);
1831
118k
  size_t ts_sz = ucmp->timestamp_size();
1832
118k
  running_ts_sz_.insert({id, ts_sz});
1833
118k
  if (ts_sz > 0) {
1834
0
    ts_sz_for_record_.insert({id, ts_sz});
1835
0
  }
1836
118k
  max_column_family_ = std::max(max_column_family_, id);
1837
  // add to linked list
1838
118k
  new_cfd->next_ = dummy_cfd_;
1839
118k
  auto prev = dummy_cfd_->prev_;
1840
118k
  new_cfd->prev_ = prev;
1841
118k
  prev->next_ = new_cfd;
1842
118k
  dummy_cfd_->prev_ = new_cfd;
1843
118k
  if (id == 0) {
1844
58.3k
    default_cfd_cache_ = new_cfd;
1845
58.3k
  }
1846
118k
  return new_cfd;
1847
118k
}
1848
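CreateColumnFamily() splices the new ColumnFamilyData in just before the dummy head of a circular doubly-linked list; a minimal standalone sketch of that splice, using a toy Node type rather than the RocksDB structs:

#include <cassert>

struct Node {
  Node* prev = this;  // a fresh node forms a one-element circle
  Node* next = this;
};

// Insert `n` immediately before the dummy head (i.e., at the list tail),
// mirroring the pointer updates in CreateColumnFamily().
void InsertBefore(Node* dummy, Node* n) {
  n->next = dummy;
  Node* prev = dummy->prev;
  n->prev = prev;
  prev->next = n;
  dummy->prev = n;
}

int main() {
  Node dummy, a, b;
  InsertBefore(&dummy, &a);
  InsertBefore(&dummy, &b);
  assert(dummy.next == &a && a.next == &b && b.next == &dummy);
  assert(dummy.prev == &b);
  return 0;
}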
1849
// under a DB mutex AND from a write thread
1850
118k
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
1851
118k
  uint32_t cf_id = cfd->GetID();
1852
118k
  auto cfd_iter = column_family_data_.find(cf_id);
1853
118k
  assert(cfd_iter != column_family_data_.end());
1854
118k
  column_family_data_.erase(cfd_iter);
1855
118k
  column_families_.erase(cfd->GetName());
1856
118k
  running_ts_sz_.erase(cf_id);
1857
118k
  ts_sz_for_record_.erase(cf_id);
1858
118k
}
1859
1860
// under a DB mutex OR from a write thread
1861
2.04M
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
1862
2.04M
  if (column_family_id == 0) {
1863
    // optimization for common case
1864
2.00M
    current_ = column_family_set_->GetDefault();
1865
2.00M
  } else {
1866
41.5k
    current_ = column_family_set_->GetColumnFamily(column_family_id);
1867
41.5k
  }
1868
2.04M
  handle_.SetCFD(current_);
1869
2.04M
  return current_ != nullptr;
1870
2.04M
}
1871
1872
997k
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
1873
997k
  assert(current_ != nullptr);
1874
997k
  return current_->GetLogNumber();
1875
997k
}
1876
1877
1.89M
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
1878
1.89M
  assert(current_ != nullptr);
1879
1.89M
  return current_->mem();
1880
1.89M
}
1881
1882
962k
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
1883
962k
  assert(current_ != nullptr);
1884
962k
  return &handle_;
1885
962k
}
1886
1887
1.02M
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
1888
1.02M
  uint32_t column_family_id = 0;
1889
1.02M
  if (column_family != nullptr) {
1890
1.02M
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1891
1.02M
    column_family_id = cfh->GetID();
1892
1.02M
  }
1893
1.02M
  return column_family_id;
1894
1.02M
}
1895
1896
const Comparator* GetColumnFamilyUserComparator(
1897
0
    ColumnFamilyHandle* column_family) {
1898
0
  if (column_family != nullptr) {
1899
0
    return column_family->GetComparator();
1900
0
  }
1901
0
  return nullptr;
1902
0
}
1903
1904
0
const ImmutableOptions& GetImmutableOptions(ColumnFamilyHandle* column_family) {
1905
0
  assert(column_family);
1906
1907
0
  ColumnFamilyHandleImpl* const handle =
1908
0
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1909
0
  assert(handle);
1910
1911
0
  const ColumnFamilyData* const cfd = handle->cfd();
1912
0
  assert(cfd);
1913
1914
0
  return cfd->ioptions();
1915
0
}
1916
1917
}  // namespace ROCKSDB_NAMESPACE