Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "io"
11 : "runtime"
12 : "strconv"
13 : "strings"
14 : "time"
15 : "unicode"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/fifo"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/cache"
21 : "github.com/cockroachdb/pebble/internal/humanize"
22 : "github.com/cockroachdb/pebble/internal/keyspan"
23 : "github.com/cockroachdb/pebble/internal/manifest"
24 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
25 : "github.com/cockroachdb/pebble/objstorage/remote"
26 : "github.com/cockroachdb/pebble/rangekey"
27 : "github.com/cockroachdb/pebble/sstable"
28 : "github.com/cockroachdb/pebble/vfs"
29 : "github.com/cockroachdb/pebble/wal"
30 : )
31 :
32 : const (
33 : cacheDefaultSize = 8 << 20 // 8 MB
34 : defaultLevelMultiplier = 10
35 : )
36 :
37 : // Compression exports the sstable.Compression type.
38 : type Compression = sstable.Compression
39 :
40 : // Exported Compression constants.
41 : const (
42 : DefaultCompression = sstable.DefaultCompression
43 : NoCompression = sstable.NoCompression
44 : SnappyCompression = sstable.SnappyCompression
45 : ZstdCompression = sstable.ZstdCompression
46 : )
47 :
48 : // FilterType exports the base.FilterType type.
49 : type FilterType = base.FilterType
50 :
51 : // Exported TableFilter constants.
52 : const (
53 : TableFilter = base.TableFilter
54 : )
55 :
56 : // FilterWriter exports the base.FilterWriter type.
57 : type FilterWriter = base.FilterWriter
58 :
59 : // FilterPolicy exports the base.FilterPolicy type.
60 : type FilterPolicy = base.FilterPolicy
61 :
62 : // BlockPropertyCollector exports the sstable.BlockPropertyCollector type.
63 : type BlockPropertyCollector = sstable.BlockPropertyCollector
64 :
65 : // BlockPropertyFilter exports the base.BlockPropertyFilter type.
66 : type BlockPropertyFilter = base.BlockPropertyFilter
67 :
68 : // ShortAttributeExtractor exports the base.ShortAttributeExtractor type.
69 : type ShortAttributeExtractor = base.ShortAttributeExtractor
70 :
71 : // UserKeyPrefixBound exports the sstable.UserKeyPrefixBound type.
72 : type UserKeyPrefixBound = sstable.UserKeyPrefixBound
73 :
74 : // IterKeyType configures which types of keys an iterator should surface.
75 : type IterKeyType int8
76 :
77 : const (
78 : // IterKeyTypePointsOnly configures an iterator to iterate over point keys
79 : // only.
80 : IterKeyTypePointsOnly IterKeyType = iota
81 : // IterKeyTypeRangesOnly configures an iterator to iterate over range keys
82 : // only.
83 : IterKeyTypeRangesOnly
84 : // IterKeyTypePointsAndRanges configures an iterator to iterate over both
85 : // point keys and range keys simultaneously.
86 : IterKeyTypePointsAndRanges
87 : )
88 :
89 : // String implements fmt.Stringer.
90 1 : func (t IterKeyType) String() string {
91 1 : switch t {
92 1 : case IterKeyTypePointsOnly:
93 1 : return "points-only"
94 1 : case IterKeyTypeRangesOnly:
95 1 : return "ranges-only"
96 1 : case IterKeyTypePointsAndRanges:
97 1 : return "points-and-ranges"
98 0 : default:
99 0 : panic(fmt.Sprintf("unknown key type %d", t))
100 : }
101 : }
102 :
103 : // IterOptions holds the optional per-query parameters for NewIter.
104 : //
105 : // Like Options, a nil *IterOptions is valid and means to use the default
106 : // values.
107 : type IterOptions struct {
108 : // LowerBound specifies the smallest key (inclusive) that the iterator will
109 : // return during iteration. If the iterator is seeked or iterated past this
110 : // boundary the iterator will return Valid()==false. Setting LowerBound
111 : // effectively truncates the key space visible to the iterator.
112 : LowerBound []byte
113 : // UpperBound specifies the largest key (exclusive) that the iterator will
114 : // return during iteration. If the iterator is seeked or iterated past this
115 : // boundary the iterator will return Valid()==false. Setting UpperBound
116 : // effectively truncates the key space visible to the iterator.
117 : UpperBound []byte
118 : // TableFilter can be used to filter the tables that are scanned during
119 : // iteration based on the user properties. Return true to scan the table and
120 : // false to skip scanning. This function must be thread-safe since the same
121 : // function can be used by multiple iterators, if the iterator is cloned.
122 : TableFilter func(userProps map[string]string) bool
123 : // SkipPoint may be used to skip over point keys that don't match an
124 : // arbitrary predicate during iteration. If set, the Iterator invokes
125 : // SkipPoint for keys encountered. If SkipPoint returns true, the iterator
126 : // will skip the key without yielding it to the iterator operation in
127 : // progress.
128 : //
129 : // SkipPoint must be a pure function and always return the same result when
130 : // provided the same arguments. The iterator may call SkipPoint multiple
131 : // times for the same user key.
132 : SkipPoint func(userKey []byte) bool
133 : // PointKeyFilters can be used to avoid scanning tables and blocks in tables
134 : // when iterating over point keys. This slice represents an intersection
135 : // across all filters, i.e., all filters must indicate that the block is
136 : // relevant.
137 : //
138 : // Performance note: When len(PointKeyFilters) > 0, the caller should ensure
139 : // that cap(PointKeyFilters) is at least len(PointKeyFilters)+1. This helps
140 : // avoid allocations in Pebble internal code that mutates the slice.
141 : PointKeyFilters []BlockPropertyFilter
142 : // RangeKeyFilters can be used to avoid scanning tables and blocks in tables
143 : // when iterating over range keys. The same requirements that apply to
144 : // PointKeyFilters apply here too.
145 : RangeKeyFilters []BlockPropertyFilter
146 : // KeyTypes configures which types of keys to iterate over: point keys,
147 : // range keys, or both.
148 : KeyTypes IterKeyType
149 : // RangeKeyMasking can be used to enable automatic masking of point keys by
150 : // range keys. Range key masking is only supported during combined range key
151 : // and point key iteration mode (IterKeyTypePointsAndRanges).
152 : RangeKeyMasking RangeKeyMasking
153 :
154 : // OnlyReadGuaranteedDurable is an advanced option that is only supported by
155 : // the Reader implemented by DB. When set to true, only state that is
156 : // guaranteed to be durable is visible in the iterator.
157 : // - This definition is made under the assumption that the FS implementation
158 : // is providing a durability guarantee when data is synced.
159 : // - The visible state represents a consistent point in the history of the
160 : // DB.
161 : // - The implementation is free to choose a conservative definition of what
162 : // is guaranteed durable. For simplicity, the current implementation
163 : // ignores memtables. A more sophisticated implementation could track the
164 : // highest seqnum that is synced to the WAL and published and use that as
165 : // the visible seqnum for an iterator. Note that the latter approach is
166 : // not strictly better than the former since we can have DBs that are (a)
167 : // synced more rarely than memtable flushes, (b) have no WAL. (a) is
168 : // likely to be true in a future CockroachDB context where the DB
169 : // containing the state machine may be rarely synced.
170 : // NB: this current implementation relies on the fact that memtables are
171 : // flushed in seqnum order, and any ingested sstables that happen to have a
172 : // lower seqnum than a non-flushed memtable don't have any overlapping keys.
173 : // This is the fundamental level invariant used in other code too, like when
174 : // merging iterators.
175 : //
176 : // Semantically, using this option provides the caller a "snapshot" as of
177 : // the time the most recent memtable was flushed. An alternate interface
178 : // would be to add a NewSnapshot variant. Creating a snapshot is heavier
179 : // weight than creating an iterator, so we have opted to support this
180 : // iterator option.
181 : OnlyReadGuaranteedDurable bool
182 : // UseL6Filters allows the caller to opt into reading filter blocks for L6
183 : // sstables. Helpful if a lot of SeekPrefixGEs are expected in quick
184 : // succession, that are also likely to not yield a single key. Filter blocks in
185 : // L6 can be relatively large, often larger than data blocks, so the benefit of
186 : // loading them in the cache is minimized if the probability of the key
187 : // existing is not low or if we just expect a one-time Seek (where loading the
188 : // data block directly is better).
189 : UseL6Filters bool
190 : // CategoryAndQoS is used for categorized iterator stats. This should not be
191 : // changed by calling SetOptions.
192 : sstable.CategoryAndQoS
193 :
194 : DebugRangeKeyStack bool
195 :
196 : // Internal options.
197 :
198 : logger Logger
199 : // Level corresponding to this file. Only passed in if constructed by a
200 : // levelIter.
201 : level manifest.Level
202 : // disableLazyCombinedIteration is an internal testing option.
203 : disableLazyCombinedIteration bool
204 : // snapshotForHideObsoletePoints is specified for/by levelIter when opening
205 : // files and is used to decide whether to hide obsolete points. A value of 0
206 : // implies obsolete points should not be hidden.
207 : snapshotForHideObsoletePoints base.SeqNum
208 :
209 : // NB: If adding new Options, you must account for them in iterator
210 : // construction and Iterator.SetOptions.
211 : }
212 :
213 : // GetLowerBound returns the LowerBound or nil if the receiver is nil.
214 2 : func (o *IterOptions) GetLowerBound() []byte {
215 2 : if o == nil {
216 2 : return nil
217 2 : }
218 2 : return o.LowerBound
219 : }
220 :
221 : // GetUpperBound returns the UpperBound or nil if the receiver is nil.
222 2 : func (o *IterOptions) GetUpperBound() []byte {
223 2 : if o == nil {
224 2 : return nil
225 2 : }
226 2 : return o.UpperBound
227 : }
228 :
229 2 : func (o *IterOptions) pointKeys() bool {
230 2 : if o == nil {
231 0 : return true
232 0 : }
233 2 : return o.KeyTypes == IterKeyTypePointsOnly || o.KeyTypes == IterKeyTypePointsAndRanges
234 : }
235 :
236 2 : func (o *IterOptions) rangeKeys() bool {
237 2 : if o == nil {
238 0 : return false
239 0 : }
240 2 : return o.KeyTypes == IterKeyTypeRangesOnly || o.KeyTypes == IterKeyTypePointsAndRanges
241 : }
242 :
243 2 : func (o *IterOptions) getLogger() Logger {
244 2 : if o == nil || o.logger == nil {
245 2 : return DefaultLogger
246 2 : }
247 2 : return o.logger
248 : }
249 :
250 : // SpanIterOptions creates a SpanIterOptions from this IterOptions.
251 2 : func (o *IterOptions) SpanIterOptions() keyspan.SpanIterOptions {
252 2 : if o == nil {
253 2 : return keyspan.SpanIterOptions{}
254 2 : }
255 2 : return keyspan.SpanIterOptions{
256 2 : RangeKeyFilters: o.RangeKeyFilters,
257 2 : }
258 : }
259 :
260 : // scanInternalOptions is similar to IterOptions, meant for use with
261 : // scanInternalIterator.
262 : type scanInternalOptions struct {
263 : sstable.CategoryAndQoS
264 : IterOptions
265 :
266 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error
267 : visitRangeDel func(start, end []byte, seqNum SeqNum) error
268 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error
269 : visitSharedFile func(sst *SharedSSTMeta) error
270 : visitExternalFile func(sst *ExternalFile) error
271 :
272 : // includeObsoleteKeys specifies whether keys shadowed by newer internal keys
273 : // are exposed. If false, only one internal key per user key is exposed.
274 : includeObsoleteKeys bool
275 :
276 : // rateLimitFunc is used to limit the amount of bytes read per second.
277 : rateLimitFunc func(key *InternalKey, value LazyValue) error
278 : }
279 :
280 : // RangeKeyMasking configures automatic hiding of point keys by range keys. A
281 : // non-nil Suffix enables range-key masking. When enabled, range keys with
282 : // suffixes ≥ Suffix behave as masks. All point keys that are contained within a
283 : // masking range key's bounds and have suffixes greater than the range key's
284 : // suffix are automatically skipped.
285 : //
286 : // Specifically, when configured with a RangeKeyMasking.Suffix _s_, and there
287 : // exists a range key with suffix _r_ covering a point key with suffix _p_, and
288 : //
289 : // _s_ ≤ _r_ < _p_
290 : //
291 : // then the point key is elided.
292 : //
293 : // Range-key masking may only be used when iterating over both point keys and
294 : // range keys with IterKeyTypePointsAndRanges.
295 : type RangeKeyMasking struct {
296 : // Suffix configures which range keys may mask point keys. Only range keys
297 : // that are defined at suffixes greater than or equal to Suffix will mask
298 : // point keys.
299 : Suffix []byte
300 : // Filter is an optional field that may be used to improve performance of
301 : // range-key masking through a block-property filter defined over key
302 : // suffixes. If non-nil, Filter is called by Pebble to construct a
303 : // block-property filter mask at iterator creation. The filter is used to
304 : // skip whole point-key blocks containing point keys with suffixes greater
305 : // than a covering range-key's suffix.
306 : //
307 : // To use this functionality, the caller must create and configure (through
308 : // Options.BlockPropertyCollectors) a block-property collector that records
309 : // the maximum suffix contained within a block. The caller then must write
310 : // and provide a BlockPropertyFilterMask implementation on that same
311 : // property. See the BlockPropertyFilterMask type for more information.
312 : Filter func() BlockPropertyFilterMask
313 : }
314 :
315 : // BlockPropertyFilterMask extends the BlockPropertyFilter interface for use
316 : // with range-key masking. Unlike an ordinary block property filter, a
317 : // BlockPropertyFilterMask's filtering criteria is allowed to change when Pebble
318 : // invokes its SetSuffix method.
319 : //
320 : // When a Pebble iterator steps into a range key's bounds and the range key has
321 : // a suffix greater than or equal to RangeKeyMasking.Suffix, the range key acts
322 : // as a mask. The masking range key hides all point keys that fall within the
323 : // range key's bounds and have suffixes > the range key's suffix. Without a
324 : // filter mask configured, Pebble performs this hiding by stepping through point
325 : // keys and comparing suffixes. If large numbers of point keys are masked, this
326 : // requires Pebble to load, iterate through and discard a large number of
327 : // sstable blocks containing masked point keys.
328 : //
329 : // If a block-property collector and a filter mask are configured, Pebble may
330 : // skip loading some point-key blocks altogether. If a block's keys are known to
331 : // all fall within the bounds of the masking range key and the block was
332 : // annotated by a block-property collector with the maximal suffix, Pebble can
333 : // ask the filter mask to compare the property to the current masking range
334 : // key's suffix. If the mask reports no intersection, the block may be skipped.
335 : //
336 : // If unsuffixed and suffixed keys are written to the database, care must be
337 : // taken to avoid unintentionally masking un-suffixed keys located in the same
338 : // block as suffixed keys. One solution is to interpret unsuffixed keys as
339 : // containing the maximal suffix value, ensuring that blocks containing
340 : // unsuffixed keys are always loaded.
341 : type BlockPropertyFilterMask interface {
342 : BlockPropertyFilter
343 :
344 : // SetSuffix configures the mask with the suffix of a range key. The filter
345 : // should return false from Intersects whenever it's provided with a
346 : // property encoding a block's minimum suffix that's greater (according to
347 : // Compare) than the provided suffix.
348 : SetSuffix(suffix []byte) error
349 : }
350 :
351 : // WriteOptions holds the optional per-query parameters for Set and Delete
352 : // operations.
353 : //
354 : // Like Options, a nil *WriteOptions is valid and means to use the default
355 : // values.
356 : type WriteOptions struct {
357 : // Sync is whether to sync writes through the OS buffer cache and down onto
358 : // the actual disk, if applicable. Setting Sync is required for durability of
359 : // individual write operations but can result in slower writes.
360 : //
361 : // If false, and the process or machine crashes, then a recent write may be
362 : // lost. This is due to the recently written data being buffered inside the
363 : // process running Pebble. This differs from the semantics of a write system
364 : // call in which the data is buffered in the OS buffer cache and would thus
365 : // survive a process crash.
366 : //
367 : // The default value is true.
368 : Sync bool
369 : }
370 :
371 : // Sync specifies the default write options for writes which synchronize to
372 : // disk.
373 : var Sync = &WriteOptions{Sync: true}
374 :
375 : // NoSync specifies the default write options for writes which do not
376 : // synchronize to disk.
377 : var NoSync = &WriteOptions{Sync: false}
378 :
379 : // GetSync returns the Sync value or true if the receiver is nil.
380 2 : func (o *WriteOptions) GetSync() bool {
381 2 : return o == nil || o.Sync
382 2 : }
383 :
384 : // LevelOptions holds the optional per-level parameters.
385 : type LevelOptions struct {
386 : // BlockRestartInterval is the number of keys between restart points
387 : // for delta encoding of keys.
388 : //
389 : // The default value is 16.
390 : BlockRestartInterval int
391 :
392 : // BlockSize is the target uncompressed size in bytes of each table block.
393 : //
394 : // The default value is 4096.
395 : BlockSize int
396 :
397 : // BlockSizeThreshold finishes a block if the block size is larger than the
398 : // specified percentage of the target block size and adding the next entry
399 : // would cause the block to be larger than the target block size.
400 : //
401 : // The default value is 90.
402 : BlockSizeThreshold int
403 :
404 : // Compression defines the per-block compression to use.
405 : //
406 : // The default value (DefaultCompression) uses snappy compression.
407 : Compression func() Compression
408 :
409 : // FilterPolicy defines a filter algorithm (such as a Bloom filter) that can
410 : // reduce disk reads for Get calls.
411 : //
412 : // One such implementation is bloom.FilterPolicy(10) from the pebble/bloom
413 : // package.
414 : //
415 : // The default value means to use no filter.
416 : FilterPolicy FilterPolicy
417 :
418 : // FilterType defines whether an existing filter policy is applied at a
419 : // block-level or table-level. Block-level filters use less memory to create,
420 : // but are slower to access as a check for the key in the index must first be
421 : // performed to locate the filter block. A table-level filter will require
422 : // memory proportional to the number of keys in an sstable to create, but
423 : // avoids the index lookup when determining if a key is present. Table-level
424 : // filters should be preferred except under constrained memory situations.
425 : FilterType FilterType
426 :
427 : // IndexBlockSize is the target uncompressed size in bytes of each index
428 : // block. When the index block size is larger than this target, two-level
429 : // indexes are automatically enabled. Setting this option to a large value
430 : // (such as math.MaxInt32) disables the automatic creation of two-level
431 : // indexes.
432 : //
433 : // The default value is the value of BlockSize.
434 : IndexBlockSize int
435 :
436 : // The target file size for the level.
437 : TargetFileSize int64
438 : }
439 :
440 : // EnsureDefaults ensures that the default values for all of the options have
441 : // been initialized. It is valid to call EnsureDefaults on a nil receiver. A
442 : // non-nil result will always be returned.
443 2 : func (o *LevelOptions) EnsureDefaults() *LevelOptions {
444 2 : if o == nil {
445 0 : o = &LevelOptions{}
446 0 : }
447 2 : if o.BlockRestartInterval <= 0 {
448 1 : o.BlockRestartInterval = base.DefaultBlockRestartInterval
449 1 : }
450 2 : if o.BlockSize <= 0 {
451 1 : o.BlockSize = base.DefaultBlockSize
452 2 : } else if o.BlockSize > sstable.MaximumBlockSize {
453 0 : panic(errors.Errorf("BlockSize %d exceeds MaximumBlockSize", o.BlockSize))
454 : }
455 2 : if o.BlockSizeThreshold <= 0 {
456 1 : o.BlockSizeThreshold = base.DefaultBlockSizeThreshold
457 1 : }
458 2 : if o.Compression == nil {
459 1 : o.Compression = func() Compression { return DefaultCompression }
460 : }
461 2 : if o.IndexBlockSize <= 0 {
462 1 : o.IndexBlockSize = o.BlockSize
463 1 : }
464 2 : if o.TargetFileSize <= 0 {
465 1 : o.TargetFileSize = 2 << 20 // 2 MB
466 1 : }
467 2 : return o
468 : }
469 :
470 : // Options holds the optional parameters for configuring pebble. These options
471 : // apply to the DB at large; per-query options are defined by the IterOptions
472 : // and WriteOptions types.
473 : type Options struct {
474 : // Sync sstables periodically in order to smooth out writes to disk. This
475 : // option does not provide any persistency guarantee, but is used to avoid
476 : // latency spikes if the OS automatically decides to write out a large chunk
477 : // of dirty filesystem buffers. This option only controls SSTable syncs; WAL
478 : // syncs are controlled by WALBytesPerSync.
479 : //
480 : // The default value is 512KB.
481 : BytesPerSync int
482 :
483 : // Cache is used to cache uncompressed blocks from sstables.
484 : //
485 : // The default cache size is 8 MB.
486 : Cache *cache.Cache
487 :
488 : // LoadBlockSema, if set, is used to limit the number of blocks that can be
489 : // loaded (i.e. read from the filesystem) in parallel. Each load acquires one
490 : // unit from the semaphore for the duration of the read.
491 : LoadBlockSema *fifo.Semaphore
492 :
493 : // Cleaner cleans obsolete files.
494 : //
495 : // The default cleaner uses the DeleteCleaner.
496 : Cleaner Cleaner
497 :
498 : // Local contains options that pertain to files stored on the local filesystem.
499 : Local struct {
500 : // ReadaheadConfig is used to retrieve the current readahead mode; it is
501 : // consulted whenever a read handle is initialized.
502 : ReadaheadConfig *ReadaheadConfig
503 :
504 : // TODO(radu): move BytesPerSync, LoadBlockSema, Cleaner here.
505 : }
506 :
507 : // Comparer defines a total ordering over the space of []byte keys: a 'less
508 : // than' relationship. The same comparison algorithm must be used for reads
509 : // and writes over the lifetime of the DB.
510 : //
511 : // The default value uses the same ordering as bytes.Compare.
512 : Comparer *Comparer
513 :
514 : // DebugCheck is invoked, if non-nil, whenever a new version is being
515 : // installed. Typically, this is set to pebble.DebugCheckLevels in tests
516 : // or tools only, to check invariants over all the data in the database.
517 : DebugCheck func(*DB) error
518 :
519 : // Disable the write-ahead log (WAL). Disabling the write-ahead log prohibits
520 : // crash recovery, but can improve performance if crash recovery is not
521 : // needed (e.g. when only temporary state is being stored in the database).
522 : //
523 : // TODO(peter): untested
524 : DisableWAL bool
525 :
526 : // ErrorIfExists causes an error on Open if the database already exists.
527 : // The error can be checked with errors.Is(err, ErrDBAlreadyExists).
528 : //
529 : // The default value is false.
530 : ErrorIfExists bool
531 :
532 : // ErrorIfNotExists causes an error on Open if the database does not already
533 : // exist. The error can be checked with errors.Is(err, ErrDBDoesNotExist).
534 : //
535 : // The default value is false which will cause a database to be created if it
536 : // does not already exist.
537 : ErrorIfNotExists bool
538 :
539 : // ErrorIfNotPristine causes an error on Open if the database already exists
540 : // and any operations have been performed on the database. The error can be
541 : // checked with errors.Is(err, ErrDBNotPristine).
542 : //
543 : // Note that a database that contained keys that were all subsequently deleted
544 : // may or may not trigger the error. Currently, we check if there are any live
545 : // SSTs or log records to replay.
546 : ErrorIfNotPristine bool
547 :
548 : // EventListener provides hooks for listening to significant DB events such as
549 : // flushes, compactions, and table deletion.
550 : EventListener *EventListener
551 :
552 : // Experimental contains experimental options which are off by default.
553 : // These options are temporary and will eventually either be deleted, moved
554 : // out of the experimental group, or made the non-adjustable default. These
555 : // options may change at any time, so do not rely on them.
556 : Experimental struct {
557 : // The threshold of L0 read-amplification at which compaction concurrency
558 : // is enabled (if CompactionDebtConcurrency was not already exceeded).
559 : // Every multiple of this value enables another concurrent
560 : // compaction up to MaxConcurrentCompactions.
561 : L0CompactionConcurrency int
562 :
563 : // CompactionDebtConcurrency controls the threshold of compaction debt
564 : // at which additional compaction concurrency slots are added. For every
565 : // multiple of this value in compaction debt bytes, an additional
566 : // concurrent compaction is added. This works "on top" of
567 : // L0CompactionConcurrency, so the higher of the count of compaction
568 : // concurrency slots as determined by the two options is chosen.
569 : CompactionDebtConcurrency uint64
570 :
571 : // IngestSplit, if it returns true, allows for ingest-time splitting of
572 : // existing sstables into two virtual sstables to allow ingestion sstables to
573 : // slot into a lower level than they otherwise would have.
574 : IngestSplit func() bool
575 :
576 : // ReadCompactionRate controls the frequency of read triggered
577 : // compactions by adjusting `AllowedSeeks` in manifest.FileMetadata:
578 : //
579 : // AllowedSeeks = FileSize / ReadCompactionRate
580 : //
581 : // From LevelDB:
582 : // ```
583 : // We arrange to automatically compact this file after
584 : // a certain number of seeks. Let's assume:
585 : // (1) One seek costs 10ms
586 : // (2) Writing or reading 1MB costs 10ms (100MB/s)
587 : // (3) A compaction of 1MB does 25MB of IO:
588 : // 1MB read from this level
589 : // 10-12MB read from next level (boundaries may be misaligned)
590 : // 10-12MB written to next level
591 : // This implies that 25 seeks cost the same as the compaction
592 : // of 1MB of data. I.e., one seek costs approximately the
593 : // same as the compaction of 40KB of data. We are a little
594 : // conservative and allow approximately one seek for every 16KB
595 : // of data before triggering a compaction.
596 : // ```
597 : ReadCompactionRate int64
598 :
599 : // ReadSamplingMultiplier is a multiplier for the readSamplingPeriod in
600 : // iterator.maybeSampleRead() to control the frequency of read sampling
601 : // to trigger a read triggered compaction. A value of -1 prevents sampling
602 : // and disables read triggered compactions. The default is 1 << 4, which
603 : // gets multiplied with a constant of 1 << 16 to yield 1 << 20 (1MB).
604 : ReadSamplingMultiplier int64
605 :
606 : // TableCacheShards is the number of shards per table cache.
607 : // Reducing the value can reduce the number of idle goroutines per DB
608 : // instance which can be useful in scenarios with a lot of DB instances
609 : // and a large number of CPUs, but doing so can lead to higher contention
610 : // in the table cache and reduced performance.
611 : //
612 : // The default value is the number of logical CPUs, which can be
613 : // limited by runtime.GOMAXPROCS.
614 : TableCacheShards int
615 :
616 : // KeyValidationFunc is a function to validate a user key in an SSTable.
617 : //
618 : // Currently, this function is used to validate the smallest and largest
619 : // keys in an SSTable undergoing compaction. In this case, returning an
620 : // error from the validation function will result in a panic at runtime,
621 : // given that there is rarely any way of recovering from malformed keys
622 : // present in compacted files. By default, validation is not performed.
623 : //
624 : // Additional use-cases may be added in the future.
625 : //
626 : // NOTE: callers should take care to not mutate the key being validated.
627 : KeyValidationFunc func(userKey []byte) error
628 :
629 : // ValidateOnIngest schedules validation of sstables after they have
630 : // been ingested.
631 : //
632 : // By default, this value is false.
633 : ValidateOnIngest bool
634 :
635 : // LevelMultiplier configures the size multiplier used to determine the
636 : // desired size of each level of the LSM. Defaults to 10.
637 : LevelMultiplier int
638 :
639 : // MultiLevelCompactionHeuristic determines whether to add an additional
640 : // level to a conventional two level compaction. If nil, a multilevel
641 : // compaction will never get triggered.
642 : MultiLevelCompactionHeuristic MultiLevelHeuristic
643 :
644 : // MaxWriterConcurrency is used to indicate the maximum number of
645 : // compression workers the compression queue is allowed to use. If
646 : // MaxWriterConcurrency > 0, then the Writer will use parallelism, to
647 : // compress and write blocks to disk. Otherwise, the writer will
648 : // compress and write blocks to disk synchronously.
649 : MaxWriterConcurrency int
650 :
651 : // ForceWriterParallelism is used to force parallelism in the sstable
652 : // Writer for the metamorphic tests. Even with the MaxWriterConcurrency
653 : // option set, we only enable parallelism in the sstable Writer if there
654 : // is enough CPU available, and this option bypasses that.
655 : ForceWriterParallelism bool
656 :
657 : // CPUWorkPermissionGranter should be set if Pebble should be given the
658 : // ability to optionally schedule additional CPU. See the documentation
659 : // for CPUWorkPermissionGranter for more details.
660 : CPUWorkPermissionGranter CPUWorkPermissionGranter
661 :
662 : // EnableValueBlocks is used to decide whether to enable writing
663 : // TableFormatPebblev3 sstables. This setting is only respected by a
664 : // specific subset of format major versions: FormatSSTableValueBlocks,
665 : // FormatFlushableIngest and FormatPrePebblev1MarkedCompacted. In lower
666 : // format major versions, value blocks are never enabled. In higher
667 : // format major versions, value blocks are always enabled.
668 : EnableValueBlocks func() bool
669 :
670 : // ShortAttributeExtractor is used iff EnableValueBlocks() returns true
671 : // (else ignored). If non-nil, a ShortAttribute can be extracted from the
672 : // value and stored with the key, when the value is stored elsewhere.
673 : ShortAttributeExtractor ShortAttributeExtractor
674 :
675 : // RequiredInPlaceValueBound specifies an optional span of user key
676 : // prefixes that are not-MVCC, but have a suffix. For these the values
677 : // must be stored with the key, since the concept of "older versions" is
678 : // not defined. It is also useful for statically known exclusions to value
679 : // separation. In CockroachDB, this will be used for the lock table key
680 : // space that has non-empty suffixes, but those locks don't represent
681 : // actual MVCC versions (the suffix ordering is arbitrary). We will also
682 : // need to add support for dynamically configured exclusions (we want the
683 : // default to be to allow Pebble to decide whether to separate the value
684 : // or not, hence this is structured as exclusions), for example, for users
685 : // of CockroachDB to dynamically exclude certain tables.
686 : //
687 : // Any change in exclusion behavior takes effect only on future written
688 : // sstables, and does not start rewriting existing sstables.
689 : //
690 : // Even ignoring changes in this setting, exclusions are interpreted as a
691 : // guidance by Pebble, and not necessarily honored. Specifically, user
692 : // keys with multiple Pebble-versions *may* have the older versions stored
693 : // in value blocks.
694 : RequiredInPlaceValueBound UserKeyPrefixBound
695 :
696 : // DisableIngestAsFlushable disables lazy ingestion of sstables through
697 : // a WAL write and memtable rotation. Only effectual if the format
698 : // major version is at least `FormatFlushableIngest`.
699 : DisableIngestAsFlushable func() bool
700 :
701 : // RemoteStorage enables use of remote storage (e.g. S3) for storing
702 : // sstables. Setting this option enables use of CreateOnShared option and
703 : // allows ingestion of external files.
704 : RemoteStorage remote.StorageFactory
705 :
706 : // If CreateOnShared is non-zero, new sstables are created on remote storage
707 : // (using CreateOnSharedLocator and with the appropriate
708 : // CreateOnSharedStrategy). These sstables can be shared between different
709 : // Pebble instances; the lifecycle of such objects is managed by the
710 : // remote.Storage constructed by options.RemoteStorage.
711 : //
712 : // Can only be used when RemoteStorage is set (and recognizes
713 : // CreateOnSharedLocator).
714 : CreateOnShared remote.CreateOnSharedStrategy
715 : CreateOnSharedLocator remote.Locator
716 :
717 : // SecondaryCacheSizeBytes is the size of the on-disk block cache for objects
718 : // on shared storage in bytes. If it is 0, no cache is used.
719 : SecondaryCacheSizeBytes int64
720 :
721 : // NB: DO NOT crash on SingleDeleteInvariantViolationCallback or
722 : // IneffectualSingleDeleteCallback, since these can be false positives
723 : // even if SingleDel has been used correctly.
724 : //
725 : // Pebble's delete-only compactions can cause a recent RANGEDEL to peek
726 : // below an older SINGLEDEL and delete an arbitrary subset of data below
727 : // that SINGLEDEL. When that SINGLEDEL gets compacted (without the
728 : // RANGEDEL), any of these callbacks can happen, without it being a real
729 : // correctness problem.
730 : //
731 : // Example 1:
732 : // RANGEDEL [a, c)#10 in L0
733 : // SINGLEDEL b#5 in L1
734 : // SET b#3 in L6
735 : //
736 : // If the L6 file containing the SET is narrow and the L1 file containing
737 : // the SINGLEDEL is wide, a delete-only compaction can remove the file in
738 : // L6 before the SINGLEDEL is compacted down. Then when the SINGLEDEL is
739 : // compacted down, it will not find any SET to delete, resulting in the
740 : // ineffectual callback.
741 : //
742 : // Example 2:
743 : // RANGEDEL [a, z)#60 in L0
744 : // SINGLEDEL g#50 in L1
745 : // SET g#40 in L2
746 : // RANGEDEL [g,h)#30 in L3
747 : // SET g#20 in L6
748 : //
749 : // In this example, the two SETs represent the same user write, and the
750 : // RANGEDELs are caused by the CockroachDB range being dropped. That is,
751 : // the user wrote to g once, range was dropped, then added back, which
752 : // caused the SET again, then at some point g was validly deleted using a
753 : // SINGLEDEL, and then the range was dropped again. The older RANGEDEL can
754 : // get fragmented due to compactions it has been part of. Say this L3 file
755 : // containing the RANGEDEL is very narrow, while the L1, L2, L6 files are
756 : // wider than the RANGEDEL in L0. Then the RANGEDEL in L3 can be dropped
757 : // using a delete-only compaction, resulting in an LSM with state:
758 : //
759 : // RANGEDEL [a, z)#60 in L0
760 : // SINGLEDEL g#50 in L1
761 : // SET g#40 in L2
762 : // SET g#20 in L6
763 : //
764 : // A multi-level compaction involving L1, L2, L6 will cause the invariant
765 : // violation callback. This example doesn't need multi-level compactions:
766 : // say there was a Pebble snapshot at g#21 preventing g#20 from being
767 : // dropped when it meets g#40 in a compaction. That snapshot will not save
768 : // RANGEDEL [g,h)#30, so we can have:
769 : //
770 : // SINGLEDEL g#50 in L1
771 : // SET g#40, SET g#20 in L6
772 : //
773 : // And say the snapshot is removed and then the L1 and L6 compaction
774 : // happens, resulting in the invariant violation callback.
775 : //
776 : // TODO(sumeer): rename SingleDeleteInvariantViolationCallback to remove
777 : // the word "invariant".
778 :
779 : // IneffectualSingleDeleteCallback is called in compactions/flushes if any
780 : // single delete is being elided without deleting a point set/merge.
781 : IneffectualSingleDeleteCallback func(userKey []byte)
782 :
783 : // SingleDeleteInvariantViolationCallback is called in compactions/flushes if any
784 : // single delete has consumed a Set/Merge, and there is another immediately older
785 : // Set/SetWithDelete/Merge. The user of Pebble has violated the invariant under
786 : // which SingleDelete can be used correctly.
787 : //
788 : // Consider the sequence SingleDelete#3, Set#2, Set#1. There are three
789 : // ways some of these keys can first meet in a compaction.
790 : //
791 : // - All 3 keys in the same compaction: this callback will detect the
792 : // violation.
793 : //
794 : // - SingleDelete#3, Set#2 meet in a compaction first: Both keys will
795 : // disappear. The violation will not be detected, and the DB will have
796 : // Set#1 which is likely incorrect (from the user's perspective).
797 : //
798 : // - Set#2, Set#1 meet in a compaction first: The output will be Set#2,
799 : // which will later be consumed by SingleDelete#3. The violation will
800 : // not be detected and the DB will be correct.
801 : SingleDeleteInvariantViolationCallback func(userKey []byte)
802 : }
803 :
804 : // Filters is a map from filter policy name to filter policy. It is used for
805 : // debugging tools which may be used on multiple databases configured with
806 : // different filter policies. It is not necessary to populate this filters
807 : // map during normal usage of a DB (it will be done automatically by
808 : // EnsureDefaults).
809 : Filters map[string]FilterPolicy
810 :
811 : // FlushDelayDeleteRange configures how long the database should wait before
812 : // forcing a flush of a memtable that contains a range deletion. Disk space
813 : // cannot be reclaimed until the range deletion is flushed. No automatic
814 : // flush occurs if zero.
815 : FlushDelayDeleteRange time.Duration
816 :
817 : // FlushDelayRangeKey configures how long the database should wait before
818 : // forcing a flush of a memtable that contains a range key. Range keys in
819 : // the memtable prevent lazy combined iteration, so it's desirable to flush
820 : // range keys promptly. No automatic flush occurs if zero.
821 : FlushDelayRangeKey time.Duration
822 :
823 : // FlushSplitBytes denotes the target number of bytes per sublevel in
824 : // each flush split interval (i.e. range between two flush split keys)
825 : // in L0 sstables. When set to zero, only a single sstable is generated
826 : // by each flush. When set to a non-zero value, flushes are split at
827 : // points to meet L0's TargetFileSize, any grandparent-related overlap
828 : // options, and at boundary keys of L0 flush split intervals (which are
829 : // targeted to contain around FlushSplitBytes bytes in each sublevel
830 : // between pairs of boundary keys). Splitting sstables during flush
831 : // allows increased compaction flexibility and concurrency when those
832 : // tables are compacted to lower levels.
833 : FlushSplitBytes int64
834 :
835 : // FormatMajorVersion sets the format of on-disk files. It is
836 : // recommended to set the format major version to an explicit
837 : // version, as the default may change over time.
838 : //
839 : // At Open if the existing database is formatted using a later
840 : // format major version that is known to this version of Pebble,
841 : // Pebble will continue to use the later format major version. If
842 : // the existing database's version is unknown, the caller may use
843 : // FormatMostCompatible and will be able to open the database
844 : // regardless of its actual version.
845 : //
846 : // If the existing database is formatted using a format major
847 : // version earlier than the one specified, Open will automatically
848 : // ratchet the database to the specified format major version.
849 : FormatMajorVersion FormatMajorVersion
850 :
851 : // FS provides the interface for persistent file storage.
852 : //
853 : // The default value uses the underlying operating system's file system.
854 : FS vfs.FS
855 :
856 : // Lock, if set, must be a database lock acquired through LockDirectory for
857 : // the same directory passed to Open. If provided, Open will skip locking
858 : // the directory. Closing the database will not release the lock, and it's
859 : // the responsibility of the caller to release the lock after closing the
860 : // database.
861 : //
862 : // Open will enforce that the Lock passed locks the same directory passed to
863 : // Open. Concurrent calls to Open using the same Lock are detected and
864 : // prohibited.
865 : Lock *Lock
866 :
867 : // The count of L0 files necessary to trigger an L0 compaction.
868 : L0CompactionFileThreshold int
869 :
870 : // The amount of L0 read-amplification necessary to trigger an L0 compaction.
871 : L0CompactionThreshold int
872 :
873 : // Hard limit on L0 read-amplification, computed as the number of L0
874 : // sublevels. Writes are stopped when this threshold is reached.
875 : L0StopWritesThreshold int
876 :
877 : // The maximum number of bytes for LBase. The base level is the level which
878 : // L0 is compacted into. The base level is determined dynamically based on
879 : // the existing data in the LSM. The maximum number of bytes for other levels
880 : // is computed dynamically based on the base level's maximum size. When the
881 : // maximum number of bytes for a level is exceeded, compaction is requested.
882 : LBaseMaxBytes int64
883 :
884 : // Per-level options. Options for at least one level must be specified. The
885 : // options for the last level are used for all subsequent levels.
886 : Levels []LevelOptions
887 :
888 : // LoggerAndTracer will be used, if non-nil, else Logger will be used and
889 : // tracing will be a noop.
890 :
891 : // Logger used to write log messages.
892 : //
893 : // The default logger uses the Go standard library log package.
894 : Logger Logger
895 : // LoggerAndTracer is used for writing log messages and traces.
896 : LoggerAndTracer LoggerAndTracer
897 :
898 : // MaxManifestFileSize is the maximum size the MANIFEST file is allowed to
899 : // become. When the MANIFEST exceeds this size it is rolled over and a new
900 : // MANIFEST is created.
901 : MaxManifestFileSize int64
902 :
903 : // MaxOpenFiles is a soft limit on the number of open files that can be
904 : // used by the DB.
905 : //
906 : // The default value is 1000.
907 : MaxOpenFiles int
908 :
909 : // The size of a MemTable in steady state. The actual MemTable size starts at
910 : // min(256KB, MemTableSize) and doubles for each subsequent MemTable up to
911 : // MemTableSize. This reduces the memory pressure caused by MemTables for
912 : // short lived (test) DB instances. Note that more than one MemTable can be
913 : // in existence since flushing a MemTable involves creating a new one and
914 : // writing the contents of the old one in the
915 : // background. MemTableStopWritesThreshold places a hard limit on the size of
916 : // the queued MemTables.
917 : //
918 : // The default value is 4MB.
919 : MemTableSize uint64
920 :
921 : // Hard limit on the number of queued MemTables. Writes are stopped when
922 : // the sum of the queued memtable sizes exceeds:
923 : // MemTableStopWritesThreshold * MemTableSize.
924 : //
925 : // This value should be at least 2 or writes will stop whenever a MemTable is
926 : // being flushed.
927 : //
928 : // The default value is 2.
929 : MemTableStopWritesThreshold int
930 :
931 : // Merger defines the associative merge operation to use for merging values
932 : // written with {Batch,DB}.Merge.
933 : //
934 : // The default merger concatenates values.
935 : Merger *Merger
936 :
937 : // MaxConcurrentCompactions specifies the maximum number of concurrent
938 : // compactions (not including download compactions).
939 : //
940 : // Concurrent compactions are performed:
941 : // - when L0 read-amplification passes the L0CompactionConcurrency threshold;
942 : // - for automatic background compactions;
943 : // - when a manual compaction for a level is split and parallelized.
944 : //
945 : // MaxConcurrentCompactions() must be greater than 0.
946 : //
947 : // The default value is 1.
948 : MaxConcurrentCompactions func() int
949 :
950 : // MaxConcurrentDownloads specifies the maximum number of download
951 : // compactions. These are compactions that copy an external file to the local
952 : // store.
953 : //
954 : // This limit is independent of MaxConcurrentCompactions; at any point in
955 : // time, we may be running MaxConcurrentCompactions non-download compactions
956 : // and MaxConcurrentDownloads download compactions.
957 : //
958 : // MaxConcurrentDownloads() must be greater than 0.
959 : //
960 : // The default value is 1.
961 : MaxConcurrentDownloads func() int
962 :
963 : // DisableAutomaticCompactions dictates whether automatic compactions are
964 : // scheduled or not. The default is false (enabled). This option is only used
965 : // externally when running a manual compaction, and internally for tests.
966 : DisableAutomaticCompactions bool
967 :
968 : // DisableConsistencyCheck disables the consistency check that is performed on
969 : // open. Should only be used when a database cannot be opened normally (e.g.
970 : // some of the tables don't exist / aren't accessible).
971 : DisableConsistencyCheck bool
972 :
973 : // DisableTableStats dictates whether tables should be loaded asynchronously
974 : // to compute statistics that inform compaction heuristics. The collection
975 : // of table stats improves compaction of tombstones, reclaiming disk space
976 : // more quickly and in some cases reducing write amplification in the
977 : // presence of tombstones. Disabling table stats may be useful in tests
978 : // that require determinism as the asynchronicity of table stats collection
979 : // introduces significant nondeterminism.
980 : DisableTableStats bool
981 :
982 : // NoSyncOnClose decides whether the Pebble instance will enforce a
983 : // close-time synchronization (e.g., fdatasync() or sync_file_range())
984 : // on files it writes to. Setting this to true removes the guarantee for a
985 : // sync on close. Some implementations can still issue a non-blocking sync.
986 : NoSyncOnClose bool
987 :
988 : // NumPrevManifest is the number of non-current or older manifests which
989 : // we want to keep around for debugging purposes. By default, we're going
990 : // to keep one older manifest.
991 : NumPrevManifest int
992 :
993 : // ReadOnly indicates that the DB should be opened in read-only mode. Writes
994 : // to the DB will return an error, background compactions are disabled, and
995 : // the flush that normally occurs after replaying the WAL at startup is
996 : // disabled.
997 : ReadOnly bool
998 :
999 : // TableCache is an initialized TableCache which should be set as an
1000 : // option if the DB needs to be initialized with a pre-existing table cache.
1001 : // If TableCache is nil, then a table cache which is unique to the DB instance
1002 : // is created. TableCache can be shared between db instances by setting it here.
1003 : // The TableCache set here must use the same underlying cache as Options.Cache
1004 : // and pebble will panic otherwise.
1005 : TableCache *TableCache
1006 :
1007 : // BlockPropertyCollectors is a list of BlockPropertyCollector creation
1008 : // functions. A new BlockPropertyCollector is created for each sstable
1009 : // built and lives for the lifetime of writing that table.
1010 : BlockPropertyCollectors []func() BlockPropertyCollector
1011 :
1012 : // WALBytesPerSync sets the number of bytes to write to a WAL before calling
1013 : // Sync on it in the background. Just like with BytesPerSync above, this
1014 : // helps smooth out disk write latencies, and avoids cases where the OS
1015 : // writes a lot of buffered data to disk at once. However, this is less
1016 : // necessary with WALs, as many write operations already pass in
1017 : // Sync = true.
1018 : //
1019 : // The default value is 0, i.e. no background syncing. This matches the
1020 : // default behaviour in RocksDB.
1021 : WALBytesPerSync int
1022 :
1023 : // WALDir specifies the directory to store write-ahead logs (WALs) in. If
1024 : // empty (the default), WALs will be stored in the same directory as sstables
1025 : // (i.e. the directory passed to pebble.Open).
1026 : WALDir string
1027 :
1028 : // WALFailover may be set to configure Pebble to monitor writes to its
1029 : // write-ahead log and failover to writing write-ahead log entries to a
1030 : // secondary location (eg, a separate physical disk). WALFailover may be
1031 : // used to improve write availability in the presence of transient disk
1032 : // unavailability.
1033 : WALFailover *WALFailoverOptions
1034 :
1035 : // WALRecoveryDirs is a list of additional directories that should be
1036 : // scanned for the existence of additional write-ahead logs. WALRecoveryDirs
1037 : // is expected to be used when starting Pebble with a new WALDir or a new
1038 : // WALFailover configuration. The directories associated with the previous
1039 : // configuration may still contain WALs that are required for recovery of
1040 : // the current database state.
1041 : //
1042 : // If a previous WAL configuration may have stored WALs elsewhere but there
1043 : // is not a corresponding entry in WALRecoveryDirs, Open will error.
1044 : WALRecoveryDirs []wal.Dir
1045 :
1046 : // WALMinSyncInterval is the minimum duration between syncs of the WAL. If
1047 : // WAL syncs are requested faster than this interval, they will be
1048 : // artificially delayed. Introducing a small artificial delay (500us) between
1049 : // WAL syncs can allow more operations to arrive and reduce IO operations
1050 : // while having a minimal impact on throughput. This option is supplied as a
1051 : // closure in order to allow the value to be changed dynamically. The default
1052 : // value is 0.
1053 : //
1054 : // TODO(peter): rather than a closure, should there be another mechanism for
1055 : // changing options dynamically?
1056 : WALMinSyncInterval func() time.Duration
1057 :
1058 : // TargetByteDeletionRate is the rate (in bytes per second) at which sstable file
1059 : // deletions are limited to (under normal circumstances).
1060 : //
1061 : // Deletion pacing is used to slow down deletions when compactions finish up
1062 : // or readers close and newly-obsolete files need cleaning up. Deleting lots
1063 : // of files at once can cause disk latency to go up on some SSDs, which this
1064 : // functionality guards against.
1065 : //
1066 : // This value is only a best-effort target; the effective rate can be
1067 : // higher if deletions are falling behind or disk space is running low.
1068 : //
1069 : // Setting this to 0 disables deletion pacing, which is also the default.
1070 : TargetByteDeletionRate int
1071 :
1072 : // EnableSQLRowSpillMetrics specifies whether the Pebble instance will only be used
1073 : // to temporarily persist data spilled to disk for row-oriented SQL query execution.
1074 : EnableSQLRowSpillMetrics bool
1075 :
1076 : // AllocatorSizeClasses provides a sorted list containing the supported size
1077 : // classes of the underlying memory allocator. This provides hints to the
1078 : // sstable block writer's flushing policy to select block sizes that
1079 : // preemptively reduce internal fragmentation when loaded into the block cache.
1080 : AllocatorSizeClasses []int
1081 :
1082 : // private options are only used by internal tests or are used internally
1083 : // for facilitating upgrade paths of unconfigurable functionality.
1084 : private struct {
1085 : // disableDeleteOnlyCompactions prevents the scheduling of delete-only
1086 : // compactions that drop sstables wholly covered by range tombstones or
1087 : // range key tombstones.
1088 : disableDeleteOnlyCompactions bool
1089 :
1090 : // disableElisionOnlyCompactions prevents the scheduling of elision-only
1091 : // compactions that rewrite sstables in place in order to elide obsolete
1092 : // keys.
1093 : disableElisionOnlyCompactions bool
1094 :
1095 : // disableLazyCombinedIteration is a private option used by the
1096 : // metamorphic tests to test equivalence between lazy-combined iteration
1097 : // and constructing the range-key iterator upfront. It's a private
1098 : // option to avoid littering the public interface with options that we
1099 : // do not want to allow users to actually configure.
1100 : disableLazyCombinedIteration bool
1101 :
1102 : // testingAlwaysWaitForCleanup is set by some tests to force waiting for
1103 : // obsolete file deletion (to make events deterministic).
1104 : testingAlwaysWaitForCleanup bool
1105 :
1106 : // fsCloser holds a closer that should be invoked after a DB using these
1107 : // Options is closed. This is used to automatically stop the
1108 : // long-running goroutine associated with the disk-health-checking FS.
1109 : // See the initialization of FS in EnsureDefaults. Note that care has
1110 : // been taken to ensure that it is still safe to continue using the FS
1111 : // after this closer has been invoked. However, if write operations
1112 : // against the FS are made after the DB is closed, the FS may leak a
1113 : // goroutine indefinitely.
1114 : fsCloser io.Closer
1115 : }
1116 : }
1117 :
// WALFailoverOptions configures the WAL failover mechanics to use during
// transient write unavailability on the primary WAL volume. It is supplied
// through Options.WALFailover; a nil pointer there leaves failover
// unconfigured.
type WALFailoverOptions struct {
	// Secondary indicates the secondary directory and VFS to use in the event a
	// write to the primary WAL stalls.
	Secondary wal.Dir
	// FailoverOptions provides configuration of the thresholds and intervals
	// involved in WAL failover. If any of its fields are left unspecified,
	// reasonable defaults will be used.
	//
	// The type is embedded, so its fields are promoted onto
	// WALFailoverOptions. Options.EnsureDefaults invokes
	// FailoverOptions.EnsureDefaults whenever Options.WALFailover is non-nil.
	wal.FailoverOptions
}
1129 :
// ReadaheadConfig controls the use of read-ahead. It is an alias for
// objstorageprovider.ReadaheadConfig, re-exported from this package.
type ReadaheadConfig = objstorageprovider.ReadaheadConfig
1132 :
// DebugCheckLevels calls CheckLevels on the provided database and returns
// whatever error it reports. It may be set in the DebugCheck field of Options
// to check level invariants whenever a new version is installed.
func DebugCheckLevels(db *DB) error {
	// NOTE(review): nil is passed as CheckLevels' only argument, presumably an
	// optional stats collector that is not needed here; confirm against
	// DB.CheckLevels.
	return db.CheckLevels(nil)
}
1139 :
1140 : // EnsureDefaults ensures that the default values for all options are set if a
1141 : // valid value was not already specified. Returns the new options.
1142 2 : func (o *Options) EnsureDefaults() *Options {
1143 2 : if o == nil {
1144 1 : o = &Options{}
1145 1 : }
1146 2 : o.Comparer = o.Comparer.EnsureDefaults()
1147 2 :
1148 2 : if o.BytesPerSync <= 0 {
1149 1 : o.BytesPerSync = 512 << 10 // 512 KB
1150 1 : }
1151 2 : if o.Cleaner == nil {
1152 1 : o.Cleaner = DeleteCleaner{}
1153 1 : }
1154 :
1155 2 : if o.Experimental.DisableIngestAsFlushable == nil {
1156 2 : o.Experimental.DisableIngestAsFlushable = func() bool { return false }
1157 : }
1158 2 : if o.Experimental.L0CompactionConcurrency <= 0 {
1159 1 : o.Experimental.L0CompactionConcurrency = 10
1160 1 : }
1161 2 : if o.Experimental.CompactionDebtConcurrency <= 0 {
1162 1 : o.Experimental.CompactionDebtConcurrency = 1 << 30 // 1 GB
1163 1 : }
1164 2 : if o.Experimental.KeyValidationFunc == nil {
1165 2 : o.Experimental.KeyValidationFunc = func([]byte) error { return nil }
1166 : }
1167 2 : if o.L0CompactionThreshold <= 0 {
1168 1 : o.L0CompactionThreshold = 4
1169 1 : }
1170 2 : if o.L0CompactionFileThreshold <= 0 {
1171 1 : // Some justification for the default of 500:
1172 1 : // Why not smaller?:
1173 1 : // - The default target file size for L0 is 2MB, so 500 files is <= 1GB
1174 1 : // of data. At observed compaction speeds of > 20MB/s, L0 can be
1175 1 : // cleared of all files in < 1min, so this backlog is not huge.
1176 1 : // - 500 files is low overhead for instantiating L0 sublevels from
1177 1 : // scratch.
1178 1 : // - Lower values were observed to cause excessive and inefficient
1179 1 : // compactions out of L0 in a TPCC import benchmark.
1180 1 : // Why not larger?:
1181 1 : // - More than 1min to compact everything out of L0.
1182 1 : // - CockroachDB's admission control system uses a threshold of 1000
1183 1 : // files to start throttling writes to Pebble. Using 500 here gives
1184 1 : // us headroom between when Pebble should start compacting L0 and
1185 1 : // when the admission control threshold is reached.
1186 1 : //
1187 1 : // We can revisit this default in the future based on better
1188 1 : // experimental understanding.
1189 1 : //
1190 1 : // TODO(jackson): Experiment with slightly lower thresholds [or higher
1191 1 : // admission control thresholds] to see whether a higher L0 score at the
1192 1 : // threshold (currently 2.0) is necessary for some workloads to avoid
1193 1 : // starving L0 in favor of lower-level compactions.
1194 1 : o.L0CompactionFileThreshold = 500
1195 1 : }
1196 2 : if o.L0StopWritesThreshold <= 0 {
1197 1 : o.L0StopWritesThreshold = 12
1198 1 : }
1199 2 : if o.LBaseMaxBytes <= 0 {
1200 1 : o.LBaseMaxBytes = 64 << 20 // 64 MB
1201 1 : }
1202 2 : if o.Levels == nil {
1203 1 : o.Levels = make([]LevelOptions, 1)
1204 1 : for i := range o.Levels {
1205 1 : if i > 0 {
1206 0 : l := &o.Levels[i]
1207 0 : if l.TargetFileSize <= 0 {
1208 0 : l.TargetFileSize = o.Levels[i-1].TargetFileSize * 2
1209 0 : }
1210 : }
1211 1 : o.Levels[i].EnsureDefaults()
1212 : }
1213 2 : } else {
1214 2 : for i := range o.Levels {
1215 2 : o.Levels[i].EnsureDefaults()
1216 2 : }
1217 : }
1218 2 : if o.Logger == nil {
1219 2 : o.Logger = DefaultLogger
1220 2 : }
1221 2 : if o.EventListener == nil {
1222 2 : o.EventListener = &EventListener{}
1223 2 : }
1224 2 : o.EventListener.EnsureDefaults(o.Logger)
1225 2 : if o.MaxManifestFileSize == 0 {
1226 1 : o.MaxManifestFileSize = 128 << 20 // 128 MB
1227 1 : }
1228 2 : if o.MaxOpenFiles == 0 {
1229 1 : o.MaxOpenFiles = 1000
1230 1 : }
1231 2 : if o.MemTableSize <= 0 {
1232 1 : o.MemTableSize = 4 << 20 // 4 MB
1233 1 : }
1234 2 : if o.MemTableStopWritesThreshold <= 0 {
1235 1 : o.MemTableStopWritesThreshold = 2
1236 1 : }
1237 2 : if o.Merger == nil {
1238 1 : o.Merger = DefaultMerger
1239 1 : }
1240 2 : if o.MaxConcurrentCompactions == nil {
1241 1 : o.MaxConcurrentCompactions = func() int { return 1 }
1242 : }
1243 2 : if o.MaxConcurrentDownloads == nil {
1244 1 : o.MaxConcurrentDownloads = func() int { return 1 }
1245 : }
1246 2 : if o.NumPrevManifest <= 0 {
1247 2 : o.NumPrevManifest = 1
1248 2 : }
1249 :
1250 2 : if o.FormatMajorVersion == FormatDefault {
1251 1 : o.FormatMajorVersion = FormatMinSupported
1252 1 : if o.Experimental.CreateOnShared != remote.CreateOnSharedNone {
1253 1 : o.FormatMajorVersion = FormatMinForSharedObjects
1254 1 : }
1255 : }
1256 :
1257 2 : if o.FS == nil {
1258 1 : o.WithFSDefaults()
1259 1 : }
1260 2 : if o.FlushSplitBytes <= 0 {
1261 1 : o.FlushSplitBytes = 2 * o.Levels[0].TargetFileSize
1262 1 : }
1263 2 : if o.WALFailover != nil {
1264 2 : o.WALFailover.FailoverOptions.EnsureDefaults()
1265 2 : }
1266 2 : if o.Experimental.LevelMultiplier <= 0 {
1267 2 : o.Experimental.LevelMultiplier = defaultLevelMultiplier
1268 2 : }
1269 2 : if o.Experimental.ReadCompactionRate == 0 {
1270 1 : o.Experimental.ReadCompactionRate = 16000
1271 1 : }
1272 2 : if o.Experimental.ReadSamplingMultiplier == 0 {
1273 1 : o.Experimental.ReadSamplingMultiplier = 1 << 4
1274 1 : }
1275 2 : if o.Experimental.TableCacheShards <= 0 {
1276 1 : o.Experimental.TableCacheShards = runtime.GOMAXPROCS(0)
1277 1 : }
1278 2 : if o.Experimental.CPUWorkPermissionGranter == nil {
1279 2 : o.Experimental.CPUWorkPermissionGranter = defaultCPUWorkGranter{}
1280 2 : }
1281 2 : if o.Experimental.MultiLevelCompactionHeuristic == nil {
1282 1 : o.Experimental.MultiLevelCompactionHeuristic = WriteAmpHeuristic{}
1283 1 : }
1284 :
1285 2 : o.initMaps()
1286 2 : return o
1287 : }
1288 :
1289 : // WithFSDefaults configures the Options to wrap the configured filesystem with
1290 : // the default virtual file system middleware, like disk-health checking.
1291 2 : func (o *Options) WithFSDefaults() *Options {
1292 2 : if o.FS == nil {
1293 1 : o.FS = vfs.Default
1294 1 : }
1295 2 : o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(o.FS, 5*time.Second, nil,
1296 2 : func(info vfs.DiskSlowInfo) {
1297 0 : o.EventListener.DiskSlow(info)
1298 0 : })
1299 2 : return o
1300 : }
1301 :
1302 : // AddEventListener adds the provided event listener to the Options, in addition
1303 : // to any existing event listener.
1304 1 : func (o *Options) AddEventListener(l EventListener) {
1305 1 : if o.EventListener != nil {
1306 1 : l = TeeEventListener(l, *o.EventListener)
1307 1 : }
1308 1 : o.EventListener = &l
1309 : }
1310 :
1311 : // initMaps initializes the Comparers, Filters, and Mergers maps.
1312 2 : func (o *Options) initMaps() {
1313 2 : for i := range o.Levels {
1314 2 : l := &o.Levels[i]
1315 2 : if l.FilterPolicy != nil {
1316 2 : if o.Filters == nil {
1317 2 : o.Filters = make(map[string]FilterPolicy)
1318 2 : }
1319 2 : name := l.FilterPolicy.Name()
1320 2 : if _, ok := o.Filters[name]; !ok {
1321 2 : o.Filters[name] = l.FilterPolicy
1322 2 : }
1323 : }
1324 : }
1325 : }
1326 :
1327 : // Level returns the LevelOptions for the specified level.
1328 2 : func (o *Options) Level(level int) LevelOptions {
1329 2 : if level < len(o.Levels) {
1330 2 : return o.Levels[level]
1331 2 : }
1332 2 : n := len(o.Levels) - 1
1333 2 : l := o.Levels[n]
1334 2 : for i := n; i < level; i++ {
1335 2 : l.TargetFileSize *= 2
1336 2 : }
1337 2 : return l
1338 : }
1339 :
1340 : // Clone creates a shallow-copy of the supplied options.
1341 2 : func (o *Options) Clone() *Options {
1342 2 : n := &Options{}
1343 2 : if o != nil {
1344 2 : *n = *o
1345 2 : }
1346 2 : return n
1347 : }
1348 :
1349 2 : func filterPolicyName(p FilterPolicy) string {
1350 2 : if p == nil {
1351 2 : return "none"
1352 2 : }
1353 2 : return p.Name()
1354 : }
1355 :
1356 2 : func (o *Options) String() string {
1357 2 : var buf bytes.Buffer
1358 2 :
1359 2 : cacheSize := int64(cacheDefaultSize)
1360 2 : if o.Cache != nil {
1361 2 : cacheSize = o.Cache.MaxSize()
1362 2 : }
1363 :
1364 2 : fmt.Fprintf(&buf, "[Version]\n")
1365 2 : fmt.Fprintf(&buf, " pebble_version=0.1\n")
1366 2 : fmt.Fprintf(&buf, "\n")
1367 2 : fmt.Fprintf(&buf, "[Options]\n")
1368 2 : fmt.Fprintf(&buf, " bytes_per_sync=%d\n", o.BytesPerSync)
1369 2 : fmt.Fprintf(&buf, " cache_size=%d\n", cacheSize)
1370 2 : fmt.Fprintf(&buf, " cleaner=%s\n", o.Cleaner)
1371 2 : fmt.Fprintf(&buf, " compaction_debt_concurrency=%d\n", o.Experimental.CompactionDebtConcurrency)
1372 2 : fmt.Fprintf(&buf, " comparer=%s\n", o.Comparer.Name)
1373 2 : fmt.Fprintf(&buf, " disable_wal=%t\n", o.DisableWAL)
1374 2 : if o.Experimental.DisableIngestAsFlushable != nil && o.Experimental.DisableIngestAsFlushable() {
1375 2 : fmt.Fprintf(&buf, " disable_ingest_as_flushable=%t\n", true)
1376 2 : }
1377 2 : fmt.Fprintf(&buf, " flush_delay_delete_range=%s\n", o.FlushDelayDeleteRange)
1378 2 : fmt.Fprintf(&buf, " flush_delay_range_key=%s\n", o.FlushDelayRangeKey)
1379 2 : fmt.Fprintf(&buf, " flush_split_bytes=%d\n", o.FlushSplitBytes)
1380 2 : fmt.Fprintf(&buf, " format_major_version=%d\n", o.FormatMajorVersion)
1381 2 : fmt.Fprintf(&buf, " l0_compaction_concurrency=%d\n", o.Experimental.L0CompactionConcurrency)
1382 2 : fmt.Fprintf(&buf, " l0_compaction_file_threshold=%d\n", o.L0CompactionFileThreshold)
1383 2 : fmt.Fprintf(&buf, " l0_compaction_threshold=%d\n", o.L0CompactionThreshold)
1384 2 : fmt.Fprintf(&buf, " l0_stop_writes_threshold=%d\n", o.L0StopWritesThreshold)
1385 2 : fmt.Fprintf(&buf, " lbase_max_bytes=%d\n", o.LBaseMaxBytes)
1386 2 : if o.Experimental.LevelMultiplier != defaultLevelMultiplier {
1387 2 : fmt.Fprintf(&buf, " level_multiplier=%d\n", o.Experimental.LevelMultiplier)
1388 2 : }
1389 2 : fmt.Fprintf(&buf, " max_concurrent_compactions=%d\n", o.MaxConcurrentCompactions())
1390 2 : fmt.Fprintf(&buf, " max_concurrent_downloads=%d\n", o.MaxConcurrentDownloads())
1391 2 : fmt.Fprintf(&buf, " max_manifest_file_size=%d\n", o.MaxManifestFileSize)
1392 2 : fmt.Fprintf(&buf, " max_open_files=%d\n", o.MaxOpenFiles)
1393 2 : fmt.Fprintf(&buf, " mem_table_size=%d\n", o.MemTableSize)
1394 2 : fmt.Fprintf(&buf, " mem_table_stop_writes_threshold=%d\n", o.MemTableStopWritesThreshold)
1395 2 : fmt.Fprintf(&buf, " min_deletion_rate=%d\n", o.TargetByteDeletionRate)
1396 2 : fmt.Fprintf(&buf, " merger=%s\n", o.Merger.Name)
1397 2 : if o.Experimental.MultiLevelCompactionHeuristic != nil {
1398 2 : fmt.Fprintf(&buf, " multilevel_compaction_heuristic=%s\n", o.Experimental.MultiLevelCompactionHeuristic.String())
1399 2 : }
1400 2 : fmt.Fprintf(&buf, " read_compaction_rate=%d\n", o.Experimental.ReadCompactionRate)
1401 2 : fmt.Fprintf(&buf, " read_sampling_multiplier=%d\n", o.Experimental.ReadSamplingMultiplier)
1402 2 : // We no longer care about strict_wal_tail, but set it to true in case an
1403 2 : // older version reads the options.
1404 2 : fmt.Fprintf(&buf, " strict_wal_tail=%t\n", true)
1405 2 : fmt.Fprintf(&buf, " table_cache_shards=%d\n", o.Experimental.TableCacheShards)
1406 2 : fmt.Fprintf(&buf, " validate_on_ingest=%t\n", o.Experimental.ValidateOnIngest)
1407 2 : fmt.Fprintf(&buf, " wal_dir=%s\n", o.WALDir)
1408 2 : fmt.Fprintf(&buf, " wal_bytes_per_sync=%d\n", o.WALBytesPerSync)
1409 2 : fmt.Fprintf(&buf, " max_writer_concurrency=%d\n", o.Experimental.MaxWriterConcurrency)
1410 2 : fmt.Fprintf(&buf, " force_writer_parallelism=%t\n", o.Experimental.ForceWriterParallelism)
1411 2 : fmt.Fprintf(&buf, " secondary_cache_size_bytes=%d\n", o.Experimental.SecondaryCacheSizeBytes)
1412 2 : fmt.Fprintf(&buf, " create_on_shared=%d\n", o.Experimental.CreateOnShared)
1413 2 :
1414 2 : // Private options.
1415 2 : //
1416 2 : // These options are only encoded if true, because we do not want them to
1417 2 : // appear in production serialized Options files, since they're testing-only
1418 2 : // options. They're only serialized when true, which still ensures that the
1419 2 : // metamorphic tests may propagate them to subprocesses.
1420 2 : if o.private.disableDeleteOnlyCompactions {
1421 2 : fmt.Fprintln(&buf, " disable_delete_only_compactions=true")
1422 2 : }
1423 2 : if o.private.disableElisionOnlyCompactions {
1424 2 : fmt.Fprintln(&buf, " disable_elision_only_compactions=true")
1425 2 : }
1426 2 : if o.private.disableLazyCombinedIteration {
1427 2 : fmt.Fprintln(&buf, " disable_lazy_combined_iteration=true")
1428 2 : }
1429 :
1430 2 : if o.WALFailover != nil {
1431 2 : unhealthyThreshold, _ := o.WALFailover.FailoverOptions.UnhealthyOperationLatencyThreshold()
1432 2 : fmt.Fprintf(&buf, "\n")
1433 2 : fmt.Fprintf(&buf, "[WAL Failover]\n")
1434 2 : fmt.Fprintf(&buf, " secondary_dir=%s\n", o.WALFailover.Secondary.Dirname)
1435 2 : fmt.Fprintf(&buf, " primary_dir_probe_interval=%s\n", o.WALFailover.FailoverOptions.PrimaryDirProbeInterval)
1436 2 : fmt.Fprintf(&buf, " healthy_probe_latency_threshold=%s\n", o.WALFailover.FailoverOptions.HealthyProbeLatencyThreshold)
1437 2 : fmt.Fprintf(&buf, " healthy_interval=%s\n", o.WALFailover.FailoverOptions.HealthyInterval)
1438 2 : fmt.Fprintf(&buf, " unhealthy_sampling_interval=%s\n", o.WALFailover.FailoverOptions.UnhealthySamplingInterval)
1439 2 : fmt.Fprintf(&buf, " unhealthy_operation_latency_threshold=%s\n", unhealthyThreshold)
1440 2 : fmt.Fprintf(&buf, " elevated_write_stall_threshold_lag=%s\n", o.WALFailover.FailoverOptions.ElevatedWriteStallThresholdLag)
1441 2 : }
1442 :
1443 2 : for i := range o.Levels {
1444 2 : l := &o.Levels[i]
1445 2 : fmt.Fprintf(&buf, "\n")
1446 2 : fmt.Fprintf(&buf, "[Level \"%d\"]\n", i)
1447 2 : fmt.Fprintf(&buf, " block_restart_interval=%d\n", l.BlockRestartInterval)
1448 2 : fmt.Fprintf(&buf, " block_size=%d\n", l.BlockSize)
1449 2 : fmt.Fprintf(&buf, " block_size_threshold=%d\n", l.BlockSizeThreshold)
1450 2 : fmt.Fprintf(&buf, " compression=%s\n", resolveDefaultCompression(l.Compression()))
1451 2 : fmt.Fprintf(&buf, " filter_policy=%s\n", filterPolicyName(l.FilterPolicy))
1452 2 : fmt.Fprintf(&buf, " filter_type=%s\n", l.FilterType)
1453 2 : fmt.Fprintf(&buf, " index_block_size=%d\n", l.IndexBlockSize)
1454 2 : fmt.Fprintf(&buf, " target_file_size=%d\n", l.TargetFileSize)
1455 2 : }
1456 :
1457 2 : return buf.String()
1458 : }
1459 :
1460 : // parseOptions takes options serialized by Options.String() and parses them into
1461 : // keys and values, calling fn for each one.
1462 2 : func parseOptions(s string, fn func(section, key, value string) error) error {
1463 2 : var section string
1464 2 : for _, line := range strings.Split(s, "\n") {
1465 2 : line = strings.TrimSpace(line)
1466 2 : if len(line) == 0 {
1467 2 : // Skip blank lines.
1468 2 : continue
1469 : }
1470 2 : if line[0] == ';' || line[0] == '#' {
1471 0 : // Skip comments.
1472 0 : continue
1473 : }
1474 2 : n := len(line)
1475 2 : if line[0] == '[' && line[n-1] == ']' {
1476 2 : // Parse section.
1477 2 : section = line[1 : n-1]
1478 2 : continue
1479 : }
1480 :
1481 2 : pos := strings.Index(line, "=")
1482 2 : if pos < 0 {
1483 1 : const maxLen = 50
1484 1 : if len(line) > maxLen {
1485 0 : line = line[:maxLen-3] + "..."
1486 0 : }
1487 1 : return base.CorruptionErrorf("invalid key=value syntax: %q", errors.Safe(line))
1488 : }
1489 :
1490 2 : key := strings.TrimSpace(line[:pos])
1491 2 : value := strings.TrimSpace(line[pos+1:])
1492 2 :
1493 2 : // RocksDB uses a similar (INI-style) syntax for the OPTIONS file, but
1494 2 : // different section names and keys. The "CFOptions ..." paths are the
1495 2 : // RocksDB versions which we map to the Pebble paths.
1496 2 : mappedSection := section
1497 2 : if section == `CFOptions "default"` {
1498 1 : mappedSection = "Options"
1499 1 : switch key {
1500 1 : case "comparator":
1501 1 : key = "comparer"
1502 1 : case "merge_operator":
1503 1 : key = "merger"
1504 : }
1505 : }
1506 :
1507 2 : if err := fn(mappedSection, key, value); err != nil {
1508 1 : return err
1509 1 : }
1510 : }
1511 2 : return nil
1512 : }
1513 :
// ParseHooks contains callbacks to create options fields which can have
// user-defined implementations. Every hook is optional; Options.Parse checks
// each one for nil before calling it.
type ParseHooks struct {
	// NewCache is called with the parsed cache_size value; its result replaces
	// Options.Cache (an existing cache is Unref'd first).
	NewCache func(size int64) *Cache

	// NewCleaner is called for cleaner names other than the built-in
	// "archive" and "delete".
	NewCleaner func(name string) (Cleaner, error)

	// NewComparer is called for comparer names other than
	// "leveldb.BytewiseComparator".
	NewComparer func(name string) (*Comparer, error)

	// NewFilterPolicy is called with the filter_policy value of a level
	// section.
	NewFilterPolicy func(name string) (FilterPolicy, error)

	// NewMerger is called for merger names other than "nullptr" and
	// "pebble.concatenate".
	NewMerger func(name string) (*Merger, error)

	// SkipUnknown is consulted when an unrecognized option or section is
	// encountered. Returning true makes Parse ignore the entry instead of
	// failing. For unknown options, name has the form "section.key".
	SkipUnknown func(name, value string) bool
}
1524 :
1525 : // Parse parses the options from the specified string. Note that certain
1526 : // options cannot be parsed into populated fields. For example, comparer and
1527 : // merger.
1528 2 : func (o *Options) Parse(s string, hooks *ParseHooks) error {
1529 2 : return parseOptions(s, func(section, key, value string) error {
1530 2 : // WARNING: DO NOT remove entries from the switches below because doing so
1531 2 : // causes a key previously written to the OPTIONS file to be considered unknown,
1532 2 : // a backwards incompatible change. Instead, leave in support for parsing the
1533 2 : // key but simply don't parse the value.
1534 2 :
1535 2 : switch {
1536 2 : case section == "Version":
1537 2 : switch key {
1538 2 : case "pebble_version":
1539 0 : default:
1540 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
1541 0 : return nil
1542 0 : }
1543 0 : return errors.Errorf("pebble: unknown option: %s.%s",
1544 0 : errors.Safe(section), errors.Safe(key))
1545 : }
1546 2 : return nil
1547 :
1548 2 : case section == "Options":
1549 2 : var err error
1550 2 : switch key {
1551 2 : case "bytes_per_sync":
1552 2 : o.BytesPerSync, err = strconv.Atoi(value)
1553 2 : case "cache_size":
1554 2 : var n int64
1555 2 : n, err = strconv.ParseInt(value, 10, 64)
1556 2 : if err == nil && hooks != nil && hooks.NewCache != nil {
1557 2 : if o.Cache != nil {
1558 0 : o.Cache.Unref()
1559 0 : }
1560 2 : o.Cache = hooks.NewCache(n)
1561 : }
1562 : // We avoid calling cache.New in parsing because it makes it
1563 : // too easy to leak a cache.
1564 2 : case "cleaner":
1565 2 : switch value {
1566 2 : case "archive":
1567 2 : o.Cleaner = ArchiveCleaner{}
1568 1 : case "delete":
1569 1 : o.Cleaner = DeleteCleaner{}
1570 0 : default:
1571 0 : if hooks != nil && hooks.NewCleaner != nil {
1572 0 : o.Cleaner, err = hooks.NewCleaner(value)
1573 0 : }
1574 : }
1575 2 : case "comparer":
1576 2 : switch value {
1577 1 : case "leveldb.BytewiseComparator":
1578 1 : o.Comparer = DefaultComparer
1579 2 : default:
1580 2 : if hooks != nil && hooks.NewComparer != nil {
1581 1 : o.Comparer, err = hooks.NewComparer(value)
1582 1 : }
1583 : }
1584 2 : case "compaction_debt_concurrency":
1585 2 : o.Experimental.CompactionDebtConcurrency, err = strconv.ParseUint(value, 10, 64)
1586 0 : case "delete_range_flush_delay":
1587 0 : // NB: This is a deprecated serialization of the
1588 0 : // `flush_delay_delete_range`.
1589 0 : o.FlushDelayDeleteRange, err = time.ParseDuration(value)
1590 2 : case "disable_delete_only_compactions":
1591 2 : o.private.disableDeleteOnlyCompactions, err = strconv.ParseBool(value)
1592 2 : case "disable_elision_only_compactions":
1593 2 : o.private.disableElisionOnlyCompactions, err = strconv.ParseBool(value)
1594 2 : case "disable_ingest_as_flushable":
1595 2 : var v bool
1596 2 : v, err = strconv.ParseBool(value)
1597 2 : if err == nil {
1598 2 : o.Experimental.DisableIngestAsFlushable = func() bool { return v }
1599 : }
1600 2 : case "disable_lazy_combined_iteration":
1601 2 : o.private.disableLazyCombinedIteration, err = strconv.ParseBool(value)
1602 2 : case "disable_wal":
1603 2 : o.DisableWAL, err = strconv.ParseBool(value)
1604 2 : case "flush_delay_delete_range":
1605 2 : o.FlushDelayDeleteRange, err = time.ParseDuration(value)
1606 2 : case "flush_delay_range_key":
1607 2 : o.FlushDelayRangeKey, err = time.ParseDuration(value)
1608 2 : case "flush_split_bytes":
1609 2 : o.FlushSplitBytes, err = strconv.ParseInt(value, 10, 64)
1610 2 : case "format_major_version":
1611 2 : // NB: The version written here may be stale. Open does
1612 2 : // not use the format major version encoded in the
1613 2 : // OPTIONS file other than to validate that the encoded
1614 2 : // version is valid right here.
1615 2 : var v uint64
1616 2 : v, err = strconv.ParseUint(value, 10, 64)
1617 2 : if vers := FormatMajorVersion(v); vers > internalFormatNewest || vers == FormatDefault {
1618 0 : err = errors.Newf("unsupported format major version %d", o.FormatMajorVersion)
1619 0 : }
1620 2 : if err == nil {
1621 2 : o.FormatMajorVersion = FormatMajorVersion(v)
1622 2 : }
1623 2 : case "l0_compaction_concurrency":
1624 2 : o.Experimental.L0CompactionConcurrency, err = strconv.Atoi(value)
1625 2 : case "l0_compaction_file_threshold":
1626 2 : o.L0CompactionFileThreshold, err = strconv.Atoi(value)
1627 2 : case "l0_compaction_threshold":
1628 2 : o.L0CompactionThreshold, err = strconv.Atoi(value)
1629 2 : case "l0_stop_writes_threshold":
1630 2 : o.L0StopWritesThreshold, err = strconv.Atoi(value)
1631 0 : case "l0_sublevel_compactions":
1632 : // Do nothing; option existed in older versions of pebble.
1633 2 : case "lbase_max_bytes":
1634 2 : o.LBaseMaxBytes, err = strconv.ParseInt(value, 10, 64)
1635 2 : case "level_multiplier":
1636 2 : o.Experimental.LevelMultiplier, err = strconv.Atoi(value)
1637 2 : case "max_concurrent_compactions":
1638 2 : var concurrentCompactions int
1639 2 : concurrentCompactions, err = strconv.Atoi(value)
1640 2 : if concurrentCompactions <= 0 {
1641 0 : err = errors.New("max_concurrent_compactions cannot be <= 0")
1642 2 : } else {
1643 2 : o.MaxConcurrentCompactions = func() int { return concurrentCompactions }
1644 : }
1645 2 : case "max_concurrent_downloads":
1646 2 : var concurrentDownloads int
1647 2 : concurrentDownloads, err = strconv.Atoi(value)
1648 2 : if concurrentDownloads <= 0 {
1649 0 : err = errors.New("max_concurrent_compactions cannot be <= 0")
1650 2 : } else {
1651 2 : o.MaxConcurrentDownloads = func() int { return concurrentDownloads }
1652 : }
1653 2 : case "max_manifest_file_size":
1654 2 : o.MaxManifestFileSize, err = strconv.ParseInt(value, 10, 64)
1655 2 : case "max_open_files":
1656 2 : o.MaxOpenFiles, err = strconv.Atoi(value)
1657 2 : case "mem_table_size":
1658 2 : o.MemTableSize, err = strconv.ParseUint(value, 10, 64)
1659 2 : case "mem_table_stop_writes_threshold":
1660 2 : o.MemTableStopWritesThreshold, err = strconv.Atoi(value)
1661 0 : case "min_compaction_rate":
1662 : // Do nothing; option existed in older versions of pebble, and
1663 : // may be meaningful again eventually.
1664 2 : case "min_deletion_rate":
1665 2 : o.TargetByteDeletionRate, err = strconv.Atoi(value)
1666 0 : case "min_flush_rate":
1667 : // Do nothing; option existed in older versions of pebble, and
1668 : // may be meaningful again eventually.
1669 2 : case "multilevel_compaction_heuristic":
1670 2 : switch {
1671 2 : case value == "none":
1672 2 : o.Experimental.MultiLevelCompactionHeuristic = NoMultiLevel{}
1673 2 : case strings.HasPrefix(value, "wamp"):
1674 2 : fields := strings.FieldsFunc(strings.TrimPrefix(value, "wamp"), func(r rune) bool {
1675 2 : return unicode.IsSpace(r) || r == ',' || r == '(' || r == ')'
1676 2 : })
1677 2 : if len(fields) != 2 {
1678 0 : err = errors.Newf("require 2 arguments")
1679 0 : }
1680 2 : var h WriteAmpHeuristic
1681 2 : if err == nil {
1682 2 : h.AddPropensity, err = strconv.ParseFloat(fields[0], 64)
1683 2 : }
1684 2 : if err == nil {
1685 2 : h.AllowL0, err = strconv.ParseBool(fields[1])
1686 2 : }
1687 2 : if err == nil {
1688 2 : o.Experimental.MultiLevelCompactionHeuristic = h
1689 2 : } else {
1690 0 : err = errors.Wrapf(err, "unexpected wamp heuristic arguments: %s", value)
1691 0 : }
1692 0 : default:
1693 0 : err = errors.Newf("unrecognized multilevel compaction heuristic: %s", value)
1694 : }
1695 0 : case "point_tombstone_weight":
1696 : // Do nothing; deprecated.
1697 2 : case "strict_wal_tail":
1698 2 : var strictWALTail bool
1699 2 : strictWALTail, err = strconv.ParseBool(value)
1700 2 : if err == nil && !strictWALTail {
1701 0 : err = errors.Newf("reading from versions with strict_wal_tail=false no longer supported")
1702 0 : }
1703 2 : case "merger":
1704 2 : switch value {
1705 0 : case "nullptr":
1706 0 : o.Merger = nil
1707 2 : case "pebble.concatenate":
1708 2 : o.Merger = DefaultMerger
1709 1 : default:
1710 1 : if hooks != nil && hooks.NewMerger != nil {
1711 1 : o.Merger, err = hooks.NewMerger(value)
1712 1 : }
1713 : }
1714 2 : case "read_compaction_rate":
1715 2 : o.Experimental.ReadCompactionRate, err = strconv.ParseInt(value, 10, 64)
1716 2 : case "read_sampling_multiplier":
1717 2 : o.Experimental.ReadSamplingMultiplier, err = strconv.ParseInt(value, 10, 64)
1718 2 : case "table_cache_shards":
1719 2 : o.Experimental.TableCacheShards, err = strconv.Atoi(value)
1720 0 : case "table_format":
1721 0 : switch value {
1722 0 : case "leveldb":
1723 0 : case "rocksdbv2":
1724 0 : default:
1725 0 : return errors.Errorf("pebble: unknown table format: %q", errors.Safe(value))
1726 : }
1727 1 : case "table_property_collectors":
1728 : // No longer implemented; ignore.
1729 2 : case "validate_on_ingest":
1730 2 : o.Experimental.ValidateOnIngest, err = strconv.ParseBool(value)
1731 2 : case "wal_dir":
1732 2 : o.WALDir = value
1733 2 : case "wal_bytes_per_sync":
1734 2 : o.WALBytesPerSync, err = strconv.Atoi(value)
1735 2 : case "max_writer_concurrency":
1736 2 : o.Experimental.MaxWriterConcurrency, err = strconv.Atoi(value)
1737 2 : case "force_writer_parallelism":
1738 2 : o.Experimental.ForceWriterParallelism, err = strconv.ParseBool(value)
1739 2 : case "secondary_cache_size_bytes":
1740 2 : o.Experimental.SecondaryCacheSizeBytes, err = strconv.ParseInt(value, 10, 64)
1741 2 : case "create_on_shared":
1742 2 : var createOnSharedInt int64
1743 2 : createOnSharedInt, err = strconv.ParseInt(value, 10, 64)
1744 2 : o.Experimental.CreateOnShared = remote.CreateOnSharedStrategy(createOnSharedInt)
1745 0 : default:
1746 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
1747 0 : return nil
1748 0 : }
1749 0 : return errors.Errorf("pebble: unknown option: %s.%s",
1750 0 : errors.Safe(section), errors.Safe(key))
1751 : }
1752 2 : return err
1753 :
1754 2 : case section == "WAL Failover":
1755 2 : if o.WALFailover == nil {
1756 2 : o.WALFailover = new(WALFailoverOptions)
1757 2 : }
1758 2 : var err error
1759 2 : switch key {
1760 2 : case "secondary_dir":
1761 2 : o.WALFailover.Secondary = wal.Dir{Dirname: value, FS: vfs.Default}
1762 2 : case "primary_dir_probe_interval":
1763 2 : o.WALFailover.PrimaryDirProbeInterval, err = time.ParseDuration(value)
1764 2 : case "healthy_probe_latency_threshold":
1765 2 : o.WALFailover.HealthyProbeLatencyThreshold, err = time.ParseDuration(value)
1766 2 : case "healthy_interval":
1767 2 : o.WALFailover.HealthyInterval, err = time.ParseDuration(value)
1768 2 : case "unhealthy_sampling_interval":
1769 2 : o.WALFailover.UnhealthySamplingInterval, err = time.ParseDuration(value)
1770 2 : case "unhealthy_operation_latency_threshold":
1771 2 : var threshold time.Duration
1772 2 : threshold, err = time.ParseDuration(value)
1773 2 : o.WALFailover.UnhealthyOperationLatencyThreshold = func() (time.Duration, bool) {
1774 2 : return threshold, true
1775 2 : }
1776 2 : case "elevated_write_stall_threshold_lag":
1777 2 : o.WALFailover.ElevatedWriteStallThresholdLag, err = time.ParseDuration(value)
1778 0 : default:
1779 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
1780 0 : return nil
1781 0 : }
1782 0 : return errors.Errorf("pebble: unknown option: %s.%s",
1783 0 : errors.Safe(section), errors.Safe(key))
1784 : }
1785 2 : return err
1786 :
1787 2 : case strings.HasPrefix(section, "Level "):
1788 2 : var index int
1789 2 : if n, err := fmt.Sscanf(section, `Level "%d"`, &index); err != nil {
1790 0 : return err
1791 2 : } else if n != 1 {
1792 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section, value) {
1793 0 : return nil
1794 0 : }
1795 0 : return errors.Errorf("pebble: unknown section: %q", errors.Safe(section))
1796 : }
1797 :
1798 2 : if len(o.Levels) <= index {
1799 1 : newLevels := make([]LevelOptions, index+1)
1800 1 : copy(newLevels, o.Levels)
1801 1 : o.Levels = newLevels
1802 1 : }
1803 2 : l := &o.Levels[index]
1804 2 :
1805 2 : var err error
1806 2 : switch key {
1807 2 : case "block_restart_interval":
1808 2 : l.BlockRestartInterval, err = strconv.Atoi(value)
1809 2 : case "block_size":
1810 2 : l.BlockSize, err = strconv.Atoi(value)
1811 2 : case "block_size_threshold":
1812 2 : l.BlockSizeThreshold, err = strconv.Atoi(value)
1813 2 : case "compression":
1814 2 : switch value {
1815 0 : case "Default":
1816 0 : l.Compression = func() sstable.Compression { return DefaultCompression }
1817 2 : case "NoCompression":
1818 2 : l.Compression = func() sstable.Compression { return NoCompression }
1819 2 : case "Snappy":
1820 2 : l.Compression = func() sstable.Compression { return SnappyCompression }
1821 2 : case "ZSTD":
1822 2 : l.Compression = func() sstable.Compression { return ZstdCompression }
1823 0 : default:
1824 0 : return errors.Errorf("pebble: unknown compression: %q", errors.Safe(value))
1825 : }
1826 2 : case "filter_policy":
1827 2 : if hooks != nil && hooks.NewFilterPolicy != nil {
1828 2 : l.FilterPolicy, err = hooks.NewFilterPolicy(value)
1829 2 : }
1830 2 : case "filter_type":
1831 2 : switch value {
1832 2 : case "table":
1833 2 : l.FilterType = TableFilter
1834 0 : default:
1835 0 : return errors.Errorf("pebble: unknown filter type: %q", errors.Safe(value))
1836 : }
1837 2 : case "index_block_size":
1838 2 : l.IndexBlockSize, err = strconv.Atoi(value)
1839 2 : case "target_file_size":
1840 2 : l.TargetFileSize, err = strconv.ParseInt(value, 10, 64)
1841 0 : default:
1842 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
1843 0 : return nil
1844 0 : }
1845 0 : return errors.Errorf("pebble: unknown option: %s.%s", errors.Safe(section), errors.Safe(key))
1846 : }
1847 2 : return err
1848 : }
1849 2 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
1850 2 : return nil
1851 2 : }
1852 0 : return errors.Errorf("pebble: unknown section: %q", errors.Safe(section))
1853 : })
1854 : }
1855 :
// ErrMissingWALRecoveryDir is the error returned when an attempt is made to
// open a database without supplying an Options.WALRecoveryDirs entry for a
// directory that may contain WALs required to recover a consistent database
// state.
type ErrMissingWALRecoveryDir struct {
	// Dir is the directory name that was recorded in the previous options but
	// is not covered by the current configuration.
	Dir string
}

// Error implements the error interface.
func (e ErrMissingWALRecoveryDir) Error() string {
	const format = "directory %q may contain relevant WALs"
	msg := fmt.Sprintf(format, e.Dir)
	return msg
}
1867 :
1868 : // CheckCompatibility verifies the options are compatible with the previous options
1869 : // serialized by Options.String(). For example, the Comparer and Merger must be
1870 : // the same, or data will not be able to be properly read from the DB.
1871 : //
1872 : // This function only looks at specific keys and does not error out if the
1873 : // options are newer and contain unknown keys.
1874 2 : func (o *Options) CheckCompatibility(previousOptions string) error {
1875 2 : return parseOptions(previousOptions, func(section, key, value string) error {
1876 2 : switch section + "." + key {
1877 2 : case "Options.comparer":
1878 2 : if value != o.Comparer.Name {
1879 1 : return errors.Errorf("pebble: comparer name from file %q != comparer name from options %q",
1880 1 : errors.Safe(value), errors.Safe(o.Comparer.Name))
1881 1 : }
1882 2 : case "Options.merger":
1883 2 : // RocksDB allows the merge operator to be unspecified, in which case it
1884 2 : // shows up as "nullptr".
1885 2 : if value != "nullptr" && value != o.Merger.Name {
1886 1 : return errors.Errorf("pebble: merger name from file %q != merger name from options %q",
1887 1 : errors.Safe(value), errors.Safe(o.Merger.Name))
1888 1 : }
1889 2 : case "Options.wal_dir", "WAL Failover.secondary_dir":
1890 2 : switch {
1891 2 : case o.WALDir == value:
1892 2 : return nil
1893 2 : case o.WALFailover != nil && o.WALFailover.Secondary.Dirname == value:
1894 2 : return nil
1895 1 : default:
1896 1 : for _, d := range o.WALRecoveryDirs {
1897 1 : if d.Dirname == value {
1898 1 : return nil
1899 1 : }
1900 : }
1901 1 : return ErrMissingWALRecoveryDir{Dir: value}
1902 : }
1903 : }
1904 2 : return nil
1905 : })
1906 : }
1907 :
1908 : // Validate verifies that the options are mutually consistent. For example,
1909 : // L0StopWritesThreshold must be >= L0CompactionThreshold, otherwise a write
1910 : // stall would persist indefinitely.
1911 2 : func (o *Options) Validate() error {
1912 2 : // Note that we can presume Options.EnsureDefaults has been called, so there
1913 2 : // is no need to check for zero values.
1914 2 :
1915 2 : var buf strings.Builder
1916 2 : if o.Experimental.L0CompactionConcurrency < 1 {
1917 1 : fmt.Fprintf(&buf, "L0CompactionConcurrency (%d) must be >= 1\n",
1918 1 : o.Experimental.L0CompactionConcurrency)
1919 1 : }
1920 2 : if o.L0StopWritesThreshold < o.L0CompactionThreshold {
1921 1 : fmt.Fprintf(&buf, "L0StopWritesThreshold (%d) must be >= L0CompactionThreshold (%d)\n",
1922 1 : o.L0StopWritesThreshold, o.L0CompactionThreshold)
1923 1 : }
1924 2 : if uint64(o.MemTableSize) >= maxMemTableSize {
1925 1 : fmt.Fprintf(&buf, "MemTableSize (%s) must be < %s\n",
1926 1 : humanize.Bytes.Uint64(uint64(o.MemTableSize)), humanize.Bytes.Uint64(maxMemTableSize))
1927 1 : }
1928 2 : if o.MemTableStopWritesThreshold < 2 {
1929 1 : fmt.Fprintf(&buf, "MemTableStopWritesThreshold (%d) must be >= 2\n",
1930 1 : o.MemTableStopWritesThreshold)
1931 1 : }
1932 2 : if o.FormatMajorVersion < FormatMinSupported || o.FormatMajorVersion > internalFormatNewest {
1933 0 : fmt.Fprintf(&buf, "FormatMajorVersion (%d) must be between %d and %d\n",
1934 0 : o.FormatMajorVersion, FormatMinSupported, internalFormatNewest)
1935 0 : }
1936 2 : if o.Experimental.CreateOnShared != remote.CreateOnSharedNone && o.FormatMajorVersion < FormatMinForSharedObjects {
1937 0 : fmt.Fprintf(&buf, "FormatMajorVersion (%d) when CreateOnShared is set must be at least %d\n",
1938 0 : o.FormatMajorVersion, FormatMinForSharedObjects)
1939 0 :
1940 0 : }
1941 2 : if o.TableCache != nil && o.Cache != o.TableCache.cache {
1942 1 : fmt.Fprintf(&buf, "underlying cache in the TableCache and the Cache dont match\n")
1943 1 : }
1944 2 : if buf.Len() == 0 {
1945 2 : return nil
1946 2 : }
1947 1 : return errors.New(buf.String())
1948 : }
1949 :
1950 : // MakeReaderOptions constructs sstable.ReaderOptions from the corresponding
1951 : // options in the receiver.
1952 2 : func (o *Options) MakeReaderOptions() sstable.ReaderOptions {
1953 2 : var readerOpts sstable.ReaderOptions
1954 2 : if o != nil {
1955 2 : readerOpts.Cache = o.Cache
1956 2 : readerOpts.LoadBlockSema = o.LoadBlockSema
1957 2 : readerOpts.Comparer = o.Comparer
1958 2 : readerOpts.Filters = o.Filters
1959 2 : if o.Merger != nil {
1960 2 : readerOpts.Merge = o.Merger.Merge
1961 2 : readerOpts.MergerName = o.Merger.Name
1962 2 : }
1963 2 : readerOpts.LoggerAndTracer = o.LoggerAndTracer
1964 : }
1965 2 : return readerOpts
1966 : }
1967 :
1968 : // MakeWriterOptions constructs sstable.WriterOptions for the specified level
1969 : // from the corresponding options in the receiver.
1970 2 : func (o *Options) MakeWriterOptions(level int, format sstable.TableFormat) sstable.WriterOptions {
1971 2 : var writerOpts sstable.WriterOptions
1972 2 : writerOpts.TableFormat = format
1973 2 : if o != nil {
1974 2 : writerOpts.Cache = o.Cache
1975 2 : writerOpts.Comparer = o.Comparer
1976 2 : if o.Merger != nil {
1977 2 : writerOpts.MergerName = o.Merger.Name
1978 2 : }
1979 2 : writerOpts.BlockPropertyCollectors = o.BlockPropertyCollectors
1980 : }
1981 2 : if format >= sstable.TableFormatPebblev3 {
1982 2 : writerOpts.ShortAttributeExtractor = o.Experimental.ShortAttributeExtractor
1983 2 : writerOpts.RequiredInPlaceValueBound = o.Experimental.RequiredInPlaceValueBound
1984 2 : if format >= sstable.TableFormatPebblev4 && level == numLevels-1 {
1985 2 : writerOpts.WritingToLowestLevel = true
1986 2 : }
1987 : }
1988 2 : levelOpts := o.Level(level)
1989 2 : writerOpts.BlockRestartInterval = levelOpts.BlockRestartInterval
1990 2 : writerOpts.BlockSize = levelOpts.BlockSize
1991 2 : writerOpts.BlockSizeThreshold = levelOpts.BlockSizeThreshold
1992 2 : writerOpts.Compression = resolveDefaultCompression(levelOpts.Compression())
1993 2 : writerOpts.FilterPolicy = levelOpts.FilterPolicy
1994 2 : writerOpts.FilterType = levelOpts.FilterType
1995 2 : writerOpts.IndexBlockSize = levelOpts.IndexBlockSize
1996 2 : writerOpts.AllocatorSizeClasses = o.AllocatorSizeClasses
1997 2 : return writerOpts
1998 : }
1999 :
2000 2 : func resolveDefaultCompression(c Compression) Compression {
2001 2 : if c <= DefaultCompression || c >= sstable.NCompression {
2002 1 : c = SnappyCompression
2003 1 : }
2004 2 : return c
2005 : }
|