Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "io"
11 : "regexp"
12 : "runtime"
13 : "slices"
14 : "sort"
15 : "strconv"
16 : "strings"
17 : "time"
18 : "unicode"
19 :
20 : "github.com/cockroachdb/crlib/fifo"
21 : "github.com/cockroachdb/errors"
22 : "github.com/cockroachdb/pebble/internal/base"
23 : "github.com/cockroachdb/pebble/internal/cache"
24 : "github.com/cockroachdb/pebble/internal/deletepacer"
25 : "github.com/cockroachdb/pebble/internal/humanize"
26 : "github.com/cockroachdb/pebble/internal/keyspan"
27 : "github.com/cockroachdb/pebble/internal/manifest"
28 : "github.com/cockroachdb/pebble/internal/testkeys"
29 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
30 : "github.com/cockroachdb/pebble/objstorage/remote"
31 : "github.com/cockroachdb/pebble/rangekey"
32 : "github.com/cockroachdb/pebble/sstable"
33 : "github.com/cockroachdb/pebble/sstable/blob"
34 : "github.com/cockroachdb/pebble/sstable/block"
35 : "github.com/cockroachdb/pebble/sstable/colblk"
36 : "github.com/cockroachdb/pebble/vfs"
37 : "github.com/cockroachdb/pebble/wal"
38 : "github.com/cockroachdb/redact"
39 : )
40 :
41 : const (
42 : cacheDefaultSize = 8 << 20 // 8 MB
43 : defaultLevelMultiplier = 10
44 : defaultVirtualTableUnreferencedFraction = 0.3
45 : )
46 :
47 : // FilterType exports the base.FilterType type.
48 : type FilterType = base.FilterType
49 :
50 : // Exported TableFilter constants.
51 : const (
52 : TableFilter = base.TableFilter
53 : )
54 :
55 : // FilterWriter exports the base.FilterWriter type.
56 : type FilterWriter = base.FilterWriter
57 :
58 : // FilterPolicy exports the base.FilterPolicy type.
59 : type FilterPolicy = base.FilterPolicy
60 :
61 : var NoFilterPolicy = base.NoFilterPolicy
62 :
63 : // KeySchema exports the colblk.KeySchema type.
64 : type KeySchema = colblk.KeySchema
65 :
66 : // BlockPropertyCollector exports the sstable.BlockPropertyCollector type.
67 : type BlockPropertyCollector = sstable.BlockPropertyCollector
68 :
69 : // BlockPropertyFilter exports the sstable.BlockPropertyFilter type.
70 : type BlockPropertyFilter = base.BlockPropertyFilter
71 :
72 : // MaximumSuffixProperty exports the sstable.MaximumSuffixProperty type.
73 : type MaximumSuffixProperty = sstable.MaximumSuffixProperty
74 :
75 : // ShortAttributeExtractor exports the base.ShortAttributeExtractor type.
76 : type ShortAttributeExtractor = base.ShortAttributeExtractor
77 :
78 : // UserKeyPrefixBound exports the sstable.UserKeyPrefixBound type.
79 : type UserKeyPrefixBound = sstable.UserKeyPrefixBound
80 :
81 : // IterKeyType configures which types of keys an iterator should surface.
82 : type IterKeyType int8
83 :
84 : // DirLock represents a file lock on a directory. It may be passed to Open through
85 : // Options.Lock to elide lock acquisition during Open.
86 : type DirLock = base.DirLock
87 :
88 : // LockDirectory acquires the directory lock in the named directory, preventing
89 : // another process from opening the database. LockDirectory returns a
90 : // handle to the held lock that may be passed to Open, skipping lock acquisition
91 : // during Open.
92 : //
93 : // LockDirectory may be used to expand the critical section protected by the
94 : // database lock to include setup before the call to Open.
95 0 : func LockDirectory(dirname string, fs vfs.FS) (*DirLock, error) {
96 0 : return base.LockDirectory(dirname, fs)
97 0 : }
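As a usage sketch (not part of the source): the lock can be acquired up front, setup performed under it, and the handle handed to Open via Options.Lock. The directory path and the setup step below are placeholders; note that closing the DB does not release a caller-provided lock.

```go
import (
	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

// openLocked expands the critical section to cover pre-Open setup work.
func openLocked(dir string) (*pebble.DB, error) {
	lock, err := pebble.LockDirectory(dir, vfs.Default)
	if err != nil {
		return nil, err
	}
	// ... setup that must happen while holding the database lock ...
	// Open skips lock acquisition because Options.Lock is provided.
	return pebble.Open(dir, &pebble.Options{Lock: lock})
}
```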
98 :
99 : const (
100 : // IterKeyTypePointsOnly configures an iterator to iterate over point keys
101 : // only.
102 : IterKeyTypePointsOnly IterKeyType = iota
103 : // IterKeyTypeRangesOnly configures an iterator to iterate over range keys
104 : // only.
105 : IterKeyTypeRangesOnly
106 : // IterKeyTypePointsAndRanges configures an iterator to iterate over both point
107 : // keys and range keys simultaneously.
108 : IterKeyTypePointsAndRanges
109 : )
110 :
111 : // String implements fmt.Stringer.
112 0 : func (t IterKeyType) String() string {
113 0 : switch t {
114 0 : case IterKeyTypePointsOnly:
115 0 : return "points-only"
116 0 : case IterKeyTypeRangesOnly:
117 0 : return "ranges-only"
118 0 : case IterKeyTypePointsAndRanges:
119 0 : return "points-and-ranges"
120 0 : default:
121 0 : panic(fmt.Sprintf("unknown key type %d", t))
122 : }
123 : }
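To illustrate the combined mode, a minimal sketch assuming an open *pebble.DB named db: with IterKeyTypePointsAndRanges, HasPointAndRange reports which kinds of keys exist at the current iterator position.

```go
it, err := db.NewIter(&pebble.IterOptions{
	KeyTypes: pebble.IterKeyTypePointsAndRanges,
})
if err != nil {
	return err
}
defer it.Close()
for valid := it.First(); valid; valid = it.Next() {
	// A position may expose a point key, an overlapping range key, or both.
	hasPoint, hasRange := it.HasPointAndRange()
	if hasPoint {
		fmt.Printf("point %q = %q\n", it.Key(), it.Value())
	}
	if hasRange {
		start, end := it.RangeBounds()
		fmt.Printf("range [%q, %q): %d key(s)\n", start, end, len(it.RangeKeys()))
	}
}
```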
124 :
125 : // IterOptions hold the optional per-query parameters for NewIter.
126 : //
127 : // Like Options, a nil *IterOptions is valid and means to use the default
128 : // values.
129 : type IterOptions struct {
130 : // LowerBound specifies the smallest key (inclusive) that the iterator will
131 : // return during iteration. If the iterator is seeked or iterated past this
132 : // boundary the iterator will return Valid()==false. Setting LowerBound
133 : // effectively truncates the key space visible to the iterator.
134 : LowerBound []byte
135 : // UpperBound specifies the largest key (exclusive) that the iterator will
136 : // return during iteration. If the iterator is seeked or iterated past this
137 : // boundary the iterator will return Valid()==false. Setting UpperBound
138 : // effectively truncates the key space visible to the iterator.
139 : UpperBound []byte
140 : // SkipPoint may be used to skip over point keys that don't match an
141 : // arbitrary predicate during iteration. If set, the Iterator invokes
142 : // SkipPoint for keys encountered. If SkipPoint returns true, the iterator
143 : // will skip the key without yielding it to the iterator operation in
144 : // progress.
145 : //
146 : // SkipPoint must be a pure function and always return the same result when
147 : // provided the same arguments. The iterator may call SkipPoint multiple
148 : // times for the same user key.
149 : SkipPoint func(userKey []byte) bool
150 : // PointKeyFilters can be used to avoid scanning tables and blocks in tables
151 : // when iterating over point keys. This slice represents an intersection
152 : // across all filters, i.e., all filters must indicate that the block is
153 : // relevant.
154 : //
155 : // Performance note: When len(PointKeyFilters) > 0, the caller should ensure
156 : // that cap(PointKeyFilters) is at least len(PointKeyFilters)+1. This helps
157 : // avoid allocations in Pebble internal code that mutates the slice.
158 : PointKeyFilters []BlockPropertyFilter
159 : // RangeKeyFilters can be used to avoid scanning tables and blocks in tables
160 : // when iterating over range keys. The same requirements that apply to
161 : // PointKeyFilters apply here too.
162 : RangeKeyFilters []BlockPropertyFilter
163 : // KeyTypes configures which types of keys to iterate over: point keys,
164 : // range keys, or both.
165 : KeyTypes IterKeyType
166 : // RangeKeyMasking can be used to enable automatic masking of point keys by
167 : // range keys. Range key masking is only supported during combined range key
168 : // and point key iteration mode (IterKeyTypePointsAndRanges).
169 : RangeKeyMasking RangeKeyMasking
170 :
171 : // OnlyReadGuaranteedDurable is an advanced option that is only supported by
172 : // the Reader implemented by DB. When set to true, only the guaranteed to be
173 : // durable state is visible in the iterator.
174 : // - This definition is made under the assumption that the FS implementation
175 : // is providing a durability guarantee when data is synced.
176 : // - The visible state represents a consistent point in the history of the
177 : // DB.
178 : // - The implementation is free to choose a conservative definition of what
179 : // is guaranteed durable. For simplicity, the current implementation
180 : // ignores memtables. A more sophisticated implementation could track the
181 : // highest seqnum that is synced to the WAL and published and use that as
182 : // the visible seqnum for an iterator. Note that the latter approach is
183 : // not strictly better than the former since we can have DBs that are (a)
184 : // synced more rarely than memtable flushes, (b) have no WAL. (a) is
185 : // likely to be true in a future CockroachDB context where the DB
186 : // containing the state machine may be rarely synced.
187 : // NB: this current implementation relies on the fact that memtables are
188 : // flushed in seqnum order, and any ingested sstables that happen to have a
189 : // lower seqnum than a non-flushed memtable don't have any overlapping keys.
190 : // This is the fundamental level invariant used in other code too, like when
191 : // merging iterators.
192 : //
193 : // Semantically, using this option provides the caller a "snapshot" as of
194 : // the time the most recent memtable was flushed. An alternate interface
195 : // would be to add a NewSnapshot variant. Creating a snapshot is heavier
196 : // weight than creating an iterator, so we have opted to support this
197 : // iterator option.
198 : OnlyReadGuaranteedDurable bool
199 : // UseL6Filters allows the caller to opt into reading filter blocks for L6
200 : // sstables. Helpful if a lot of SeekPrefixGEs are expected in quick
201 : // succession that are unlikely to yield even a single key. Filter blocks in
202 : // L6 can be relatively large, often larger than data blocks, so the benefit of
203 : // loading them in the cache is minimized if the probability of the key
204 : // existing is not low or if we just expect a one-time Seek (where loading the
205 : // data block directly is better).
206 : UseL6Filters bool
207 : // Category is used for categorized iterator stats. This should not be
208 : // changed by calling SetOptions.
209 : Category block.Category
210 :
211 : // ExemptFromTracking indicates that we should not track the lifetime of the
212 : // iterator (used to log information about long-lived iterators). Useful for
213 : // hot paths where we know the iterator will be short-lived.
214 : ExemptFromTracking bool
215 :
216 : // DebugRangeKeyStack enables additional logging of the range key stack
217 : // iterator, via keyspan.InjectLogging. Only used for debugging.
218 : DebugRangeKeyStack bool
219 :
220 : // Internal options.
221 :
222 : logger Logger
223 : // layer corresponding to this file. Only passed in if constructed by a
224 : // levelIter.
225 : layer manifest.Layer
226 : // disableLazyCombinedIteration is an internal testing option.
227 : disableLazyCombinedIteration bool
228 : // snapshotForHideObsoletePoints is specified for/by levelIter when opening
229 : // files and is used to decide whether to hide obsolete points. A value of 0
230 : // implies obsolete points should not be hidden.
231 : snapshotForHideObsoletePoints base.SeqNum
232 :
233 : // MaximumSuffixProperty is the maximum suffix property for the iterator.
234 : // This is used to perform the synthetic key optimization.
235 : MaximumSuffixProperty MaximumSuffixProperty
236 : // NB: If adding new Options, you must account for them in iterator
237 : // construction and Iterator.SetOptions.
238 : }
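A sketch combining the bound and filtering fields above; the key prefix, the SkipPoint predicate, and the process helper are hypothetical.

```go
it, err := db.NewIter(&pebble.IterOptions{
	LowerBound: []byte("user/"), // inclusive
	UpperBound: []byte("user0"), // exclusive; "0" sorts immediately after "/"
	// SkipPoint must be pure: the same key must always yield the same answer.
	SkipPoint: func(userKey []byte) bool {
		return bytes.HasSuffix(userKey, []byte(".tmp")) // hypothetical predicate
	},
})
if err != nil {
	return err
}
defer it.Close()
for valid := it.First(); valid; valid = it.Next() {
	process(it.Key(), it.Value()) // hypothetical consumer
}
```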
239 :
240 : // GetLowerBound returns the LowerBound or nil if the receiver is nil.
241 1 : func (o *IterOptions) GetLowerBound() []byte {
242 1 : if o == nil {
243 1 : return nil
244 1 : }
245 1 : return o.LowerBound
246 : }
247 :
248 : // GetUpperBound returns the UpperBound or nil if the receiver is nil.
249 1 : func (o *IterOptions) GetUpperBound() []byte {
250 1 : if o == nil {
251 1 : return nil
252 1 : }
253 1 : return o.UpperBound
254 : }
255 :
256 : // GetMaximumSuffixProperty returns the MaximumSuffixProperty.
257 1 : func (o *IterOptions) GetMaximumSuffixProperty() MaximumSuffixProperty {
258 1 : if o == nil {
259 0 : return nil
260 0 : }
261 1 : return o.MaximumSuffixProperty
262 : }
263 :
264 1 : func (o *IterOptions) pointKeys() bool {
265 1 : if o == nil {
266 0 : return true
267 0 : }
268 1 : return o.KeyTypes == IterKeyTypePointsOnly || o.KeyTypes == IterKeyTypePointsAndRanges
269 : }
270 :
271 1 : func (o *IterOptions) rangeKeys() bool {
272 1 : if o == nil {
273 0 : return false
274 0 : }
275 1 : return o.KeyTypes == IterKeyTypeRangesOnly || o.KeyTypes == IterKeyTypePointsAndRanges
276 : }
277 :
278 1 : func (o *IterOptions) getLogger() Logger {
279 1 : if o == nil || o.logger == nil {
280 1 : return DefaultLogger
281 1 : }
282 1 : return o.logger
283 : }
284 :
285 : // SpanIterOptions creates a SpanIterOptions from this IterOptions.
286 1 : func (o *IterOptions) SpanIterOptions() keyspan.SpanIterOptions {
287 1 : if o == nil {
288 1 : return keyspan.SpanIterOptions{}
289 1 : }
290 1 : return keyspan.SpanIterOptions{
291 1 : RangeKeyFilters: o.RangeKeyFilters,
292 1 : }
293 : }
294 :
295 : // ScanInternalOptions is similar to IterOptions, meant for use with
296 : // scanInternalIterator.
297 : type ScanInternalOptions struct {
298 : IterOptions
299 :
300 : Category block.Category
301 :
302 : VisitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error
303 : VisitRangeDel func(start, end []byte, seqNum SeqNum) error
304 : VisitRangeKey func(start, end []byte, keys []rangekey.Key) error
305 : VisitSharedFile func(sst *SharedSSTMeta) error
306 : VisitExternalFile func(sst *ExternalFile) error
307 :
308 : // IncludeObsoleteKeys specifies whether keys shadowed by newer internal keys
309 : // are exposed. If false, only one internal key per user key is exposed.
310 : IncludeObsoleteKeys bool
311 :
312 : // RateLimitFunc is used to limit the amount of bytes read per second.
313 : RateLimitFunc func(key *InternalKey, value LazyValue) error
314 : }
315 :
316 : // RangeKeyMasking configures automatic hiding of point keys by range keys. A
317 : // non-nil Suffix enables range-key masking. When enabled, range keys with
318 : // suffixes ≥ Suffix behave as masks. All point keys that are contained within a
319 : // masking range key's bounds and have suffixes greater than the range key's
320 : // suffix are automatically skipped.
321 : //
322 : // Specifically, when configured with a RangeKeyMasking.Suffix _s_, and there
323 : // exists a range key with suffix _r_ covering a point key with suffix _p_, and
324 : //
325 : // _s_ ≤ _r_ < _p_
326 : //
327 : // then the point key is elided.
328 : //
329 : // Range-key masking may only be used when iterating over both point keys and
330 : // range keys with IterKeyTypePointsAndRanges.
331 : type RangeKeyMasking struct {
332 : // Suffix configures which range keys may mask point keys. Only range keys
333 : // that are defined at suffixes greater than or equal to Suffix will mask
334 : // point keys.
335 : Suffix []byte
336 : // Filter is an optional field that may be used to improve performance of
337 : // range-key masking through a block-property filter defined over key
338 : // suffixes. If non-nil, Filter is called by Pebble to construct a
339 : // block-property filter mask at iterator creation. The filter is used to
340 : // skip whole point-key blocks containing point keys with suffixes greater
341 : // than a covering range-key's suffix.
342 : //
343 : // To use this functionality, the caller must create and configure (through
344 : // Options.BlockPropertyCollectors) a block-property collector that records
345 : // the maximum suffix contained within a block. The caller then must write
346 : // and provide a BlockPropertyFilterMask implementation on that same
347 : // property. See the BlockPropertyFilterMask type for more information.
348 : Filter func() BlockPropertyFilterMask
349 : }
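A configuration sketch, assuming a Comparer whose key suffixes encode timestamps; encodeSuffix and readTS are hypothetical helpers for that encoding.

```go
it, err := db.NewIter(&pebble.IterOptions{
	// Masking requires combined point-and-range iteration.
	KeyTypes: pebble.IterKeyTypePointsAndRanges,
	RangeKeyMasking: pebble.RangeKeyMasking{
		// Range keys with suffixes >= this value act as masks.
		Suffix: encodeSuffix(readTS),
		// Filter is optional; when set, it must be paired with a
		// block-property collector registered via
		// Options.BlockPropertyCollectors (see BlockPropertyFilterMask).
	},
})
```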
350 :
351 : // BlockPropertyFilterMask extends the BlockPropertyFilter interface for use
352 : // with range-key masking. Unlike an ordinary block property filter, a
353 : // BlockPropertyFilterMask's filtering criteria is allowed to change when Pebble
354 : // invokes its SetSuffix method.
355 : //
356 : // When a Pebble iterator steps into a range key's bounds and the range key has
357 : // a suffix greater than or equal to RangeKeyMasking.Suffix, the range key acts
358 : // as a mask. The masking range key hides all point keys that fall within the
359 : // range key's bounds and have suffixes > the range key's suffix. Without a
360 : // filter mask configured, Pebble performs this hiding by stepping through point
361 : // keys and comparing suffixes. If large numbers of point keys are masked, this
362 : // requires Pebble to load, iterate through and discard a large number of
363 : // sstable blocks containing masked point keys.
364 : //
365 : // If a block-property collector and a filter mask are configured, Pebble may
366 : // skip loading some point-key blocks altogether. If a block's keys are known to
367 : // all fall within the bounds of the masking range key and the block was
368 : // annotated by a block-property collector with the maximal suffix, Pebble can
369 : // ask the filter mask to compare the property to the current masking range
370 : // key's suffix. If the mask reports no intersection, the block may be skipped.
371 : //
372 : // If unsuffixed and suffixed keys are written to the database, care must be
373 : // taken to avoid unintentionally masking un-suffixed keys located in the same
374 : // block as suffixed keys. One solution is to interpret unsuffixed keys as
375 : // containing the maximal suffix value, ensuring that blocks containing
376 : // unsuffixed keys are always loaded.
377 : type BlockPropertyFilterMask interface {
378 : BlockPropertyFilter
379 :
380 : // SetSuffix configures the mask with the suffix of a range key. The filter
381 : // should return false from Intersects whenever it's provided with a
382 : // property encoding a block's minimum suffix that's greater (according to
383 : // Compare) than the provided suffix.
384 : SetSuffix(suffix []byte) error
385 : }
386 :
387 : // WriteOptions hold the optional per-query parameters for Set and Delete
388 : // operations.
389 : //
390 : // Like Options, a nil *WriteOptions is valid and means to use the default
391 : // values.
392 : type WriteOptions struct {
393 : // Sync is whether to sync writes through the OS buffer cache and down onto
394 : // the actual disk, if applicable. Setting Sync is required for durability of
395 : // individual write operations but can result in slower writes.
396 : //
397 : // If false, and the process or machine crashes, then a recent write may be
398 : // lost. This is due to the recently written data being buffered inside the
399 : // process running Pebble. This differs from the semantics of a write system
400 : // call in which the data is buffered in the OS buffer cache and would thus
401 : // survive a process crash.
402 : //
403 : // The default value is true.
404 : Sync bool
405 : }
406 :
407 : // Sync specifies the default write options for writes which synchronize to
408 : // disk.
409 : var Sync = &WriteOptions{Sync: true}
410 :
411 : // NoSync specifies the default write options for writes which do not
412 : // synchronize to disk.
413 : var NoSync = &WriteOptions{Sync: false}
414 :
415 : // GetSync returns the Sync value or true if the receiver is nil.
416 1 : func (o *WriteOptions) GetSync() bool {
417 1 : return o == nil || o.Sync
418 1 : }
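For illustration, the two stock values in use (assuming an open db); keys and values are placeholders.

```go
// Durable: Set does not return until the write is synced to disk.
if err := db.Set([]byte("k1"), []byte("v1"), pebble.Sync); err != nil {
	return err
}
// Fast: buffered inside the process; a crash may lose this write.
if err := db.Set([]byte("k2"), []byte("v2"), pebble.NoSync); err != nil {
	return err
}
```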
419 :
420 : // LevelOptions holds the optional per-level parameters.
421 : type LevelOptions struct {
422 : // BlockRestartInterval is the number of keys between restart points
423 : // for delta encoding of keys.
424 : //
425 : // The default value is 16 for L0, and the value from the previous level for
426 : // all other levels.
427 : BlockRestartInterval int
428 :
429 : // BlockSize is the target uncompressed size in bytes of each table block.
430 : //
431 : // The default value is 4096 for L0, and the value from the previous level for
432 : // all other levels.
433 : BlockSize int
434 :
435 : // BlockSizeThreshold finishes a block if the block size is larger than the
436 : // specified percentage of the target block size and adding the next entry
437 : // would cause the block to be larger than the target block size.
438 : //
439 : // The default value is 90 for L0, and the value from the previous level for
440 : // all other levels.
441 : BlockSizeThreshold int
442 :
443 : // Compression defines the per-block compression to use.
444 : //
445 : // The default value is Snappy for L0, or the function from the previous level
446 : // for all other levels.
447 : //
448 : // ApplyCompressionSettings can be used to initialize this field for all levels.
449 : Compression func() *sstable.CompressionProfile
450 :
451 : // FilterPolicy defines a filter algorithm (such as a Bloom filter) that can
452 : // reduce disk reads for Get calls.
453 : //
454 : // One such implementation is bloom.FilterPolicy(10) from the pebble/bloom
455 : // package.
456 : //
457 : // The default value for L0 is NoFilterPolicy (no filter), and the value from
458 : // the previous level for all other levels.
459 : FilterPolicy FilterPolicy
460 :
461 : // FilterType is a legacy field. The default and only possible value is
462 : // TableFilter.
463 : FilterType FilterType
464 :
465 : // IndexBlockSize is the target uncompressed size in bytes of each index
466 : // block. When the index block size is larger than this target, two-level
467 : // indexes are automatically enabled. Setting this option to a large value
468 : // (such as math.MaxInt32) disables the automatic creation of two-level
469 : // indexes.
470 : //
471 : // The default value is the value of BlockSize for L0, or the value from the
472 : // previous level for all other levels.
473 : IndexBlockSize int
474 : }
475 :
476 : // EnsureL0Defaults ensures that the L0 default values for the options have been
477 : // initialized.
478 1 : func (o *LevelOptions) EnsureL0Defaults() {
479 1 : if o.BlockRestartInterval <= 0 {
480 0 : o.BlockRestartInterval = base.DefaultBlockRestartInterval
481 0 : }
482 1 : if o.BlockSize <= 0 {
483 0 : o.BlockSize = base.DefaultBlockSize
484 1 : } else if o.BlockSize > sstable.MaximumRestartOffset {
485 0 : panic(errors.Errorf("BlockSize %d exceeds MaximumRestartOffset", o.BlockSize))
486 : }
487 1 : if o.BlockSizeThreshold <= 0 {
488 0 : o.BlockSizeThreshold = base.DefaultBlockSizeThreshold
489 0 : }
490 1 : if o.Compression == nil {
491 0 : o.Compression = func() *sstable.CompressionProfile { return sstable.SnappyCompression }
492 : }
493 1 : if o.FilterPolicy == nil {
494 0 : o.FilterPolicy = NoFilterPolicy
495 0 : }
496 1 : if o.IndexBlockSize <= 0 {
497 0 : o.IndexBlockSize = o.BlockSize
498 0 : }
499 : }
500 :
501 : // EnsureL1PlusDefaults ensures that the L1+ default values for the options have
502 : // been initialized. Requires the fully initialized options for the level above.
503 1 : func (o *LevelOptions) EnsureL1PlusDefaults(previousLevel *LevelOptions) {
504 1 : if o.BlockRestartInterval <= 0 {
505 0 : o.BlockRestartInterval = previousLevel.BlockRestartInterval
506 0 : }
507 1 : if o.BlockSize <= 0 {
508 0 : o.BlockSize = previousLevel.BlockSize
509 1 : } else if o.BlockSize > sstable.MaximumRestartOffset {
510 0 : panic(errors.Errorf("BlockSize %d exceeds MaximumRestartOffset", o.BlockSize))
511 : }
512 1 : if o.BlockSizeThreshold <= 0 {
513 0 : o.BlockSizeThreshold = previousLevel.BlockSizeThreshold
514 0 : }
515 1 : if o.Compression == nil {
516 0 : o.Compression = previousLevel.Compression
517 0 : }
518 1 : if o.FilterPolicy == nil {
519 0 : o.FilterPolicy = previousLevel.FilterPolicy
520 0 : }
521 1 : if o.IndexBlockSize <= 0 {
522 0 : o.IndexBlockSize = previousLevel.IndexBlockSize
523 0 : }
524 : }
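A sketch of populating per-level options before Open; the values are illustrative. Fields left zero are filled in cascade: L0 from the base defaults (EnsureL0Defaults), each subsequent level from the one above (EnsureL1PlusDefaults).

```go
import (
	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/bloom"
)

func levelTunedOptions() *pebble.Options {
	opts := &pebble.Options{}
	for i := range opts.Levels {
		// Roughly 10 bits per key for the bloom filter on every level.
		opts.Levels[i].FilterPolicy = bloom.FilterPolicy(10)
	}
	opts.Levels[0].BlockSize = 32 << 10 // 32 KB; L1+ inherit via the cascade
	return opts
}
```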
525 :
526 : // Options holds the optional parameters for configuring pebble. These options
527 : // apply to the DB at large; per-query options are defined by the IterOptions
528 : // and WriteOptions types.
529 : type Options struct {
530 : // Sync sstables periodically in order to smooth out writes to disk. This
531 : // option does not provide any persistency guarantee, but is used to avoid
532 : // latency spikes if the OS automatically decides to write out a large chunk
533 : // of dirty filesystem buffers. This option only controls SSTable syncs; WAL
534 : // syncs are controlled by WALBytesPerSync.
535 : //
536 : // The default value is 512KB.
537 : BytesPerSync int
538 :
539 : // Cache is used to cache uncompressed blocks from sstables. If it is nil,
540 : // a block cache of CacheSize will be created for each DB.
541 : Cache *cache.Cache
542 : // CacheSize is used when Cache is not set. The default value is 8 MB.
543 : CacheSize int64
544 :
545 : // LoadBlockSema, if set, is used to limit the number of blocks that can be
546 : // loaded (i.e. read from the filesystem) in parallel. Each load acquires one
547 : // unit from the semaphore for the duration of the read.
548 : LoadBlockSema *fifo.Semaphore
549 :
550 : // Cleaner cleans obsolete files.
551 : //
552 : // The default cleaner uses the DeleteCleaner.
553 : Cleaner Cleaner
554 :
555 : // Local contains options that pertain to files stored on the local filesystem.
556 : Local struct {
557 : // ReadaheadConfig is used to retrieve the current readahead mode; it is
558 : // consulted whenever a read handle is initialized.
559 : ReadaheadConfig *ReadaheadConfig
560 :
561 : // TODO(radu): move BytesPerSync, LoadBlockSema, Cleaner here.
562 : }
563 :
564 : // Comparer defines a total ordering over the space of []byte keys: a 'less
565 : // than' relationship. The same comparison algorithm must be used for reads
566 : // and writes over the lifetime of the DB.
567 : //
568 : // The default value uses the same ordering as bytes.Compare.
569 : Comparer *Comparer
570 :
571 : // DebugCheck is invoked, if non-nil, whenever a new version is being
572 : // installed. Typically, this is set to pebble.DebugCheckLevels in tests
573 : // or tools only, to check invariants over all the data in the database.
574 : DebugCheck func(*DB) error
575 :
576 : // Disable the write-ahead log (WAL). Disabling the write-ahead log prohibits
577 : // crash recovery, but can improve performance if crash recovery is not
578 : // needed (e.g. when only temporary state is being stored in the database).
579 : //
580 : // TODO(peter): untested
581 : DisableWAL bool
582 :
583 : // ErrorIfExists causes an error on Open if the database already exists.
584 : // The error can be checked with errors.Is(err, ErrDBAlreadyExists).
585 : //
586 : // The default value is false.
587 : ErrorIfExists bool
588 :
589 : // ErrorIfNotExists causes an error on Open if the database does not already
590 : // exist. The error can be checked with errors.Is(err, ErrDBDoesNotExist).
591 : //
592 : // The default value is false which will cause a database to be created if it
593 : // does not already exist.
594 : ErrorIfNotExists bool
595 :
596 : // ErrorIfNotPristine causes an error on Open if the database already exists
597 : // and any operations have been performed on the database. The error can be
598 : // checked with errors.Is(err, ErrDBNotPristine).
599 : //
600 : // Note that a database that contained keys that were all subsequently deleted
601 : // may or may not trigger the error. Currently, we check if there are any live
602 : // SSTs or log records to replay.
603 : ErrorIfNotPristine bool
604 :
605 : // EventListener provides hooks to listening to significant DB events such as
606 : // flushes, compactions, and table deletion.
607 : EventListener *EventListener
608 :
609 : // Experimental contains experimental options which are off by default.
610 : // These options are temporary and will eventually either be deleted, moved
611 : // out of the experimental group, or made the non-adjustable default. These
612 : // options may change at any time, so do not rely on them.
613 : Experimental struct {
614 : // The threshold of L0 read-amplification at which compaction concurrency
615 : // is enabled (if CompactionDebtConcurrency was not already exceeded).
616 : // Every multiple of this value enables another concurrent
617 : // compaction up to CompactionConcurrencyRange.
618 : L0CompactionConcurrency int
619 :
620 : // CompactionDebtConcurrency controls the threshold of compaction debt
621 : // at which additional compaction concurrency slots are added. For every
622 : // multiple of this value in compaction debt bytes, an additional
623 : // concurrent compaction is added. This works "on top" of
624 : // L0CompactionConcurrency, so the higher of the count of compaction
625 : // concurrency slots as determined by the two options is chosen.
626 : CompactionDebtConcurrency uint64
627 :
628 : // CompactionGarbageFractionForMaxConcurrency is the fraction of garbage
629 : // due to DELs and RANGEDELs that causes MaxConcurrentCompactions to be
630 : // allowed. Concurrent compactions are allowed in a linear manner until
631 : // this limit is reached. A value <= 0.0 disables adding concurrency
632 : // due to garbage.
633 : CompactionGarbageFractionForMaxConcurrency func() float64
634 :
635 : // UseDeprecatedCompensatedScore is a temporary option to revert the
636 : // compaction picker to the previous behavior of only considering
637 : // compaction out of a level if its compensated fill factor divided by
638 : // the next level's fill factor is >= 1.0. The details of this setting
639 : // are documented within its use within the compaction picker.
640 : //
641 : // The default value is false.
642 : UseDeprecatedCompensatedScore func() bool
643 :
644 : // IngestSplit, if it returns true, allows for ingest-time splitting of
645 : // existing sstables into two virtual sstables to allow ingestion sstables to
646 : // slot into a lower level than they otherwise would have.
647 : IngestSplit func() bool
648 :
649 : // ReadCompactionRate controls the frequency of read triggered
650 : // compactions by adjusting `AllowedSeeks` in manifest.TableMetadata:
651 : //
652 : // AllowedSeeks = FileSize / ReadCompactionRate
653 : //
654 : // From LevelDB:
655 : // ```
656 : // We arrange to automatically compact this file after
657 : // a certain number of seeks. Let's assume:
658 : // (1) One seek costs 10ms
659 : // (2) Writing or reading 1MB costs 10ms (100MB/s)
660 : // (3) A compaction of 1MB does 25MB of IO:
661 : // 1MB read from this level
662 : // 10-12MB read from next level (boundaries may be misaligned)
663 : // 10-12MB written to next level
664 : // This implies that 25 seeks cost the same as the compaction
665 : // of 1MB of data. I.e., one seek costs approximately the
666 : // same as the compaction of 40KB of data. We are a little
667 : // conservative and allow approximately one seek for every 16KB
668 : // of data before triggering a compaction.
669 : // ```
670 : ReadCompactionRate int64
671 :
672 : // ReadSamplingMultiplier is a multiplier for the readSamplingPeriod in
673 : // iterator.maybeSampleRead() to control the frequency of read sampling
674 : // to trigger a read triggered compaction. A value of -1 prevents sampling
675 : // and disables read triggered compactions. The default is 1 << 4, which
676 : // is multiplied by a constant of 1 << 16 to yield 1 << 20 (1MB).
677 : ReadSamplingMultiplier int64
678 :
679 : // NumDeletionsThreshold defines the minimum number of point tombstones
680 : // that must be present in a single data block for that block to be
681 : // considered tombstone-dense for the purposes of triggering a
682 : // tombstone density compaction. Data blocks may also be considered
683 : // tombstone-dense if they meet the criteria defined by
684 : // DeletionSizeRatioThreshold below. Tombstone-dense blocks are identified
685 : // when sstables are written, and so this is effectively an option for
686 : // sstable writers. The default value is 100.
687 : NumDeletionsThreshold int
688 :
689 : // DeletionSizeRatioThreshold defines the minimum ratio of the size of
690 : // point tombstones to the size of a data block that must be reached
691 : // for that block to be considered tombstone-dense for the purposes of
692 : // triggering a tombstone density compaction. Data blocks may also be
693 : // considered tombstone-dense if they meet the criteria defined by
694 : // NumDeletionsThreshold above. Tombstone-dense blocks are identified
695 : // when sstables are written, and so this is effectively an option for
696 : // sstable writers. The default value is 0.5.
697 : DeletionSizeRatioThreshold float32
698 :
699 : // TombstoneDenseCompactionThreshold is a function that returns the minimum
700 : // percent of data blocks in a table that must be tombstone-dense for that
701 : // table to be eligible for a tombstone density compaction. The value should
702 : // be defined as a ratio out of 1. The default value is 0.10.
703 : //
704 : // If multiple tables are eligible for a tombstone density compaction, then
705 : // tables with a higher percent of tombstone-dense blocks are still
706 : // prioritized for compaction.
707 : //
708 : // Using a function allows for dynamic reconfiguration of the threshold based
709 : // on workload characteristics. A zero or negative value disables tombstone
710 : // density compactions.
711 : TombstoneDenseCompactionThreshold func() float64
712 :
713 : // FileCacheShards is the number of shards per file cache.
714 : // Reducing the value can reduce the number of idle goroutines per DB
715 : // instance which can be useful in scenarios with a lot of DB instances
716 : // and a large number of CPUs, but doing so can lead to higher contention
717 : // in the file cache and reduced performance.
718 : //
719 : // The default value is the number of logical CPUs, which can be
720 : // limited by runtime.GOMAXPROCS.
721 : FileCacheShards int
722 :
723 : // ValidateOnIngest schedules validation of sstables after they have
724 : // been ingested.
725 : //
726 : // By default, this value is false.
727 : ValidateOnIngest bool
728 :
729 : // LevelMultiplier configures the size multiplier used to determine the
730 : // desired size of each level of the LSM. Defaults to 10.
731 : LevelMultiplier int
732 :
733 : // MultiLevelCompactionHeuristic determines whether to add an additional
734 : // level to a conventional two level compaction.
735 : MultiLevelCompactionHeuristic func() MultiLevelHeuristic
736 :
737 : // EnableValueBlocks is used to decide whether to enable writing
738 : // TableFormatPebblev3 sstables. This setting is only respected by a
739 : // specific subset of format major versions: FormatSSTableValueBlocks,
740 : // FormatFlushableIngest and FormatPrePebblev1MarkedCompacted. In lower
741 : // format major versions, value blocks are never enabled. In higher
742 : // format major versions, value blocks are always enabled.
743 : EnableValueBlocks func() bool
744 :
745 : // ShortAttributeExtractor is used iff EnableValueBlocks() returns true
746 : // (else ignored). If non-nil, a ShortAttribute can be extracted from the
747 : // value and stored with the key, when the value is stored elsewhere.
748 : ShortAttributeExtractor ShortAttributeExtractor
749 :
750 : // DisableIngestAsFlushable disables lazy ingestion of sstables through
751 : // a WAL write and memtable rotation. Only effectual if the format
752 : // major version is at least `FormatFlushableIngest`.
753 : DisableIngestAsFlushable func() bool
754 :
755 : // RemoteStorage enables use of remote storage (e.g. S3) for storing
756 : // sstables. Setting this option enables use of CreateOnShared option and
757 : // allows ingestion of external files.
758 : RemoteStorage remote.StorageFactory
759 :
760 : // If CreateOnShared is non-zero, new sstables are created on remote storage
761 : // (using CreateOnSharedLocator and with the appropriate
762 : // CreateOnSharedStrategy). These sstables can be shared between different
763 : // Pebble instances; the lifecycle of such objects is managed by the
764 : // remote.Storage constructed by options.RemoteStorage.
765 : //
766 : // Can only be used when RemoteStorage is set (and recognizes
767 : // CreateOnSharedLocator).
768 : CreateOnShared remote.CreateOnSharedStrategy
769 : CreateOnSharedLocator remote.Locator
770 :
771 : // SecondaryCacheSizeBytes is the size of the on-disk block cache for objects
772 : // on shared storage in bytes. If it is 0, no cache is used.
773 : SecondaryCacheSizeBytes int64
774 :
775 : // EnableDeleteOnlyCompactionExcises enables delete-only compactions to also
776 : // apply delete-only compaction hints on sstables that partially overlap
777 : // with it. This application happens through an excise, similar to
778 : // the excise phase of IngestAndExcise.
779 : EnableDeleteOnlyCompactionExcises func() bool
780 :
781 : // CompactionScheduler, if set, is used to create a scheduler to limit
782 : // concurrent compactions as well as to pace compactions already chosen. If
783 : // nil, a default scheduler is created and used.
784 : CompactionScheduler func() CompactionScheduler
785 :
786 : UserKeyCategories UserKeyCategories
787 :
788 : // ValueSeparationPolicy controls the policy for separating values into
789 : // external blob files. If nil, value separation defaults to disabled.
790 : // The value separation policy is ignored if EnableColumnarBlocks() is
791 : // false.
792 : ValueSeparationPolicy func() ValueSeparationPolicy
793 :
794 : // SpanPolicyFunc is used to determine the SpanPolicy for a key region.
795 : SpanPolicyFunc SpanPolicyFunc
796 :
797 : // VirtualTableRewriteUnreferencedFraction configures the minimum fraction of
798 : // unreferenced data in a backing table required to trigger a virtual table
799 : // rewrite compaction. This is calculated as the ratio of unreferenced
800 : // data size to total backing file size. A value of 0.0 triggers
801 : // rewrites for any amount of unreferenced data. A value of 1.0 disables
802 : // virtual table rewrite compactions entirely. The default value is 0.30
803 : // (rewrite when >= 30% of backing data is unreferenced).
804 : VirtualTableRewriteUnreferencedFraction func() float64
805 :
806 : // IteratorTracking configures periodic logging of iterators held open for
807 : // too long.
808 : IteratorTracking struct {
809 : // PollInterval is the interval at which to log a report of long-lived
810 : // iterators. If zero, disables iterator tracking.
811 : //
812 : // The default value is 0 (disabled).
813 : PollInterval time.Duration
814 :
815 : // MaxAge is the age above which iterators are considered long-lived. If
816 : // zero, disables iterator tracking.
817 : //
818 : // The default value is 0 (disabled).
819 : MaxAge time.Duration
820 : }
821 : }
822 :
823 : // Filters is a map from filter policy name to filter policy. It is used for
824 : // debugging tools which may be used on multiple databases configured with
825 : // different filter policies. It is not necessary to populate this filters
826 : // map during normal usage of a DB (it will be done automatically by
827 : // EnsureDefaults).
828 : Filters map[string]FilterPolicy
829 :
830 : // FlushDelayDeleteRange configures how long the database should wait before
831 : // forcing a flush of a memtable that contains a range deletion. Disk space
832 : // cannot be reclaimed until the range deletion is flushed. No automatic
833 : // flush occurs if zero.
834 : FlushDelayDeleteRange time.Duration
835 :
836 : // FlushDelayRangeKey configures how long the database should wait before
837 : // forcing a flush of a memtable that contains a range key. Range keys in
838 : // the memtable prevent lazy combined iteration, so it's desirable to flush
839 : // range keys promptly. No automatic flush occurs if zero.
840 : FlushDelayRangeKey time.Duration
841 :
842 : // FlushSplitBytes denotes the target number of bytes per sublevel in
843 : // each flush split interval (i.e. range between two flush split keys)
844 : // in L0 sstables. When set to zero, only a single sstable is generated
845 : // by each flush. When set to a non-zero value, flushes are split at
846 : // points to meet L0's TargetFileSize, any grandparent-related overlap
847 : // options, and at boundary keys of L0 flush split intervals (which are
848 : // targeted to contain around FlushSplitBytes bytes in each sublevel
849 : // between pairs of boundary keys). Splitting sstables during flush
850 : // allows increased compaction flexibility and concurrency when those
851 : // tables are compacted to lower levels.
852 : FlushSplitBytes int64
853 :
854 : // FormatMajorVersion sets the format of on-disk files. It is
855 : // recommended to set the format major version to an explicit
856 : // version, as the default may change over time.
857 : //
858 : // At Open if the existing database is formatted using a later
859 : // format major version that is known to this version of Pebble,
860 : // Pebble will continue to use the later format major version. If
861 : // the existing database's version is unknown, the caller may use
862 : // FormatMostCompatible and will be able to open the database
863 : // regardless of its actual version.
864 : //
865 : // If the existing database is formatted using a format major
866 : // version earlier than the one specified, Open will automatically
867 : // ratchet the database to the specified format major version.
868 : FormatMajorVersion FormatMajorVersion
869 :
870 : // FS provides the interface for persistent file storage.
871 : //
872 : // The default value uses the underlying operating system's file system.
873 : FS vfs.FS
874 :
875 : // KeySchema is the name of the key schema that should be used when writing
876 : // new sstables. There must be a key schema with this name defined in
877 : // KeySchemas. If not set, colblk.DefaultKeySchema is used to construct a
878 : // default key schema.
879 : KeySchema string
880 :
881 : // KeySchemas defines the set of known schemas of user keys. When columnar
882 : // blocks are in use (see FormatColumnarBlocks), the user may specify how a
883 : // key should be decomposed into columns. Each KeySchema must have a unique
884 : // name. The schema named by Options.KeySchema is used while writing
885 : // sstables during flushes and compactions.
886 : //
887 : // Multiple KeySchemas may be used over the lifetime of a database. Once a
888 : // KeySchema is used, it must be provided in KeySchemas in subsequent calls
889 : // to Open for perpetuity.
890 : KeySchemas sstable.KeySchemas
891 :
892 : // Lock, if set, must be a database lock acquired through LockDirectory for
893 : // the same directory passed to Open. If provided, Open will skip locking
894 : // the directory. Closing the database will not release the lock, and it's
895 : // the responsibility of the caller to release the lock after closing the
896 : // database.
897 : //
898 : // Open will enforce that the Lock passed locks the same directory passed to
899 : // Open. Concurrent calls to Open using the same Lock are detected and
900 : // prohibited.
901 : Lock *base.DirLock
902 :
903 : // The count of L0 files necessary to trigger an L0 compaction.
904 : L0CompactionFileThreshold int
905 :
906 : // The amount of L0 read-amplification necessary to trigger an L0 compaction.
907 : L0CompactionThreshold int
908 :
909 : // Hard limit on L0 read-amplification, computed as the number of L0
910 : // sublevels. Writes are stopped when this threshold is reached.
911 : L0StopWritesThreshold int
912 :
913 : // The maximum number of bytes for LBase. The base level is the level which
914 : // L0 is compacted into. The base level is determined dynamically based on
915 : // the existing data in the LSM. The maximum number of bytes for other levels
916 : // is computed dynamically based on the base level's maximum size. When the
917 : // maximum number of bytes for a level is exceeded, compaction is requested.
918 : LBaseMaxBytes int64
919 :
920 : // TargetFileSizes contains the target file size for each level, ignoring
921 : // unpopulated levels. Specifically:
922 : // - TargetFileSizes[0] is the target file size for L0;
923 : // - TargetFileSizes[1] is the target file size for Lbase;
924 : // - TargetFileSizes[2] is the target file size for Lbase+1;
925 : // and so on.
926 : //
927 : // The default value for TargetFileSizes[0] is 2MB.
928 : // The default value for TargetFileSizes[i] is TargetFileSizes[i-1] * 2.
929 : TargetFileSizes [manifest.NumLevels]int64
930 :
931 : // Per-level options. Levels[i] contains the options for Li (regardless of
932 : // what Lbase is).
933 : Levels [manifest.NumLevels]LevelOptions
934 :
935 : // LoggerAndTracer will be used, if non-nil, else Logger will be used and
936 : // tracing will be a noop.
937 :
938 : // Logger used to write log messages.
939 : //
940 : // The default logger uses the Go standard library log package.
941 : Logger Logger
942 : // LoggerAndTracer is used for writing log messages and traces.
943 : LoggerAndTracer LoggerAndTracer
944 :
945 : // MaxManifestFileSize is the maximum size the MANIFEST file is allowed to
946 : // become. When the MANIFEST exceeds this size it is rolled over and a new
947 : // MANIFEST is created.
948 : MaxManifestFileSize int64
949 :
950 : // MaxOpenFiles is a soft limit on the number of open files that can be
951 : // used by the DB.
952 : //
953 : // The default value is 1000.
954 : MaxOpenFiles int
955 :
956 : // The size of a MemTable in steady state. The actual MemTable size starts at
957 : // min(256KB, MemTableSize) and doubles for each subsequent MemTable up to
958 : // MemTableSize. This reduces the memory pressure caused by MemTables for
959 : // short lived (test) DB instances. Note that more than one MemTable can be
960 : // in existence since flushing a MemTable involves creating a new one and
961 : // writing the contents of the old one in the
962 : // background. MemTableStopWritesThreshold places a hard limit on the size of
963 : // the queued MemTables.
964 : //
965 : // The default value is 4MB.
966 : MemTableSize uint64
967 :
968 : // Hard limit on the number of queued MemTables. Writes are stopped when
969 : // the sum of the queued memtable sizes exceeds:
970 : // MemTableStopWritesThreshold * MemTableSize.
971 : //
972 : // This value should be at least 2 or writes will stop whenever a MemTable is
973 : // being flushed.
974 : //
975 : // The default value is 2.
976 : MemTableStopWritesThreshold int
977 :
978 : // Merger defines the associative merge operation to use for merging values
979 : // written with {Batch,DB}.Merge.
980 : //
981 : // The default merger concatenates values.
982 : Merger *Merger
983 :
984 : // CompactionConcurrencyRange returns a [lower, upper] range for the number of
985 : // compactions Pebble runs in parallel (with the caveats below), not including
986 : // download compactions (which have a separate limit specified by
987 : // MaxConcurrentDownloads).
988 : //
989 : // The lower value is the concurrency allowed under normal circumstances.
990 : // Pebble can dynamically increase the concurrency based on heuristics (like
991 : // high read amplification or compaction debt) up to the maximum.
992 : //
993 : // The upper value is a rough upper bound since delete-only compactions (a) do
994 : // not use the CompactionScheduler, and (b) the CompactionScheduler may use
995 : // other criteria to decide on how many compactions to permit.
996 : //
997 : // Elaborating on (b), when the ConcurrencyLimitScheduler is being used, the
998 : // value returned by DB.GetAllowedWithoutPermission fully controls how many
999 : // compactions get to run. Other CompactionSchedulers may use additional
1000 : // criteria, like resource availability.
1001 : //
1002 : // Elaborating on (a), we don't use the CompactionScheduler to schedule
1003 : // delete-only compactions since they are expected to be almost free from a
1004 : // CPU and disk usage perspective. Since the CompactionScheduler does not
1005 : // know about their existence, the total running count can exceed this
1006 : // value. For example, consider CompactionConcurrencyRange returns 3, and the
1007 : // current value returned from DB.GetAllowedWithoutPermission is also 3. Say
1008 : // 3 delete-only compactions are also running. Then the
1009 : // ConcurrencyLimitScheduler can also start 3 other compactions, for a total
1010 : // of 6.
1011 : //
1012 : // DB.GetAllowedWithoutPermission returns a value in the interval
1013 : // [lower, upper]. A value > lower is returned:
1014 : // - when L0 read-amplification passes the L0CompactionConcurrency threshold;
1015 : // - when compaction debt passes the CompactionDebtConcurrency threshold;
1016 : // - when there are multiple manual compactions waiting to run.
1017 : //
1018 : // lower and upper must be greater than 0. If lower > upper, then upper is
1019 : // used for both.
1020 : //
1021 : // The default values are 1, 1.
1022 : CompactionConcurrencyRange func() (lower, upper int)
1023 :
1024 : // MaxConcurrentDownloads specifies the maximum number of download
1025 : // compactions. These are compactions that copy an external file to the local
1026 : // store.
1027 : //
1028 : // This limit is independent of CompactionConcurrencyRange; at any point in
1029 : // time, we may be running CompactionConcurrencyRange non-download compactions
1030 : // and MaxConcurrentDownloads download compactions.
1031 : //
1032 : // MaxConcurrentDownloads() must be greater than 0.
1033 : //
1034 : // The default value is 1.
1035 : MaxConcurrentDownloads func() int
1036 :
1037 : // DisableAutomaticCompactions dictates whether automatic compactions are
1038 : // scheduled or not. The default is false (enabled). This option is only used
1039 : // externally when running a manual compaction, and internally for tests.
1040 : //
1041 : // Note that this field is only consulted while holding DB.mu, so it is safe
1042 : // for a test to modify it while holding DB.mu.
1043 : DisableAutomaticCompactions bool
1044 :
1045 : // DisableConsistencyCheck disables the consistency check that is performed on
1046 : // open. Should only be used when a database cannot be opened normally (e.g.
1047 : // some of the tables don't exist / aren't accessible).
1048 : DisableConsistencyCheck bool
1049 :
1050 : // DisableTableStats dictates whether tables should be loaded asynchronously
1051 : // to compute statistics that inform compaction heuristics. The collection
1052 : // of table stats improves compaction of tombstones, reclaiming disk space
1053 : // more quickly and in some cases reducing write amplification in the
1054 : // presence of tombstones. Disabling table stats may be useful in tests
1055 : // that require determinism as the asynchronicity of table stats collection
1056 : // introduces significant nondeterminism.
1057 : DisableTableStats bool
1058 :
1059 : // NoSyncOnClose decides whether the Pebble instance will enforce a
1060 : // close-time synchronization (e.g., fdatasync() or sync_file_range())
1061 : // on files it writes to. Setting this to true removes the guarantee for a
1062 : // sync on close. Some implementations can still issue a non-blocking sync.
1063 : NoSyncOnClose bool
1064 :
1065 : // NumPrevManifest is the number of non-current or older manifests which
1066 : // we want to keep around for debugging purposes. By default, we're going
1067 : // to keep one older manifest.
1068 : NumPrevManifest int
1069 :
1070 : // ReadOnly indicates that the DB should be opened in read-only mode. Writes
1071 : // to the DB will return an error, background compactions are disabled, and
1072 : // the flush that normally occurs after replaying the WAL at startup is
1073 : // disabled.
1074 : ReadOnly bool
1075 :
1076 : // FileCache is an initialized FileCache which should be set as an
1077 : // option if the DB needs to be initialized with a pre-existing file cache.
1078 : // If FileCache is nil, then a file cache which is unique to the DB instance
1079 : // is created. FileCache can be shared between db instances by setting it here.
1080 : // The FileCache set here must use the same underlying cache as Options.Cache
1081 : // and pebble will panic otherwise.
1082 : FileCache *FileCache
1083 :
1084 : // BlockPropertyCollectors is a list of BlockPropertyCollector creation
1085 : // functions. A new BlockPropertyCollector is created for each sstable
1086 : // built and lives for the lifetime of writing that table.
1087 : BlockPropertyCollectors []func() BlockPropertyCollector
1088 :
1089 : // WALBytesPerSync sets the number of bytes to write to a WAL before calling
1090 : // Sync on it in the background. Just like with BytesPerSync above, this
1091 : // helps smooth out disk write latencies, and avoids cases where the OS
1092 : // writes a lot of buffered data to disk at once. However, this is less
1093 : // necessary with WALs, as many write operations already pass in
1094 : // Sync = true.
1095 : //
1096 : // The default value is 0, i.e. no background syncing. This matches the
1097 : // default behaviour in RocksDB.
1098 : WALBytesPerSync int
1099 :
1100 : // WALDir specifies the directory to store write-ahead logs (WALs) in. If
1101 : // empty (the default), WALs will be stored in the same directory as sstables
1102 : // (i.e. the directory passed to pebble.Open).
1103 : WALDir string
1104 :
1105 : // WALDirLock, if set, must be a directory lock acquired through LockDirectory
1106 : // for the same directory set on WALDir passed to Open. If provided, Open will
1107 : // skip locking the directory. Closing the database will not release the lock,
1108 : // and it's the responsibility of the caller to release the lock after closing the
1109 : // database.
1110 : //
1111 : // Open will enforce that the Lock passed locks the same WAL directory passed to
1112 : // Open. Concurrent calls to Open using the same Lock are detected and
1113 : // prohibited.
1114 : WALDirLock *base.DirLock
1115 :
1116 : // WALFailover may be set to configure Pebble to monitor writes to its
1117 : // write-ahead log and failover to writing write-ahead log entries to a
1118 : // secondary location (eg, a separate physical disk). WALFailover may be
1119 : // used to improve write availability in the presence of transient disk
1120 : // unavailability.
1121 : WALFailover *WALFailoverOptions
1122 :
1123 : // WALRecoveryDirs is a list of additional directories that should be
1124 : // scanned for the existence of additional write-ahead logs. WALRecoveryDirs
1125 : // is expected to be used when starting Pebble with a new WALDir or a new
1126 : // WALFailover configuration. The directories associated with the previous
1127 : // configuration may still contain WALs that are required for recovery of
1128 : // the current database state.
1129 : //
1130 : // If a previous WAL configuration may have stored WALs elsewhere but there
1131 : // is not a corresponding entry in WALRecoveryDirs, Open will error (unless
1132 : // Unsafe.AllowMissingWALDirs is true).
1133 : WALRecoveryDirs []wal.Dir
1134 :
1135 : // WALMinSyncInterval is the minimum duration between syncs of the WAL. If
1136 : // WAL syncs are requested faster than this interval, they will be
1137 : // artificially delayed. Introducing a small artificial delay (500us) between
1138 : // WAL syncs can allow more operations to arrive and reduce IO operations
1139 : // while having a minimal impact on throughput. This option is supplied as a
1140 : // closure in order to allow the value to be changed dynamically. The default
1141 : // value is 0.
1142 : //
1143 : // TODO(peter): rather than a closure, should there be another mechanism for
1144 : // changing options dynamically?
1145 : WALMinSyncInterval func() time.Duration
1146 :
1147 : // DeletionPacing manages deletion pacing, which slows down deletions when
1148 : // compactions finish or when readers close and obsolete files must be cleaned
1149 : // up. Rapid deletion of many files simultaneously can increase disk latency
1150 : // on certain SSDs, and this functionality helps protect against that.
1151 : DeletionPacing DeletionPacingOptions
1152 :
1153 : // EnableSQLRowSpillMetrics specifies whether the Pebble instance will only be used
1154 : // to temporarily persist data spilled to disk for row-oriented SQL query execution.
1155 : EnableSQLRowSpillMetrics bool
1156 :
1157 : // AllocatorSizeClasses provides a sorted list containing the supported size
1158 : // classes of the underlying memory allocator. This provides hints to the
1159 : // sstable block writer's flushing policy to select block sizes that
1160 : // preemptively reduce internal fragmentation when loaded into the block cache.
1161 : AllocatorSizeClasses []int
1162 :
1163 : // Unsafe contains options that must be used very carefully and in exceptional
1164 : // circumstances.
1165 : Unsafe struct {
1166 : // AllowMissingWALDirs, if set to true, allows opening a DB when the WAL or
1167 : // WAL secondary directory was changed and the previous directory is not in
1168 : // WALRecoveryDirs. This can be used to move WALs without having to keep the
1169 : // previous directory in the options forever.
1170 : //
1171 : // CAUTION: Enabling this option will lead to data loss if the missing
1172 : // directory contained any WAL files that were not flushed to sstables.
1173 : AllowMissingWALDirs bool
1174 : }
1175 :
1176 : // private options are only used by internal tests or are used internally
1177 : // for facilitating upgrade paths of unconfigurable functionality.
1178 : private struct {
1179 : // disableDeleteOnlyCompactions prevents the scheduling of delete-only
1180 : // compactions that drop sstables wholly covered by range tombstones or
1181 : // range key tombstones.
1182 : disableDeleteOnlyCompactions bool
1183 :
1184 : // disableElisionOnlyCompactions prevents the scheduling of elision-only
1185 : // compactions that rewrite sstables in place in order to elide obsolete
1186 : // keys.
1187 : disableElisionOnlyCompactions bool
1188 :
1189 : // disableLazyCombinedIteration is a private option used by the
1190 : // metamorphic tests to test equivalence between lazy-combined iteration
1191 : // and constructing the range-key iterator upfront. It's a private
1192 : // option to avoid littering the public interface with options that we
1193 : // do not want to allow users to actually configure.
1194 : disableLazyCombinedIteration bool
1195 :
1196 : // testingAlwaysWaitForCleanup is set by some tests to force waiting for
1197 : // obsolete file deletion (to make events deterministic).
1198 : testingAlwaysWaitForCleanup bool
1199 :
1200 : // testingBeforeIngestApplyFunc when non-nil, is called when ingesting,
1201 : // before calling DB.ingestApply.
1202 : testingBeforeIngestApplyFunc func()
1203 :
1204 : // timeNow returns the current time. It defaults to time.Now. It's
1205 : // configurable here so that tests can mock the current time.
1206 : timeNow func() time.Time
1207 :
1208 : // fsCloser holds a closer that should be invoked after a DB using these
1209 : // Options is closed. This is used to automatically stop the
1210 : // long-running goroutine associated with the disk-health-checking FS.
1211 : // See the initialization of FS in EnsureDefaults. Note that care has
1212 : // been taken to ensure that it is still safe to continue using the FS
1213 : // after this closer has been invoked. However, if write operations
1214 : // against the FS are made after the DB is closed, the FS may leak a
1215 : // goroutine indefinitely.
1216 : fsCloser io.Closer
1217 : }
1218 : }
1219 :
1220 : // DeletionPacingOptions contains configuration options controlling the
1221 : // pacing of obsolete file deletion.
1222 : type DeletionPacingOptions = deletepacer.Options
1223 :
1224 : // ValueSeparationPolicy controls the policy for separating values into
1225 : // external blob files.
1226 : type ValueSeparationPolicy struct {
1227 : // Enabled controls whether value separation is enabled.
1228 : Enabled bool
1229 : // MinimumSize imposes a lower bound on the size of values that can be
1230 : // separated into a blob file. Values smaller than this are always written
1231 : // to the sstable (but may still be written to a value block within the
1232 : // sstable).
1233 : //
1234 : // MinimumSize must be > 0.
1235 : MinimumSize int
1236 : // MaxBlobReferenceDepth limits the number of potentially overlapping (in
1237 : // the keyspace) blob files that can be referenced by a single sstable. If a
1238 : // compaction may produce an output sstable referencing more than this many
1239 : // overlapping blob files, the compaction will instead rewrite referenced
1240 : // values into new blob files.
1241 : //
1242 : // MaxBlobReferenceDepth must be > 0.
1243 : MaxBlobReferenceDepth int
1244 : // RewriteMinimumAge specifies how old a blob file must be in order for it
1245 : // to be eligible for a rewrite that reclaims disk space. Lower values
1246 : // reduce space amplification at the cost of write amplification.
1247 : RewriteMinimumAge time.Duration
1248 : // GarbageRatioLowPriority is a value in the range [0, 1.0] configuring how
1249 : // aggressively blob files should be rewritten in order to reduce space
1250 : // amplification induced by value separation. As sstable compactions rewrite
1251 : // references to blob files, data may be duplicated. Older blob files
1252 : // containing the duplicated data may need to remain because other sstables
1253 : // are referencing other values contained in the same file.
1254 : //
1255 : // The DB can rewrite these blob files in place in order to reduce this
1256 : // space amplification, but this incurs write amplification. This option
1257 : // configures how much garbage may accrue before the DB will attempt to
1258 : // rewrite blob files to reduce it if there is no other higher priority
1259 : // compaction work available. A value of 0.20 indicates that once 20% of
1260 : // values in blob files are unreferenced, the DB should attempt to rewrite
1261 : // blob files to reclaim disk space.
1262 : //
1263 : // A value of 1.0 indicates that the DB should never attempt to rewrite blob
1264 : // files.
1265 : GarbageRatioLowPriority float64
1266 : // GarbageRatioHighPriority is a value in the range [0, 1.0] configuring how
1267 : // much garbage must be present before the DB will schedule blob file
1268 : // rewrite compactions at a high priority (including above default
1269 : // compactions necessary to keep up with incoming writes). At most 1 blob file
1270 : // rewrite compaction will be scheduled at a time.
1271 : //
1272 : // See GarbageRatioLowPriority for more details. Must be >=
1273 : // GarbageRatioLowPriority.
1274 : GarbageRatioHighPriority float64
1275 : }
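     :
     : // Example (an illustrative sketch, not part of the source): wiring a static
     : // ValueSeparationPolicy into Options via the Experimental hook. The numeric
     : // values below are placeholder assumptions, not recommendations.
     : //
     : //	opts := &Options{}
     : //	opts.Experimental.ValueSeparationPolicy = func() ValueSeparationPolicy {
     : //		return ValueSeparationPolicy{
     : //			Enabled:                  true,
     : //			MinimumSize:              1024,
     : //			MaxBlobReferenceDepth:    10,
     : //			RewriteMinimumAge:        5 * time.Minute,
     : //			GarbageRatioLowPriority:  0.20,
     : //			GarbageRatioHighPriority: 0.50,
     : //		}
     : //	}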
1276 :
1277 : // SpanPolicy contains policies that can vary by key range. The zero value is
1278 : // the default value.
1279 : type SpanPolicy struct {
1280 : // PreferFastCompression prefers a faster compression algorithm for the keys
1281 : // in this span.
1282 : //
1283 : // This is useful for keys that are frequently read or written but which
1284 : // don't occupy a significant amount of space.
1284 : PreferFastCompression bool
1285 :
1286 : // DisableValueSeparationBySuffix disables discriminating between KVs based
1287 : // on their suffix.
1288 : //
1289 : // Among a set of keys with the same prefix, Pebble's default heuristics
1290 : // optimize access to the KV with the smallest suffix. This is useful for MVCC
1291 : // keys (where the smallest suffix is the latest version), but should be
1292 : // disabled for keys where the suffix does not correspond to a version.
1293 : DisableValueSeparationBySuffix bool
1294 :
1295 : // ValueStoragePolicy is a hint used to determine where to store the values
1296 : // for KVs.
1297 : ValueStoragePolicy ValueStoragePolicy
1298 : }
1299 :
1300 : // String returns a string representation of the SpanPolicy.
1301 0 : func (p SpanPolicy) String() string {
1302 0 : var sb strings.Builder
1303 0 : if p.PreferFastCompression {
1304 0 : sb.WriteString("fast-compression,")
1305 0 : }
1306 0 : if p.DisableValueSeparationBySuffix {
1307 0 : sb.WriteString("disable-value-separation-by-suffix,")
1308 0 : }
1309 0 : switch p.ValueStoragePolicy {
1310 0 : case ValueStorageLowReadLatency:
1311 0 : sb.WriteString("low-read-latency,")
1312 0 : case ValueStorageLatencyTolerant:
1313 0 : sb.WriteString("latency-tolerant,")
1314 : }
1315 0 : return strings.TrimSuffix(sb.String(), ",")
1316 : }
1317 :
1318 : // ValueStoragePolicy is a hint used to determine where to store the values for
1319 : // KVs.
1320 : type ValueStoragePolicy uint8
1321 :
1322 : const (
1323 : // ValueStorageDefault is the default value; Pebble will respect global
1324 : // configuration for value blocks and value separation.
1325 : ValueStorageDefault ValueStoragePolicy = iota
1326 :
1327 : // ValueStorageLowReadLatency indicates Pebble should prefer storing values
1328 : // in-place.
1329 : ValueStorageLowReadLatency
1330 :
1331 : // ValueStorageLatencyTolerant indicates value retrieval can tolerate
1332 : // additional latency, so Pebble should aggressively prefer storing values
1333 : // separately if it can reduce write amplification.
1334 : //
1335 : // If the global Options enable value separation, Pebble may choose to
1336 : // separate values under the LatencyTolerant policy even if they do not meet
1337 : // the minimum size threshold of the global Options' ValueSeparationPolicy.
1338 : ValueStorageLatencyTolerant
1339 : )
1340 :
1341 : // SpanPolicyFunc is used to determine the SpanPolicy for a key region.
1342 : //
1343 : // The returned policy is valid from the start key until (and not including) the
1344 : // end key.
1345 : //
1346 : // A flush or compaction will call this function once for the first key to be
1347 : // output. If the compaction reaches the end key, the current output sst is
1348 : // finished and the function is called again.
1349 : //
1350 : // The end key can be empty, in which case the policy is valid for the entire
1351 : // keyspace after startKey.
1352 : type SpanPolicyFunc func(startKey []byte) (policy SpanPolicy, endKey []byte, err error)
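     :
     : // A minimal hand-written SpanPolicyFunc sketch (illustrative; it assumes the
     : // default byte-ordered comparer, and the bounds "t/" and "u/" are
     : // placeholders): keys in [t/, u/) get a latency-tolerant policy, everything
     : // else gets the default.
     : //
     : //	var fn SpanPolicyFunc = func(startKey []byte) (SpanPolicy, []byte, error) {
     : //		lo, hi := []byte("t/"), []byte("u/")
     : //		switch {
     : //		case bytes.Compare(startKey, lo) < 0:
     : //			return SpanPolicy{}, lo, nil // default policy up to lo
     : //		case bytes.Compare(startKey, hi) < 0:
     : //			return SpanPolicy{ValueStoragePolicy: ValueStorageLatencyTolerant}, hi, nil
     : //		default:
     : //			return SpanPolicy{}, nil, nil // default for the rest of the keyspace
     : //		}
     : //	}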
1353 :
1354 : // SpanAndPolicy defines a key range and the policy to apply to it.
1355 : type SpanAndPolicy struct {
1356 : KeyRange KeyRange
1357 : Policy SpanPolicy
1358 : }
1359 :
1360 : // MakeStaticSpanPolicyFunc returns a SpanPolicyFunc that applies the given
1361 : // policies to their respective spans (and the default policy outside those
1362 : // spans). The supplied policies must have non-overlapping key ranges.
1363 0 : func MakeStaticSpanPolicyFunc(cmp base.Compare, inputPolicies ...SpanAndPolicy) SpanPolicyFunc {
1364 0 : // Collect all the boundaries of the input policies, sort and deduplicate them.
1365 0 : uniqueKeys := make([][]byte, 0, 2*len(inputPolicies))
1366 0 : for i := range inputPolicies {
1367 0 : uniqueKeys = append(uniqueKeys, inputPolicies[i].KeyRange.Start)
1368 0 : uniqueKeys = append(uniqueKeys, inputPolicies[i].KeyRange.End)
1369 0 : }
1370 0 : slices.SortFunc(uniqueKeys, cmp)
1371 0 : uniqueKeys = slices.CompactFunc(uniqueKeys, func(a, b []byte) bool { return cmp(a, b) == 0 })
1372 :
1373 : // Create a list of policies: policies[i] applies to the span between uniqueKeys[i] and uniqueKeys[i+1].
1374 0 : policies := make([]SpanPolicy, len(uniqueKeys)-1)
1375 0 : for _, p := range inputPolicies {
1376 0 : idx, _ := slices.BinarySearchFunc(uniqueKeys, p.KeyRange.Start, cmp)
1377 0 : policies[idx] = p.Policy
1378 0 : }
1379 :
1380 0 : return func(startKey []byte) (_ SpanPolicy, endKey []byte, _ error) {
1381 0 : // Find the policy that applies to the start key.
1382 0 : idx, eq := slices.BinarySearchFunc(uniqueKeys, startKey, cmp)
1383 0 : switch idx {
1384 0 : case len(uniqueKeys):
1385 0 : // The start key is after the last policy.
1386 0 : return SpanPolicy{}, nil, nil
1387 0 : case len(uniqueKeys) - 1:
1388 0 : if eq {
1389 0 : // The start key is exactly the start of the last policy.
1390 0 : return SpanPolicy{}, nil, nil
1391 0 : }
1392 0 : case 0:
1393 0 : if !eq {
1394 0 : // The start key is before the first policy.
1395 0 : return SpanPolicy{}, uniqueKeys[0], nil
1396 0 : }
1397 : }
1398 0 : if eq {
1399 0 : // The start key is exactly the start of this policy.
1400 0 : return policies[idx], uniqueKeys[idx+1], nil
1401 0 : }
1402 : // The start key is between two policies.
1403 0 : return policies[idx-1], uniqueKeys[idx], nil
1404 : }
1405 : }
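     :
     : // Example use (illustrative; the key bounds are placeholders, and
     : // DefaultComparer.Compare assumes the default byte-ordered comparer):
     : //
     : //	fn := MakeStaticSpanPolicyFunc(DefaultComparer.Compare, SpanAndPolicy{
     : //		KeyRange: KeyRange{Start: []byte("a"), End: []byte("m")},
     : //		Policy:   SpanPolicy{DisableValueSeparationBySuffix: true},
     : //	})
     : //	policy, endKey, _ := fn([]byte("c")) // policy is in effect up to endKey "m"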
1406 :
1407 : // WALFailoverOptions configures the WAL failover mechanics to use during
1408 : // transient write unavailability on the primary WAL volume.
1409 : type WALFailoverOptions struct {
1410 : // Secondary indicates the secondary directory and VFS to use in the event a
1411 : // write to the primary WAL stalls. The Lock field may be set during setup to
1412 : // preacquire the lock on the secondary directory.
1413 : Secondary wal.Dir
1414 :
1415 : // FailoverOptions provides configuration of the thresholds and intervals
1416 : // involved in WAL failover. If any of its fields are left unspecified,
1417 : // reasonable defaults will be used.
1418 : wal.FailoverOptions
1419 : }
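     :
     : // Example (illustrative; the secondary path is a placeholder): enable
     : // failover to a separate disk, leaving the thresholds and intervals to their
     : // defaults.
     : //
     : //	opts.WALFailover = &WALFailoverOptions{
     : //		Secondary: wal.Dir{FS: vfs.Default, Dirname: "/mnt/wal-failover"},
     : //	}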
1420 :
1421 : // ReadaheadConfig controls the use of read-ahead.
1422 : type ReadaheadConfig = objstorageprovider.ReadaheadConfig
1423 :
1424 : // JemallocSizeClasses exports sstable.JemallocSizeClasses.
1425 : var JemallocSizeClasses = sstable.JemallocSizeClasses
1426 :
1427 : // DebugCheckLevels calls CheckLevels on the provided database.
1428 : // It may be set in the DebugCheck field of Options to check
1429 : // level invariants whenever a new version is installed.
1430 1 : func DebugCheckLevels(db *DB) error {
1431 1 : return db.CheckLevels(nil)
1432 1 : }
1433 :
1434 : // DBCompressionSettings contains compression settings for the entire store. It
1435 : // defines compression profiles for each LSM level.
1436 : type DBCompressionSettings struct {
1437 : Name string
1438 : Levels [manifest.NumLevels]*block.CompressionProfile
1439 : }
1440 :
1441 : // Predefined compression settings.
1442 : var (
1443 : DBCompressionNone = UniformDBCompressionSettings(block.NoCompression)
1444 : DBCompressionFastest = UniformDBCompressionSettings(block.FastestCompression)
1445 1 : DBCompressionFast = func() DBCompressionSettings {
1446 1 : cs := DBCompressionSettings{Name: "Fast"}
1447 1 : for i := 0; i < manifest.NumLevels-1; i++ {
1448 1 : cs.Levels[i] = block.FastestCompression
1449 1 : }
1450 1 : cs.Levels[manifest.NumLevels-1] = block.FastCompression
1451 1 : return cs
1452 : }()
1453 1 : DBCompressionBalanced = func() DBCompressionSettings {
1454 1 : cs := DBCompressionSettings{Name: "Balanced"}
1455 1 : for i := 0; i < manifest.NumLevels-2; i++ {
1456 1 : cs.Levels[i] = block.FastestCompression
1457 1 : }
1458 1 : cs.Levels[manifest.NumLevels-2] = block.FastCompression
1459 1 : cs.Levels[manifest.NumLevels-1] = block.BalancedCompression
1460 1 : return cs
1461 : }()
1462 1 : DBCompressionGood = func() DBCompressionSettings {
1463 1 : cs := DBCompressionSettings{Name: "Good"}
1464 1 : for i := 0; i < manifest.NumLevels-2; i++ {
1465 1 : cs.Levels[i] = block.FastestCompression
1466 1 : }
1467 1 : cs.Levels[manifest.NumLevels-2] = block.BalancedCompression
1468 1 : cs.Levels[manifest.NumLevels-1] = block.GoodCompression
1469 1 : return cs
1470 : }()
1471 : )
1472 :
1473 : // UniformDBCompressionSettings returns a DBCompressionSettings which uses the
1474 : // same compression profile on all LSM levels.
1475 1 : func UniformDBCompressionSettings(profile *block.CompressionProfile) DBCompressionSettings {
1476 1 : cs := DBCompressionSettings{Name: profile.Name}
1477 1 : for i := range cs.Levels {
1478 1 : cs.Levels[i] = profile
1479 1 : }
1480 1 : return cs
1481 : }
1482 :
1483 : // ApplyCompressionSettings sets the Compression field in each LevelOptions
1484 : // to a closure that invokes the given function and returns the compression
1485 : // profile for that level.
1485 0 : func (o *Options) ApplyCompressionSettings(csFn func() DBCompressionSettings) {
1486 0 : for i := range o.Levels {
1487 0 : levelIdx := i
1488 0 : o.Levels[i].Compression = func() *block.CompressionProfile {
1489 0 : return csFn().Levels[levelIdx]
1490 0 : }
1491 : }
1492 : }
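     :
     : // Example use (illustrative): install one of the predefined settings, or a
     : // uniform profile, on an Options.
     : //
     : //	opts.ApplyCompressionSettings(func() DBCompressionSettings {
     : //		return DBCompressionBalanced
     : //	})
     : //	// Or the same profile on every level:
     : //	opts.ApplyCompressionSettings(func() DBCompressionSettings {
     : //		return UniformDBCompressionSettings(block.FastestCompression)
     : //	})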
1493 :
1494 : // EnsureDefaults ensures that the default values for all options are set if a
1495 : // valid value was not already specified.
1496 1 : func (o *Options) EnsureDefaults() {
1497 1 : if o.Cache == nil && o.CacheSize == 0 {
1498 0 : o.CacheSize = cacheDefaultSize
1499 0 : }
1500 1 : o.Comparer = o.Comparer.EnsureDefaults()
1501 1 :
1502 1 : if o.BytesPerSync <= 0 {
1503 0 : o.BytesPerSync = 512 << 10 // 512 KB
1504 0 : }
1505 1 : if o.Cleaner == nil {
1506 0 : o.Cleaner = DeleteCleaner{}
1507 0 : }
1508 1 : o.DeletionPacing.EnsureDefaults()
1509 1 :
1510 1 : if o.Experimental.DisableIngestAsFlushable == nil {
1511 1 : o.Experimental.DisableIngestAsFlushable = func() bool { return false }
1512 : }
1513 1 : if o.Experimental.L0CompactionConcurrency <= 0 {
1514 0 : o.Experimental.L0CompactionConcurrency = 10
1515 0 : }
1516 1 : if o.Experimental.CompactionDebtConcurrency <= 0 {
1517 0 : o.Experimental.CompactionDebtConcurrency = 1 << 30 // 1 GB
1518 0 : }
1519 1 : if o.Experimental.CompactionGarbageFractionForMaxConcurrency == nil {
1520 0 : // When 40% of the DB is garbage, the compaction concurrency is at the
1521 0 : // maximum permitted.
1522 0 : o.Experimental.CompactionGarbageFractionForMaxConcurrency = func() float64 { return 0.4 }
1523 : }
1524 1 : if o.Experimental.ValueSeparationPolicy == nil {
1525 0 : o.Experimental.ValueSeparationPolicy = func() ValueSeparationPolicy {
1526 0 : return ValueSeparationPolicy{Enabled: false}
1527 0 : }
1528 : }
1529 1 : if o.KeySchema == "" && len(o.KeySchemas) == 0 {
1530 0 : ks := colblk.DefaultKeySchema(o.Comparer, 16 /* bundleSize */)
1531 0 : o.KeySchema = ks.Name
1532 0 : o.KeySchemas = sstable.MakeKeySchemas(&ks)
1533 0 : }
1534 1 : if o.L0CompactionThreshold <= 0 {
1535 0 : o.L0CompactionThreshold = 4
1536 0 : }
1537 1 : if o.L0CompactionFileThreshold <= 0 {
1538 0 : // Some justification for the default of 500:
1539 0 : // Why not smaller?:
1540 0 : // - The default target file size for L0 is 2MB, so 500 files is <= 1GB
1541 0 : // of data. At observed compaction speeds of > 20MB/s, L0 can be
1542 0 : // cleared of all files in < 1min, so this backlog is not huge.
1543 0 : // - 500 files is low overhead for instantiating L0 sublevels from
1544 0 : // scratch.
1545 0 : // - Lower values were observed to cause excessive and inefficient
1546 0 : // compactions out of L0 in a TPCC import benchmark.
1547 0 : // Why not larger?:
1548 0 : // - More than 1min to compact everything out of L0.
1549 0 : // - CockroachDB's admission control system uses a threshold of 1000
1550 0 : // files to start throttling writes to Pebble. Using 500 here gives
1551 0 : // us headroom between when Pebble should start compacting L0 and
1552 0 : // when the admission control threshold is reached.
1553 0 : //
1554 0 : // We can revisit this default in the future based on better
1555 0 : // experimental understanding.
1556 0 : //
1557 0 : // TODO(jackson): Experiment with slightly lower thresholds [or higher
1558 0 : // admission control thresholds] to see whether a higher L0 score at the
1559 0 : // threshold (currently 2.0) is necessary for some workloads to avoid
1560 0 : // starving L0 in favor of lower-level compactions.
1561 0 : o.L0CompactionFileThreshold = 500
1562 0 : }
1563 1 : if o.L0StopWritesThreshold <= 0 {
1564 0 : o.L0StopWritesThreshold = 12
1565 0 : }
1566 1 : if o.LBaseMaxBytes <= 0 {
1567 0 : o.LBaseMaxBytes = 64 << 20 // 64 MB
1568 0 : }
1569 1 : if o.TargetFileSizes[0] <= 0 {
1570 0 : o.TargetFileSizes[0] = 2 << 20 // 2 MB
1571 0 : }
1572 1 : for i := 1; i < len(o.TargetFileSizes); i++ {
1573 1 : if o.TargetFileSizes[i] <= 0 {
1574 0 : o.TargetFileSizes[i] = o.TargetFileSizes[i-1] * 2
1575 0 : }
1576 : }
1577 1 : o.Levels[0].EnsureL0Defaults()
1578 1 : for i := 1; i < len(o.Levels); i++ {
1579 1 : o.Levels[i].EnsureL1PlusDefaults(&o.Levels[i-1])
1580 1 : }
1581 1 : if o.Logger == nil {
1582 1 : o.Logger = DefaultLogger
1583 1 : }
1584 1 : if o.EventListener == nil {
1585 1 : o.EventListener = &EventListener{}
1586 1 : }
1587 1 : o.EventListener.EnsureDefaults(o.Logger)
1588 1 : if o.MaxManifestFileSize == 0 {
1589 0 : o.MaxManifestFileSize = 128 << 20 // 128 MB
1590 0 : }
1591 1 : if o.MaxOpenFiles == 0 {
1592 0 : o.MaxOpenFiles = 1000
1593 0 : }
1594 1 : if o.MemTableSize <= 0 {
1595 0 : o.MemTableSize = 4 << 20 // 4 MB
1596 0 : }
1597 1 : if o.MemTableStopWritesThreshold <= 0 {
1598 0 : o.MemTableStopWritesThreshold = 2
1599 0 : }
1600 1 : if o.Merger == nil {
1601 0 : o.Merger = DefaultMerger
1602 0 : }
1603 1 : if o.CompactionConcurrencyRange == nil {
1604 0 : o.CompactionConcurrencyRange = func() (int, int) { return 1, 1 }
1605 : }
1606 1 : if o.MaxConcurrentDownloads == nil {
1607 0 : o.MaxConcurrentDownloads = func() int { return 1 }
1608 : }
1609 1 : if o.NumPrevManifest <= 0 {
1610 1 : o.NumPrevManifest = 1
1611 1 : }
1612 :
1613 1 : if o.FormatMajorVersion == FormatDefault {
1614 0 : o.FormatMajorVersion = FormatMinSupported
1615 0 : if o.Experimental.CreateOnShared != remote.CreateOnSharedNone {
1616 0 : o.FormatMajorVersion = FormatMinForSharedObjects
1617 0 : }
1618 : }
1619 :
1620 1 : if o.FS == nil {
1621 0 : o.WithFSDefaults()
1622 0 : }
1623 1 : if o.FlushSplitBytes <= 0 {
1624 0 : o.FlushSplitBytes = 2 * o.TargetFileSizes[0]
1625 0 : }
1626 1 : if o.WALFailover != nil {
1627 1 : o.WALFailover.FailoverOptions.EnsureDefaults()
1628 1 : }
1629 1 : if o.Experimental.UseDeprecatedCompensatedScore == nil {
1630 1 : o.Experimental.UseDeprecatedCompensatedScore = func() bool { return false }
1631 : }
1632 1 : if o.Experimental.LevelMultiplier <= 0 {
1633 1 : o.Experimental.LevelMultiplier = defaultLevelMultiplier
1634 1 : }
1635 1 : if o.Experimental.ReadCompactionRate == 0 {
1636 0 : o.Experimental.ReadCompactionRate = 16000
1637 0 : }
1638 1 : if o.Experimental.ReadSamplingMultiplier == 0 {
1639 0 : o.Experimental.ReadSamplingMultiplier = 1 << 4
1640 0 : }
1641 1 : if o.Experimental.NumDeletionsThreshold == 0 {
1642 0 : o.Experimental.NumDeletionsThreshold = sstable.DefaultNumDeletionsThreshold
1643 0 : }
1644 1 : if o.Experimental.DeletionSizeRatioThreshold == 0 {
1645 0 : o.Experimental.DeletionSizeRatioThreshold = sstable.DefaultDeletionSizeRatioThreshold
1646 0 : }
1647 1 : if o.Experimental.TombstoneDenseCompactionThreshold == nil {
1648 0 : o.Experimental.TombstoneDenseCompactionThreshold = func() float64 { return 0.10 }
1649 : }
1650 1 : if o.Experimental.FileCacheShards <= 0 {
1651 0 : o.Experimental.FileCacheShards = runtime.GOMAXPROCS(0)
1652 0 : }
1653 1 : if o.Experimental.MultiLevelCompactionHeuristic == nil {
1654 0 : o.Experimental.MultiLevelCompactionHeuristic = OptionWriteAmpHeuristic
1655 0 : }
1656 1 : if o.Experimental.SpanPolicyFunc == nil {
1657 1 : o.Experimental.SpanPolicyFunc = func(startKey []byte) (SpanPolicy, []byte, error) { return SpanPolicy{}, nil, nil }
1658 : }
1659 1 : if o.Experimental.VirtualTableRewriteUnreferencedFraction == nil {
1660 1 : o.Experimental.VirtualTableRewriteUnreferencedFraction = func() float64 { return defaultVirtualTableUnreferencedFraction }
1661 : }
1662 1 : if o.private.timeNow == nil {
1663 1 : o.private.timeNow = time.Now
1664 1 : }
1665 : // TODO(jackson): Enable value separation by default once we have confidence
1666 : // in a default policy.
1667 :
1668 1 : o.initMaps()
1669 : }
1670 :
1671 : // TargetFileSize computes the target file size for the given output level.
1672 1 : func (o *Options) TargetFileSize(outputLevel int, baseLevel int) int64 {
1673 1 : if outputLevel == 0 {
1674 1 : return o.TargetFileSizes[0]
1675 1 : }
1676 1 : if baseLevel > outputLevel {
1677 0 : panic(fmt.Sprintf("invalid base level %d (output level %d)", baseLevel, outputLevel))
1678 : }
1679 1 : return o.TargetFileSizes[outputLevel-baseLevel+1]
1680 : }
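     :
     : // Worked example (assuming the defaults above: TargetFileSizes[0] = 2MB and
     : // each later entry doubling the previous one): a compaction into L4 with
     : // base level L3 uses TargetFileSizes[4-3+1] = TargetFileSizes[2] = 8MB.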
1681 :
1682 : // DefaultOptions returns a new Options object with the default values set.
1683 0 : func DefaultOptions() *Options {
1684 0 : o := &Options{}
1685 0 : o.EnsureDefaults()
1686 0 : return o
1687 0 : }
1688 :
1689 : // WithFSDefaults configures the Options to wrap the configured filesystem with
1690 : // the default virtual file system middleware, like disk-health checking.
1691 1 : func (o *Options) WithFSDefaults() {
1692 1 : if o.FS == nil {
1693 0 : o.FS = vfs.Default
1694 0 : }
1695 1 : o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(o.FS, 5*time.Second, nil,
1696 1 : func(info vfs.DiskSlowInfo) {
1697 0 : o.EventListener.DiskSlow(info)
1698 0 : })
1699 : }
1700 :
1701 : // AddEventListener adds the provided event listener to the Options, in addition
1702 : // to any existing event listener.
1703 0 : func (o *Options) AddEventListener(l EventListener) {
1704 0 : if o.EventListener != nil {
1705 0 : l = TeeEventListener(l, *o.EventListener)
1706 0 : }
1707 0 : o.EventListener = &l
1708 : }
1709 :
1710 : // initMaps initializes the Comparers, Filters, and Mergers maps.
1711 1 : func (o *Options) initMaps() {
1712 1 : for i := range o.Levels {
1713 1 : l := &o.Levels[i]
1714 1 : if l.FilterPolicy != NoFilterPolicy {
1715 1 : if o.Filters == nil {
1716 1 : o.Filters = make(map[string]FilterPolicy)
1717 1 : }
1718 1 : name := l.FilterPolicy.Name()
1719 1 : if _, ok := o.Filters[name]; !ok {
1720 1 : o.Filters[name] = l.FilterPolicy
1721 1 : }
1722 : }
1723 : }
1724 : }
1725 :
1726 : // Clone creates a shallow-copy of the supplied options.
1727 1 : func (o *Options) Clone() *Options {
1728 1 : if o == nil {
1729 0 : return &Options{}
1730 0 : }
1731 1 : n := *o
1732 1 : if o.WALFailover != nil {
1733 1 : c := *o.WALFailover
1734 1 : n.WALFailover = &c
1735 1 : }
1736 1 : return &n
1737 : }
1738 :
1739 1 : func (o *Options) String() string {
1740 1 : var buf bytes.Buffer
1741 1 :
1742 1 : cacheSize := o.CacheSize
1743 1 : if o.Cache != nil {
1744 1 : cacheSize = o.Cache.MaxSize()
1745 1 : }
1746 :
1747 1 : fmt.Fprintf(&buf, "[Version]\n")
1748 1 : fmt.Fprintf(&buf, " pebble_version=0.1\n")
1749 1 : fmt.Fprintf(&buf, "\n")
1750 1 : fmt.Fprintf(&buf, "[Options]\n")
1751 1 : fmt.Fprintf(&buf, " bytes_per_sync=%d\n", o.BytesPerSync)
1752 1 : fmt.Fprintf(&buf, " cache_size=%d\n", cacheSize)
1753 1 : fmt.Fprintf(&buf, " cleaner=%s\n", o.Cleaner)
1754 1 : fmt.Fprintf(&buf, " compaction_debt_concurrency=%d\n", o.Experimental.CompactionDebtConcurrency)
1755 1 : fmt.Fprintf(&buf, " compaction_garbage_fraction_for_max_concurrency=%.2f\n",
1756 1 : o.Experimental.CompactionGarbageFractionForMaxConcurrency())
1757 1 : fmt.Fprintf(&buf, " comparer=%s\n", o.Comparer.Name)
1758 1 : fmt.Fprintf(&buf, " disable_wal=%t\n", o.DisableWAL)
1759 1 : if o.Experimental.DisableIngestAsFlushable != nil && o.Experimental.DisableIngestAsFlushable() {
1760 1 : fmt.Fprintf(&buf, " disable_ingest_as_flushable=%t\n", true)
1761 1 : }
1762 1 : fmt.Fprintf(&buf, " flush_delay_delete_range=%s\n", o.FlushDelayDeleteRange)
1763 1 : fmt.Fprintf(&buf, " flush_delay_range_key=%s\n", o.FlushDelayRangeKey)
1764 1 : fmt.Fprintf(&buf, " flush_split_bytes=%d\n", o.FlushSplitBytes)
1765 1 : fmt.Fprintf(&buf, " format_major_version=%d\n", o.FormatMajorVersion)
1766 1 : fmt.Fprintf(&buf, " key_schema=%s\n", o.KeySchema)
1767 1 : fmt.Fprintf(&buf, " l0_compaction_concurrency=%d\n", o.Experimental.L0CompactionConcurrency)
1768 1 : fmt.Fprintf(&buf, " l0_compaction_file_threshold=%d\n", o.L0CompactionFileThreshold)
1769 1 : fmt.Fprintf(&buf, " l0_compaction_threshold=%d\n", o.L0CompactionThreshold)
1770 1 : fmt.Fprintf(&buf, " l0_stop_writes_threshold=%d\n", o.L0StopWritesThreshold)
1771 1 : fmt.Fprintf(&buf, " lbase_max_bytes=%d\n", o.LBaseMaxBytes)
1772 1 : if o.Experimental.LevelMultiplier != defaultLevelMultiplier {
1773 1 : fmt.Fprintf(&buf, " level_multiplier=%d\n", o.Experimental.LevelMultiplier)
1774 1 : }
1775 1 : lower, upper := o.CompactionConcurrencyRange()
1776 1 : fmt.Fprintf(&buf, " concurrent_compactions=%d\n", lower)
1777 1 : fmt.Fprintf(&buf, " max_concurrent_compactions=%d\n", upper)
1778 1 : fmt.Fprintf(&buf, " max_concurrent_downloads=%d\n", o.MaxConcurrentDownloads())
1779 1 : fmt.Fprintf(&buf, " max_manifest_file_size=%d\n", o.MaxManifestFileSize)
1780 1 : fmt.Fprintf(&buf, " max_open_files=%d\n", o.MaxOpenFiles)
1781 1 : fmt.Fprintf(&buf, " mem_table_size=%d\n", o.MemTableSize)
1782 1 : fmt.Fprintf(&buf, " mem_table_stop_writes_threshold=%d\n", o.MemTableStopWritesThreshold)
1783 1 : fmt.Fprintf(&buf, " min_deletion_rate=%d\n", o.DeletionPacing.BaselineRate())
1784 1 : fmt.Fprintf(&buf, " free_space_threshold_bytes=%d\n", o.DeletionPacing.FreeSpaceThresholdBytes)
1785 1 : fmt.Fprintf(&buf, " free_space_timeframe=%s\n", o.DeletionPacing.FreeSpaceTimeframe.String())
1786 1 : fmt.Fprintf(&buf, " obsolete_bytes_timeframe=%s\n", o.DeletionPacing.BacklogTimeframe.String())
1787 1 : fmt.Fprintf(&buf, " merger=%s\n", o.Merger.Name)
1788 1 : if o.Experimental.MultiLevelCompactionHeuristic != nil {
1789 1 : fmt.Fprintf(&buf, " multilevel_compaction_heuristic=%s\n", o.Experimental.MultiLevelCompactionHeuristic().String())
1790 1 : }
1791 1 : fmt.Fprintf(&buf, " read_compaction_rate=%d\n", o.Experimental.ReadCompactionRate)
1792 1 : fmt.Fprintf(&buf, " read_sampling_multiplier=%d\n", o.Experimental.ReadSamplingMultiplier)
1793 1 : fmt.Fprintf(&buf, " num_deletions_threshold=%d\n", o.Experimental.NumDeletionsThreshold)
1794 1 : fmt.Fprintf(&buf, " deletion_size_ratio_threshold=%f\n", o.Experimental.DeletionSizeRatioThreshold)
1795 1 : fmt.Fprintf(&buf, " tombstone_dense_compaction_threshold=%f\n", o.Experimental.TombstoneDenseCompactionThreshold())
1796 1 : // We no longer care about strict_wal_tail, but set it to true in case an
1797 1 : // older version reads the options.
1798 1 : fmt.Fprintf(&buf, " strict_wal_tail=%t\n", true)
1799 1 : fmt.Fprintf(&buf, " table_cache_shards=%d\n", o.Experimental.FileCacheShards)
1800 1 : fmt.Fprintf(&buf, " validate_on_ingest=%t\n", o.Experimental.ValidateOnIngest)
1801 1 : fmt.Fprintf(&buf, " wal_dir=%s\n", o.WALDir)
1802 1 : fmt.Fprintf(&buf, " wal_bytes_per_sync=%d\n", o.WALBytesPerSync)
1803 1 : fmt.Fprintf(&buf, " secondary_cache_size_bytes=%d\n", o.Experimental.SecondaryCacheSizeBytes)
1804 1 : fmt.Fprintf(&buf, " create_on_shared=%d\n", o.Experimental.CreateOnShared)
1805 1 :
1806 1 : if o.Experimental.IteratorTracking.PollInterval != 0 {
1807 1 : fmt.Fprintf(&buf, " iterator_tracking_poll_interval=%s\n", o.Experimental.IteratorTracking.PollInterval)
1808 1 : }
1809 1 : if o.Experimental.IteratorTracking.MaxAge != 0 {
1810 1 : fmt.Fprintf(&buf, " iterator_tracking_max_age=%s\n", o.Experimental.IteratorTracking.MaxAge)
1811 1 : }
1812 :
1813 : // Private options.
1814 : //
1815 : // These testing-only options are encoded only when true, so they do not
1816 : // appear in production serialized Options files while still allowing the
1817 : // metamorphic tests to propagate them to subprocesses.
1819 1 : if o.private.disableDeleteOnlyCompactions {
1820 1 : fmt.Fprintln(&buf, " disable_delete_only_compactions=true")
1821 1 : }
1822 1 : if o.private.disableElisionOnlyCompactions {
1823 1 : fmt.Fprintln(&buf, " disable_elision_only_compactions=true")
1824 1 : }
1825 1 : if o.private.disableLazyCombinedIteration {
1826 1 : fmt.Fprintln(&buf, " disable_lazy_combined_iteration=true")
1827 1 : }
1828 :
1829 1 : if o.Experimental.ValueSeparationPolicy != nil {
1830 1 : policy := o.Experimental.ValueSeparationPolicy()
1831 1 : if policy.Enabled {
1832 1 : fmt.Fprintln(&buf)
1833 1 : fmt.Fprintln(&buf, "[Value Separation]")
1834 1 : fmt.Fprintf(&buf, " enabled=%t\n", policy.Enabled)
1835 1 : fmt.Fprintf(&buf, " minimum_size=%d\n", policy.MinimumSize)
1836 1 : fmt.Fprintf(&buf, " max_blob_reference_depth=%d\n", policy.MaxBlobReferenceDepth)
1837 1 : fmt.Fprintf(&buf, " rewrite_minimum_age=%s\n", policy.RewriteMinimumAge)
1838 1 : fmt.Fprintf(&buf, " garbage_ratio_low_priority=%.2f\n", policy.GarbageRatioLowPriority)
1839 1 : fmt.Fprintf(&buf, " garbage_ratio_high_priority=%.2f\n", policy.GarbageRatioHighPriority)
1840 1 : }
1841 : }
1842 :
1843 1 : if o.WALFailover != nil {
1844 1 : unhealthyThreshold, _ := o.WALFailover.FailoverOptions.UnhealthyOperationLatencyThreshold()
1845 1 : fmt.Fprintf(&buf, "\n")
1846 1 : fmt.Fprintf(&buf, "[WAL Failover]\n")
1847 1 : fmt.Fprintf(&buf, " secondary_dir=%s\n", o.WALFailover.Secondary.Dirname)
1848 1 : fmt.Fprintf(&buf, " primary_dir_probe_interval=%s\n", o.WALFailover.FailoverOptions.PrimaryDirProbeInterval)
1849 1 : fmt.Fprintf(&buf, " healthy_probe_latency_threshold=%s\n", o.WALFailover.FailoverOptions.HealthyProbeLatencyThreshold)
1850 1 : fmt.Fprintf(&buf, " healthy_interval=%s\n", o.WALFailover.FailoverOptions.HealthyInterval)
1851 1 : fmt.Fprintf(&buf, " unhealthy_sampling_interval=%s\n", o.WALFailover.FailoverOptions.UnhealthySamplingInterval)
1852 1 : fmt.Fprintf(&buf, " unhealthy_operation_latency_threshold=%s\n", unhealthyThreshold)
1853 1 : fmt.Fprintf(&buf, " elevated_write_stall_threshold_lag=%s\n", o.WALFailover.FailoverOptions.ElevatedWriteStallThresholdLag)
1854 1 : }
1855 :
1856 1 : for i := range o.Levels {
1857 1 : l := &o.Levels[i]
1858 1 : fmt.Fprintf(&buf, "\n")
1859 1 : fmt.Fprintf(&buf, "[Level \"%d\"]\n", i)
1860 1 : fmt.Fprintf(&buf, " block_restart_interval=%d\n", l.BlockRestartInterval)
1861 1 : fmt.Fprintf(&buf, " block_size=%d\n", l.BlockSize)
1862 1 : fmt.Fprintf(&buf, " block_size_threshold=%d\n", l.BlockSizeThreshold)
1863 1 : fmt.Fprintf(&buf, " compression=%s\n", l.Compression().Name)
1864 1 : fmt.Fprintf(&buf, " filter_policy=%s\n", l.FilterPolicy.Name())
1865 1 : fmt.Fprintf(&buf, " filter_type=%s\n", l.FilterType)
1866 1 : fmt.Fprintf(&buf, " index_block_size=%d\n", l.IndexBlockSize)
1867 1 : fmt.Fprintf(&buf, " target_file_size=%d\n", o.TargetFileSizes[i])
1868 1 : }
1869 :
1870 1 : return buf.String()
1871 : }
1872 :
1873 : type parseOptionsFuncs struct {
1874 : visitNewSection func(i, j int, section string) error
1875 : visitKeyValue func(i, j int, section, key, value string) error
1876 : visitCommentOrWhitespace func(i, j int, whitespace string) error
1877 : }
1878 :
1879 : // parseOptions takes options serialized by Options.String() and parses them
1880 : // into keys and values. It calls fns.visitNewSection for the beginning of each
1881 : // new section, fns.visitKeyValue for each key-value pair, and
1882 : // visitCommentOrWhitespace for comments and whitespace between key-value pairs.
1883 1 : func parseOptions(s string, fns parseOptionsFuncs) error {
1884 1 : var section, mappedSection string
1885 1 : i := 0
1886 1 : for i < len(s) {
1887 1 : rem := s[i:]
1888 1 : j := strings.IndexByte(rem, '\n')
1889 1 : if j < 0 {
1890 0 : j = len(rem)
1891 1 : } else {
1892 1 : j += 1 // Include the newline.
1893 1 : }
1894 1 : line := strings.TrimSpace(s[i : i+j])
1895 1 : startOff, endOff := i, i+j
1896 1 : i += j
1897 1 :
1898 1 : if len(line) == 0 || line[0] == ';' || line[0] == '#' {
1899 1 : // Skip blank lines and comments.
1900 1 : if fns.visitCommentOrWhitespace != nil {
1901 1 : if err := fns.visitCommentOrWhitespace(startOff, endOff, line); err != nil {
1902 0 : return err
1903 0 : }
1904 : }
1905 1 : continue
1906 : }
1907 1 : n := len(line)
1908 1 : if line[0] == '[' && line[n-1] == ']' {
1909 1 : // Parse section.
1910 1 : section = line[1 : n-1]
1911 1 : // RocksDB uses a similar (INI-style) syntax for the OPTIONS file, but
1912 1 : // different section names and keys. The "CFOptions ..." paths are the
1913 1 : // RocksDB versions which we map to the Pebble paths.
1914 1 : mappedSection = section
1915 1 : if section == `CFOptions "default"` {
1916 0 : mappedSection = "Options"
1917 0 : }
1918 1 : if fns.visitNewSection != nil {
1919 1 : if err := fns.visitNewSection(startOff, endOff, mappedSection); err != nil {
1920 0 : return err
1921 0 : }
1922 : }
1923 1 : continue
1924 : }
1925 :
1926 1 : pos := strings.Index(line, "=")
1927 1 : if pos < 0 {
1928 0 : const maxLen = 50
1929 0 : if len(line) > maxLen {
1930 0 : line = line[:maxLen-3] + "..."
1931 0 : }
1932 0 : return base.CorruptionErrorf("invalid key=value syntax: %q", errors.Safe(line))
1933 : }
1934 :
1935 1 : key := strings.TrimSpace(line[:pos])
1936 1 : value := strings.TrimSpace(line[pos+1:])
1937 1 :
1938 1 : if section == `CFOptions "default"` {
1939 0 : switch key {
1940 0 : case "comparator":
1941 0 : key = "comparer"
1942 0 : case "merge_operator":
1943 0 : key = "merger"
1944 : }
1945 : }
1946 1 : if fns.visitKeyValue != nil {
1947 1 : if err := fns.visitKeyValue(startOff, endOff, mappedSection, key, value); err != nil {
1948 0 : return err
1949 0 : }
1950 : }
1951 : }
1952 1 : return nil
1953 : }
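     :
     : // Sketch of the visitor contract (illustrative): callers supply only the
     : // callbacks they need, and i and j are the byte offsets delimiting the
     : // current line within s.
     : //
     : //	err := parseOptions(serialized, parseOptionsFuncs{
     : //		visitKeyValue: func(i, j int, section, key, value string) error {
     : //			fmt.Printf("%s.%s = %s\n", section, key, value)
     : //			return nil
     : //		},
     : //	})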
1954 :
1955 : // ParseHooks contains callbacks to create options fields which can have
1956 : // user-defined implementations.
1957 : type ParseHooks struct {
1958 : NewCleaner func(name string) (Cleaner, error)
1959 : NewComparer func(name string) (*Comparer, error)
1960 : NewFilterPolicy func(name string) (FilterPolicy, error)
1961 : NewKeySchema func(name string) (KeySchema, error)
1962 : NewMerger func(name string) (*Merger, error)
1963 : SkipUnknown func(name, value string) bool
1964 : }
1965 :
1966 : // Parse parses the options from the specified string. Note that certain
1967 : // options cannot be parsed into populated fields. For example, comparer and
1968 : // merger.
1969 1 : func (o *Options) Parse(s string, hooks *ParseHooks) error {
1970 1 : var valSepPolicy ValueSeparationPolicy
1971 1 : var concurrencyLimit struct {
1972 1 : lower int
1973 1 : lowerSet bool
1974 1 : upper int
1975 1 : upperSet bool
1976 1 : }
1977 1 :
1978 1 : visitKeyValue := func(i, j int, section, key, value string) error {
1979 1 : // WARNING: DO NOT remove entries from the switches below because doing so
1980 1 : // causes a key previously written to the OPTIONS file to be considered unknown,
1981 1 : // a backwards incompatible change. Instead, leave in support for parsing the
1982 1 : // key but simply don't parse the value.
1983 1 :
1984 1 : parseComparer := func(name string) (*Comparer, error) {
1985 1 : switch name {
1986 0 : case DefaultComparer.Name:
1987 0 : return DefaultComparer, nil
1988 1 : case testkeys.Comparer.Name:
1989 1 : return testkeys.Comparer, nil
1990 1 : default:
1991 1 : if hooks != nil && hooks.NewComparer != nil {
1992 1 : return hooks.NewComparer(name)
1993 1 : }
1994 0 : return nil, nil
1995 : }
1996 : }
1997 :
1998 1 : switch {
1999 1 : case section == "Version":
2000 1 : switch key {
2001 1 : case "pebble_version":
2002 0 : default:
2003 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
2004 0 : return nil
2005 0 : }
2006 0 : return errors.Errorf("pebble: unknown option: %s.%s",
2007 0 : errors.Safe(section), errors.Safe(key))
2008 : }
2009 1 : return nil
2010 :
2011 1 : case section == "Options":
2012 1 : var err error
2013 1 : switch key {
2014 1 : case "bytes_per_sync":
2015 1 : o.BytesPerSync, err = strconv.Atoi(value)
2016 1 : case "cache_size":
2017 1 : o.CacheSize, err = strconv.ParseInt(value, 10, 64)
2018 1 : case "cleaner":
2019 1 : switch value {
2020 1 : case "archive":
2021 1 : o.Cleaner = ArchiveCleaner{}
2022 0 : case "delete":
2023 0 : o.Cleaner = DeleteCleaner{}
2024 0 : default:
2025 0 : if hooks != nil && hooks.NewCleaner != nil {
2026 0 : o.Cleaner, err = hooks.NewCleaner(value)
2027 0 : }
2028 : }
2029 1 : case "comparer":
2030 1 : var comparer *Comparer
2031 1 : comparer, err = parseComparer(value)
2032 1 : if comparer != nil {
2033 1 : o.Comparer = comparer
2034 1 : }
2035 1 : case "compaction_debt_concurrency":
2036 1 : o.Experimental.CompactionDebtConcurrency, err = strconv.ParseUint(value, 10, 64)
2037 1 : case "compaction_garbage_fraction_for_max_concurrency":
2038 1 : var frac float64
2039 1 : frac, err = strconv.ParseFloat(value, 64)
2040 1 : if err == nil {
2041 1 : o.Experimental.CompactionGarbageFractionForMaxConcurrency =
2042 1 : func() float64 { return frac }
2043 : }
2044 0 : case "delete_range_flush_delay":
2045 0 : // NB: This is a deprecated serialization of the
2046 0 : // `flush_delay_delete_range` option.
2047 0 : o.FlushDelayDeleteRange, err = time.ParseDuration(value)
2048 1 : case "disable_delete_only_compactions":
2049 1 : o.private.disableDeleteOnlyCompactions, err = strconv.ParseBool(value)
2050 1 : case "disable_elision_only_compactions":
2051 1 : o.private.disableElisionOnlyCompactions, err = strconv.ParseBool(value)
2052 1 : case "disable_ingest_as_flushable":
2053 1 : var v bool
2054 1 : v, err = strconv.ParseBool(value)
2055 1 : if err == nil {
2056 1 : o.Experimental.DisableIngestAsFlushable = func() bool { return v }
2057 : }
2058 1 : case "disable_lazy_combined_iteration":
2059 1 : o.private.disableLazyCombinedIteration, err = strconv.ParseBool(value)
2060 1 : case "disable_wal":
2061 1 : o.DisableWAL, err = strconv.ParseBool(value)
2062 0 : case "enable_columnar_blocks":
2063 : // Do nothing; option existed in older versions of pebble.
2064 1 : case "flush_delay_delete_range":
2065 1 : o.FlushDelayDeleteRange, err = time.ParseDuration(value)
2066 1 : case "flush_delay_range_key":
2067 1 : o.FlushDelayRangeKey, err = time.ParseDuration(value)
2068 1 : case "flush_split_bytes":
2069 1 : o.FlushSplitBytes, err = strconv.ParseInt(value, 10, 64)
2070 1 : case "format_major_version":
2071 1 : // NB: The version written here may be stale. Open does
2072 1 : // not use the format major version encoded in the
2073 1 : // OPTIONS file other than to validate that the encoded
2074 1 : // version is valid right here.
2075 1 : var v uint64
2076 1 : v, err = strconv.ParseUint(value, 10, 64)
2077 1 : if vers := FormatMajorVersion(v); vers > internalFormatNewest || vers == FormatDefault {
2078 0 : err = errors.Newf("unsupported format major version %d", o.FormatMajorVersion)
2079 0 : }
2080 1 : if err == nil {
2081 1 : o.FormatMajorVersion = FormatMajorVersion(v)
2082 1 : }
2083 1 : case "key_schema":
2084 1 : o.KeySchema = value
2085 1 : if o.KeySchemas == nil {
2086 0 : o.KeySchemas = make(map[string]*KeySchema)
2087 0 : }
2088 1 : if _, ok := o.KeySchemas[o.KeySchema]; !ok {
2089 0 : if strings.HasPrefix(value, "DefaultKeySchema(") && strings.HasSuffix(value, ")") {
2090 0 : argsStr := strings.TrimSuffix(strings.TrimPrefix(value, "DefaultKeySchema("), ")")
2091 0 : args := strings.FieldsFunc(argsStr, func(r rune) bool {
2092 0 : return unicode.IsSpace(r) || r == ','
2093 0 : })
2094 0 : var comparer *base.Comparer
2095 0 : var bundleSize int
2096 0 : comparer, err = parseComparer(args[0])
2097 0 : if err == nil {
2098 0 : bundleSize, err = strconv.Atoi(args[1])
2099 0 : }
2100 0 : if err == nil {
2101 0 : schema := colblk.DefaultKeySchema(comparer, bundleSize)
2102 0 : o.KeySchema = schema.Name
2103 0 : o.KeySchemas[o.KeySchema] = &schema
2104 0 : }
2105 0 : } else if hooks != nil && hooks.NewKeySchema != nil {
2106 0 : var schema KeySchema
2107 0 : schema, err = hooks.NewKeySchema(value)
2108 0 : if err == nil {
2109 0 : o.KeySchemas[value] = &schema
2110 0 : }
2111 : }
2112 : }
2113 1 : case "l0_compaction_concurrency":
2114 1 : o.Experimental.L0CompactionConcurrency, err = strconv.Atoi(value)
2115 1 : case "l0_compaction_file_threshold":
2116 1 : o.L0CompactionFileThreshold, err = strconv.Atoi(value)
2117 1 : case "l0_compaction_threshold":
2118 1 : o.L0CompactionThreshold, err = strconv.Atoi(value)
2119 1 : case "l0_stop_writes_threshold":
2120 1 : o.L0StopWritesThreshold, err = strconv.Atoi(value)
2121 0 : case "l0_sublevel_compactions":
2122 : // Do nothing; option existed in older versions of pebble.
2123 1 : case "lbase_max_bytes":
2124 1 : o.LBaseMaxBytes, err = strconv.ParseInt(value, 10, 64)
2125 1 : case "level_multiplier":
2126 1 : o.Experimental.LevelMultiplier, err = strconv.Atoi(value)
2127 1 : case "concurrent_compactions":
2128 1 : concurrencyLimit.lowerSet = true
2129 1 : concurrencyLimit.lower, err = strconv.Atoi(value)
2130 1 : case "max_concurrent_compactions":
2131 1 : concurrencyLimit.upperSet = true
2132 1 : concurrencyLimit.upper, err = strconv.Atoi(value)
2133 1 : case "max_concurrent_downloads":
2134 1 : var concurrentDownloads int
2135 1 : concurrentDownloads, err = strconv.Atoi(value)
2136 1 : if concurrentDownloads <= 0 {
2137 0 : err = errors.New("max_concurrent_compactions cannot be <= 0")
2138 1 : } else {
2139 1 : o.MaxConcurrentDownloads = func() int { return concurrentDownloads }
2140 : }
2141 1 : case "max_manifest_file_size":
2142 1 : o.MaxManifestFileSize, err = strconv.ParseInt(value, 10, 64)
2143 1 : case "max_open_files":
2144 1 : o.MaxOpenFiles, err = strconv.Atoi(value)
2145 1 : case "mem_table_size":
2146 1 : o.MemTableSize, err = strconv.ParseUint(value, 10, 64)
2147 1 : case "mem_table_stop_writes_threshold":
2148 1 : o.MemTableStopWritesThreshold, err = strconv.Atoi(value)
2149 0 : case "min_compaction_rate":
2150 : // Do nothing; option existed in older versions of pebble, and
2151 : // may be meaningful again eventually.
2152 1 : case "min_deletion_rate":
2153 1 : var rate uint64
2154 1 : rate, err = strconv.ParseUint(value, 10, 64)
2155 1 : if err == nil {
2156 1 : o.DeletionPacing.BaselineRate = func() uint64 { return rate }
2157 : }
2158 1 : case "free_space_threshold_bytes":
2159 1 : o.DeletionPacing.FreeSpaceThresholdBytes, err = strconv.ParseUint(value, 10, 64)
2160 1 : case "free_space_timeframe":
2161 1 : o.DeletionPacing.FreeSpaceTimeframe, err = time.ParseDuration(value)
2162 0 : case "obsolete_bytes_max_ratio":
2163 : // No longer used.
2164 1 : case "obsolete_bytes_timeframe":
2165 1 : o.DeletionPacing.BacklogTimeframe, err = time.ParseDuration(value)
2166 0 : case "min_flush_rate":
2167 : // Do nothing; option existed in older versions of pebble, and
2168 : // may be meaningful again eventually.
2169 1 : case "multilevel_compaction_heuristic":
2170 1 : switch {
2171 1 : case value == "none":
2172 1 : o.Experimental.MultiLevelCompactionHeuristic = OptionNoMultiLevel
2173 1 : case strings.HasPrefix(value, "wamp"):
2174 1 : o.Experimental.MultiLevelCompactionHeuristic = OptionWriteAmpHeuristic
2175 1 : fields := strings.FieldsFunc(strings.TrimPrefix(value, "wamp"), func(r rune) bool {
2176 1 : return unicode.IsSpace(r) || r == ',' || r == '(' || r == ')'
2177 1 : })
2178 1 : if len(fields) != 2 {
2179 0 : err = errors.Newf("require 2 arguments")
2180 0 : }
2181 1 : var h WriteAmpHeuristic
2182 1 : if err == nil {
2183 1 : h.AddPropensity, err = strconv.ParseFloat(fields[0], 64)
2184 1 : }
2185 1 : if err == nil {
2186 1 : h.AllowL0, err = strconv.ParseBool(fields[1])
2187 1 : }
2188 :
2189 1 : if err == nil {
2190 1 : if h.AllowL0 || h.AddPropensity != 0 {
2191 1 : o.Experimental.MultiLevelCompactionHeuristic = func() MultiLevelHeuristic {
2192 1 : return &h
2193 1 : }
2194 : }
2195 0 : } else {
2196 0 : err = errors.Wrapf(err, "unexpected wamp heuristic arguments: %s", value)
2197 0 : }
2198 0 : default:
2199 0 : err = errors.Newf("unrecognized multilevel compaction heuristic: %s", value)
2200 : }
2201 0 : case "point_tombstone_weight":
2202 : // Do nothing; deprecated.
2203 1 : case "strict_wal_tail":
2204 1 : var strictWALTail bool
2205 1 : strictWALTail, err = strconv.ParseBool(value)
2206 1 : if err == nil && !strictWALTail {
2207 0 : err = errors.Newf("reading from versions with strict_wal_tail=false no longer supported")
2208 0 : }
2209 1 : case "merger":
2210 1 : switch value {
2211 0 : case "nullptr":
2212 0 : o.Merger = nil
2213 1 : case "pebble.concatenate":
2214 1 : o.Merger = DefaultMerger
2215 0 : default:
2216 0 : if hooks != nil && hooks.NewMerger != nil {
2217 0 : o.Merger, err = hooks.NewMerger(value)
2218 0 : }
2219 : }
2220 1 : case "read_compaction_rate":
2221 1 : o.Experimental.ReadCompactionRate, err = strconv.ParseInt(value, 10, 64)
2222 1 : case "read_sampling_multiplier":
2223 1 : o.Experimental.ReadSamplingMultiplier, err = strconv.ParseInt(value, 10, 64)
2224 1 : case "num_deletions_threshold":
2225 1 : o.Experimental.NumDeletionsThreshold, err = strconv.Atoi(value)
2226 1 : case "deletion_size_ratio_threshold":
2227 1 : val, parseErr := strconv.ParseFloat(value, 32)
2228 1 : o.Experimental.DeletionSizeRatioThreshold = float32(val)
2229 1 : err = parseErr
2230 1 : case "tombstone_dense_compaction_threshold":
2231 1 : var threshold float64
2232 1 : threshold, err = strconv.ParseFloat(value, 64)
2233 1 : if err == nil {
2234 1 : o.Experimental.TombstoneDenseCompactionThreshold = func() float64 { return threshold }
2235 : }
2236 1 : case "table_cache_shards":
2237 1 : o.Experimental.FileCacheShards, err = strconv.Atoi(value)
2238 0 : case "table_format":
2239 0 : switch value {
2240 0 : case "leveldb":
2241 0 : case "rocksdbv2":
2242 0 : default:
2243 0 : return errors.Errorf("pebble: unknown table format: %q", errors.Safe(value))
2244 : }
2245 0 : case "table_property_collectors":
2246 : // No longer implemented; ignore.
2247 1 : case "validate_on_ingest":
2248 1 : o.Experimental.ValidateOnIngest, err = strconv.ParseBool(value)
2249 1 : case "wal_dir":
2250 1 : o.WALDir = value
2251 1 : case "wal_bytes_per_sync":
2252 1 : o.WALBytesPerSync, err = strconv.Atoi(value)
2253 0 : case "max_writer_concurrency":
2254 : // No longer implemented; ignore.
2255 0 : case "force_writer_parallelism":
2256 : // No longer implemented; ignore.
2257 1 : case "secondary_cache_size_bytes":
2258 1 : o.Experimental.SecondaryCacheSizeBytes, err = strconv.ParseInt(value, 10, 64)
2259 1 : case "create_on_shared":
2260 1 : var createOnSharedInt int64
2261 1 : createOnSharedInt, err = strconv.ParseInt(value, 10, 64)
2262 1 : o.Experimental.CreateOnShared = remote.CreateOnSharedStrategy(createOnSharedInt)
2263 1 : case "iterator_tracking_poll_interval":
2264 1 : o.Experimental.IteratorTracking.PollInterval, err = time.ParseDuration(value)
2265 1 : case "iterator_tracking_max_age":
2266 1 : o.Experimental.IteratorTracking.MaxAge, err = time.ParseDuration(value)
2267 0 : default:
2268 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
2269 0 : return nil
2270 0 : }
2271 0 : return errors.Errorf("pebble: unknown option: %s.%s",
2272 0 : errors.Safe(section), errors.Safe(key))
2273 : }
2274 1 : return err
2275 :
2276 1 : case section == "Value Separation":
2277 1 : var err error
2278 1 : switch key {
2279 1 : case "enabled":
2280 1 : valSepPolicy.Enabled, err = strconv.ParseBool(value)
2281 1 : case "minimum_size":
2282 1 : var minimumSize int
2283 1 : minimumSize, err = strconv.Atoi(value)
2284 1 : valSepPolicy.MinimumSize = minimumSize
2285 1 : case "max_blob_reference_depth":
2286 1 : valSepPolicy.MaxBlobReferenceDepth, err = strconv.Atoi(value)
2287 1 : case "rewrite_minimum_age":
2288 1 : valSepPolicy.RewriteMinimumAge, err = time.ParseDuration(value)
2289 1 : case "target_garbage_ratio", "garbage_ratio_low_priority":
2290 1 : // NB: "target_garbage_ratio" is a deprecated name for the same field.
2291 1 : valSepPolicy.GarbageRatioLowPriority, err = strconv.ParseFloat(value, 64)
2292 1 : case "garbage_ratio_high_priority":
2293 1 : valSepPolicy.GarbageRatioHighPriority, err = strconv.ParseFloat(value, 64)
2294 0 : default:
2295 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
2296 0 : return nil
2297 0 : }
2298 0 : return errors.Errorf("pebble: unknown option: %s.%s", errors.Safe(section), errors.Safe(key))
2299 : }
2300 1 : return err
2301 :
2302 1 : case section == "WAL Failover":
2303 1 : if o.WALFailover == nil {
2304 1 : o.WALFailover = new(WALFailoverOptions)
2305 1 : }
2306 1 : var err error
2307 1 : switch key {
2308 1 : case "secondary_dir":
2309 1 : o.WALFailover.Secondary = wal.Dir{Dirname: value, FS: vfs.Default}
2310 1 : case "primary_dir_probe_interval":
2311 1 : o.WALFailover.PrimaryDirProbeInterval, err = time.ParseDuration(value)
2312 1 : case "healthy_probe_latency_threshold":
2313 1 : o.WALFailover.HealthyProbeLatencyThreshold, err = time.ParseDuration(value)
2314 1 : case "healthy_interval":
2315 1 : o.WALFailover.HealthyInterval, err = time.ParseDuration(value)
2316 1 : case "unhealthy_sampling_interval":
2317 1 : o.WALFailover.UnhealthySamplingInterval, err = time.ParseDuration(value)
2318 1 : case "unhealthy_operation_latency_threshold":
2319 1 : var threshold time.Duration
2320 1 : threshold, err = time.ParseDuration(value)
2321 1 : o.WALFailover.UnhealthyOperationLatencyThreshold = func() (time.Duration, bool) {
2322 1 : return threshold, true
2323 1 : }
2324 1 : case "elevated_write_stall_threshold_lag":
2325 1 : o.WALFailover.ElevatedWriteStallThresholdLag, err = time.ParseDuration(value)
2326 0 : default:
2327 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
2328 0 : return nil
2329 0 : }
2330 0 : return errors.Errorf("pebble: unknown option: %s.%s",
2331 0 : errors.Safe(section), errors.Safe(key))
2332 : }
2333 1 : return err
2334 :
2335 1 : case strings.HasPrefix(section, "Level "):
2336 1 : m := regexp.MustCompile(`Level\s*"?(\d+)"?\s*$`).FindStringSubmatch(section)
2337 1 : if m == nil {
2338 0 : return errors.Errorf("pebble: unknown section: %q", errors.Safe(section))
2339 0 : }
2340 1 : index, err := strconv.Atoi(m[1])
2341 1 : if err != nil || index < 0 || index >= len(o.Levels) {
2342 0 : return errors.Errorf("pebble: invalid level index: %q", errors.Safe(m[1]))
2343 0 : }
2344 :
2345 1 : l := &o.Levels[index]
2346 1 :
2347 1 : switch key {
2348 1 : case "block_restart_interval":
2349 1 : l.BlockRestartInterval, err = strconv.Atoi(value)
2350 1 : case "block_size":
2351 1 : l.BlockSize, err = strconv.Atoi(value)
2352 1 : case "block_size_threshold":
2353 1 : l.BlockSizeThreshold, err = strconv.Atoi(value)
2354 1 : case "compression":
2355 1 : profile := block.CompressionProfileByName(value)
2356 1 : if profile == nil {
2357 0 : return errors.Errorf("pebble: unknown compression: %q", errors.Safe(value))
2358 0 : }
2359 1 : l.Compression = func() *sstable.CompressionProfile { return profile }
2360 1 : case "filter_policy":
2361 1 : if hooks != nil && hooks.NewFilterPolicy != nil {
2362 1 : l.FilterPolicy, err = hooks.NewFilterPolicy(value)
2363 1 : } else {
2364 0 : l.FilterPolicy = NoFilterPolicy
2365 0 : }
2366 1 : case "filter_type":
2367 1 : switch value {
2368 1 : case "table":
2369 1 : l.FilterType = TableFilter
2370 0 : default:
2371 0 : return errors.Errorf("pebble: unknown filter type: %q", errors.Safe(value))
2372 : }
2373 1 : case "index_block_size":
2374 1 : l.IndexBlockSize, err = strconv.Atoi(value)
2375 1 : case "target_file_size":
2376 1 : o.TargetFileSizes[index], err = strconv.ParseInt(value, 10, 64)
2377 0 : default:
2378 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
2379 0 : return nil
2380 0 : }
2381 0 : return errors.Errorf("pebble: unknown option: %s.%s", errors.Safe(section), errors.Safe(key))
2382 : }
2383 1 : return err
2384 : }
2385 1 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
2386 1 : return nil
2387 1 : }
2388 0 : return errors.Errorf("pebble: unknown section %q or key %q", errors.Safe(section), errors.Safe(key))
2389 : }
2390 1 : err := parseOptions(s, parseOptionsFuncs{
2391 1 : visitKeyValue: visitKeyValue,
2392 1 : })
2393 1 : if err != nil {
2394 0 : return err
2395 0 : }
2396 1 : o.Experimental.ValueSeparationPolicy = func() ValueSeparationPolicy { return valSepPolicy }
2397 1 : if concurrencyLimit.lowerSet || concurrencyLimit.upperSet {
2398 1 : if !concurrencyLimit.lowerSet {
2399 0 : concurrencyLimit.lower = 1
2400 1 : } else if concurrencyLimit.lower < 1 {
2401 0 : return errors.New("baseline_concurrent_compactions cannot be <= 0")
2402 0 : }
2403 1 : if !concurrencyLimit.upperSet {
2404 0 : concurrencyLimit.upper = concurrencyLimit.lower
2405 1 : } else if concurrencyLimit.upper < concurrencyLimit.lower {
2406 0 : return errors.Newf("max_concurrent_compactions cannot be < %d", concurrencyLimit.lower)
2407 0 : }
2408 1 : o.CompactionConcurrencyRange = func() (int, int) {
2409 1 : return concurrencyLimit.lower, concurrencyLimit.upper
2410 1 : }
2411 : }
2412 1 : return nil
2413 : }
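     :
     : // Example use (illustrative): round-trip an Options through its serialized
     : // form, tolerating keys this version does not know about.
     : //
     : //	var src, dst Options
     : //	src.EnsureDefaults()
     : //	if err := dst.Parse(src.String(), &ParseHooks{
     : //		SkipUnknown: func(name, value string) bool { return true },
     : //	}); err != nil {
     : //		// handle err
     : //	}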
2414 :
2415 : // ErrMissingWALRecoveryDir is an error returned when opening a database
2416 : // without supplying an Options.WALRecoveryDirs entry for a directory that
2417 : // may contain WALs required to recover a consistent database state.
2418 : type ErrMissingWALRecoveryDir struct {
2419 : Dir string
2420 : ExtraInfo string
2421 : }
2422 :
2423 : // Error implements error.
2424 0 : func (e ErrMissingWALRecoveryDir) Error() string {
2425 0 : return fmt.Sprintf("directory %q may contain relevant WALs but is not in WALRecoveryDirs%s", e.Dir, e.ExtraInfo)
2426 0 : }
2427 :
2428 : // CheckCompatibility verifies the options are compatible with the previous options
2429 : // serialized by Options.String(). For example, the Comparer and Merger must be
2430 : // the same, or data in the DB cannot be read correctly.
2431 : //
2432 : // This function only looks at specific keys and does not error out if the
2433 : // options are newer and contain unknown keys.
2434 1 : func (o *Options) CheckCompatibility(storeDir string, previousOptions string) error {
2435 1 : previousWALDir := ""
2436 1 :
2437 1 : visitKeyValue := func(i, j int, section, key, value string) error {
2438 1 : switch section + "." + key {
2439 1 : case "Options.comparer":
2440 1 : if value != o.Comparer.Name {
2441 0 : return errors.Errorf("pebble: comparer name from file %q != comparer name from options %q",
2442 0 : errors.Safe(value), errors.Safe(o.Comparer.Name))
2443 0 : }
2444 1 : case "Options.merger":
2445 1 : // RocksDB allows the merge operator to be unspecified, in which case it
2446 1 : // shows up as "nullptr".
2447 1 : if value != "nullptr" && value != o.Merger.Name {
2448 0 : return errors.Errorf("pebble: merger name from file %q != merger name from options %q",
2449 0 : errors.Safe(value), errors.Safe(o.Merger.Name))
2450 0 : }
2451 1 : case "Options.wal_dir":
2452 1 : previousWALDir = value
2453 1 : case "WAL Failover.secondary_dir":
2454 1 : previousWALSecondaryDir := value
2455 1 : if err := o.checkWALDir(storeDir, previousWALSecondaryDir, "WALFailover.Secondary changed from previous options"); err != nil {
2456 0 : return err
2457 0 : }
2458 : }
2459 1 : return nil
2460 : }
2461 1 : if err := parseOptions(previousOptions, parseOptionsFuncs{visitKeyValue: visitKeyValue}); err != nil {
2462 0 : return err
2463 0 : }
2464 1 : if err := o.checkWALDir(storeDir, previousWALDir, "WALDir changed from previous options"); err != nil {
2465 0 : return err
2466 0 : }
2467 1 : return nil
2468 : }
2469 :
2470 : // checkWALDir verifies that walDir is among o.WALDir, o.WALFailover.Secondary,
2471 : // or o.WALRecoveryDirs. An empty "walDir" maps to the storeDir.
2472 1 : func (o *Options) checkWALDir(storeDir, walDir, errContext string) error {
2473 1 : walPath := resolveStorePath(storeDir, walDir)
2474 1 : if walDir == "" {
2475 1 : walPath = storeDir
2476 1 : }
2477 :
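     : // The previous WAL directory is acceptable if it resolves to the current
     : // primary WAL location, the failover secondary, or one of the declared
     : // WAL recovery directories.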
2478 1 : if o.WALDir == "" {
2479 1 : if walPath == storeDir {
2480 1 : return nil
2481 1 : }
2482 1 : } else {
2483 1 : if walPath == resolveStorePath(storeDir, o.WALDir) {
2484 1 : return nil
2485 1 : }
2486 : }
2487 :
2488 1 : if o.WALFailover != nil && walPath == resolveStorePath(storeDir, o.WALFailover.Secondary.Dirname) {
2489 1 : return nil
2490 1 : }
2491 :
2492 0 : for _, d := range o.WALRecoveryDirs {
2493 0 : // TODO(radu): should we also check that d.FS is the same as walDir's FS?
2494 0 : if walPath == resolveStorePath(storeDir, d.Dirname) {
2495 0 : return nil
2496 0 : }
2497 : }
2498 :
2499 0 : if o.Unsafe.AllowMissingWALDirs {
2500 0 : o.Logger.Infof("directory %q may contain relevant WALs but is not in WALRecoveryDirs (AllowMissingWALDirs enabled)", walDir)
2501 0 : return nil
2502 0 : }
2503 :
2504 0 : var buf bytes.Buffer
2505 0 : fmt.Fprintf(&buf, "\n %s\n", errContext)
2506 0 : fmt.Fprintf(&buf, " o.WALDir: %q\n", o.WALDir)
2507 0 : if o.WALFailover != nil {
2508 0 : fmt.Fprintf(&buf, " o.WALFailover.Secondary.Dirname: %q\n", o.WALFailover.Secondary.Dirname)
2509 0 : }
2510 0 : fmt.Fprintf(&buf, " o.WALRecoveryDirs: %d", len(o.WALRecoveryDirs))
2511 0 : for _, d := range o.WALRecoveryDirs {
2512 0 : fmt.Fprintf(&buf, "\n %q", d.Dirname)
2513 0 : }
2514 :
2515 0 : return ErrMissingWALRecoveryDir{Dir: walPath, ExtraInfo: buf.String()}
2516 : }
2517 :
2518 : // Validate verifies that the options are mutually consistent. For example,
2519 : // L0StopWritesThreshold must be >= L0CompactionThreshold, otherwise a write
2520 : // stall would persist indefinitely.
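     : //
     : // A minimal sketch of typical use (EnsureDefaults must run first, since
     : // Validate assumes defaulted values):
     : //
     : //	opts := &Options{}
     : //	opts.EnsureDefaults()
     : //	if err := opts.Validate(); err != nil {
     : //		// The options are mutually inconsistent; fix them before Open.
     : //	}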
2521 1 : func (o *Options) Validate() error {
2522 1 : // Note that we can presume Options.EnsureDefaults has been called, so there
2523 1 : // is no need to check for zero values.
2524 1 :
2525 1 : var buf strings.Builder
2526 1 : if o.Experimental.L0CompactionConcurrency < 1 {
2527 0 : fmt.Fprintf(&buf, "L0CompactionConcurrency (%d) must be >= 1\n",
2528 0 : o.Experimental.L0CompactionConcurrency)
2529 0 : }
2530 1 : if o.L0StopWritesThreshold < o.L0CompactionThreshold {
2531 0 : fmt.Fprintf(&buf, "L0StopWritesThreshold (%d) must be >= L0CompactionThreshold (%d)\n",
2532 0 : o.L0StopWritesThreshold, o.L0CompactionThreshold)
2533 0 : }
2534 1 : if uint64(o.MemTableSize) >= maxMemTableSize {
2535 0 : fmt.Fprintf(&buf, "MemTableSize (%s) must be < %s\n",
2536 0 : humanize.Bytes.Uint64(uint64(o.MemTableSize)), humanize.Bytes.Uint64(maxMemTableSize))
2537 0 : }
2538 1 : if o.MemTableStopWritesThreshold < 2 {
2539 0 : fmt.Fprintf(&buf, "MemTableStopWritesThreshold (%d) must be >= 2\n",
2540 0 : o.MemTableStopWritesThreshold)
2541 0 : }
2542 1 : if o.FormatMajorVersion < FormatMinSupported || o.FormatMajorVersion > internalFormatNewest {
2543 0 : fmt.Fprintf(&buf, "FormatMajorVersion (%d) must be between %d and %d\n",
2544 0 : o.FormatMajorVersion, FormatMinSupported, internalFormatNewest)
2545 0 : }
2546 1 : if o.Experimental.CreateOnShared != remote.CreateOnSharedNone && o.FormatMajorVersion < FormatMinForSharedObjects {
2547 0 : fmt.Fprintf(&buf, "FormatMajorVersion (%d) when CreateOnShared is set must be at least %d\n",
2548 0 : o.FormatMajorVersion, FormatMinForSharedObjects)
2549 0 : }
2550 1 : if len(o.KeySchemas) > 0 {
2551 1 : if o.KeySchema == "" {
2552 0 : fmt.Fprintf(&buf, "KeySchemas is set but KeySchema is not\n")
2553 0 : }
2554 1 : if _, ok := o.KeySchemas[o.KeySchema]; !ok {
2555 0 : fmt.Fprintf(&buf, "KeySchema %q not found in KeySchemas\n", o.KeySchema)
2556 0 : }
2557 : }
2558 1 : if policy := o.Experimental.ValueSeparationPolicy(); policy.Enabled {
2559 1 : if policy.MinimumSize <= 0 {
2560 0 : fmt.Fprintf(&buf, "ValueSeparationPolicy.MinimumSize (%d) must be > 0\n", policy.MinimumSize)
2561 0 : }
2562 1 : if policy.MaxBlobReferenceDepth <= 0 {
2563 0 : fmt.Fprintf(&buf, "ValueSeparationPolicy.MaxBlobReferenceDepth (%d) must be > 0\n", policy.MaxBlobReferenceDepth)
2564 0 : }
2565 1 : if policy.GarbageRatioHighPriority < policy.GarbageRatioLowPriority {
2566 0 : fmt.Fprintf(&buf, "ValueSeparationPolicy.GarbageRatioHighPriority (%f) must be >= ValueSeparationPolicy.GarbageRatioLowPriority (%f)\n",
2567 0 : policy.GarbageRatioHighPriority, policy.GarbageRatioLowPriority)
2568 0 : }
2569 : }
2570 :
2571 1 : if buf.Len() == 0 {
2572 1 : return nil
2573 1 : }
2574 0 : return errors.New(buf.String())
2575 : }
2576 :
2577 : // MakeReaderOptions constructs sstable.ReaderOptions from the corresponding
2578 : // options in the receiver.
2579 1 : func (o *Options) MakeReaderOptions() sstable.ReaderOptions {
2580 1 : return sstable.ReaderOptions{
2581 1 : Comparer: o.Comparer,
2582 1 : Filters: o.Filters,
2583 1 : KeySchemas: o.KeySchemas,
2584 1 : Merger: o.Merger,
2585 1 : ReaderOptions: block.ReaderOptions{
2586 1 : LoadBlockSema: o.LoadBlockSema,
2587 1 : LoggerAndTracer: o.LoggerAndTracer,
2588 1 : },
2589 1 : }
2590 1 : }
2591 :
2592 : // MakeWriterOptions constructs sstable.WriterOptions for the specified level
2593 : // from the corresponding options in the receiver.
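     : //
     : // For example (an illustrative sketch; level 6 is the lowest level under
     : // the default seven-level configuration):
     : //
     : //	wo := o.MakeWriterOptions(6, sstable.TableFormatPebblev4)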
2594 1 : func (o *Options) MakeWriterOptions(level int, format sstable.TableFormat) sstable.WriterOptions {
2595 1 : writerOpts := sstable.WriterOptions{
2596 1 : TableFormat: format,
2597 1 : Comparer: o.Comparer,
2598 1 : BlockPropertyCollectors: o.BlockPropertyCollectors,
2599 1 : AllocatorSizeClasses: o.AllocatorSizeClasses,
2600 1 : NumDeletionsThreshold: o.Experimental.NumDeletionsThreshold,
2601 1 : DeletionSizeRatioThreshold: o.Experimental.DeletionSizeRatioThreshold,
2602 1 : }
2603 1 : if o.Merger != nil {
2604 1 : writerOpts.MergerName = o.Merger.Name
2605 1 : }
2606 1 : if o.KeySchema != "" {
2607 1 : var ok bool
2608 1 : writerOpts.KeySchema, ok = o.KeySchemas[o.KeySchema]
2609 1 : if !ok {
2610 0 : panic(fmt.Sprintf("invalid schema %q", redact.Safe(o.KeySchema)))
2611 : }
2612 : }
2613 1 : if format >= sstable.TableFormatPebblev3 {
2614 1 : writerOpts.ShortAttributeExtractor = o.Experimental.ShortAttributeExtractor
2615 1 : if format >= sstable.TableFormatPebblev4 && level == numLevels-1 {
2616 1 : writerOpts.WritingToLowestLevel = true
2617 1 : }
2618 : }
2619 1 : levelOpts := o.Levels[level]
2620 1 : writerOpts.BlockRestartInterval = levelOpts.BlockRestartInterval
2621 1 : writerOpts.BlockSize = levelOpts.BlockSize
2622 1 : writerOpts.BlockSizeThreshold = levelOpts.BlockSizeThreshold
2623 1 : writerOpts.Compression = levelOpts.Compression()
2624 1 : writerOpts.FilterPolicy = levelOpts.FilterPolicy
2625 1 : writerOpts.FilterType = levelOpts.FilterType
2626 1 : writerOpts.IndexBlockSize = levelOpts.IndexBlockSize
2627 1 : return writerOpts
2628 : }
2629 :
2630 : // makeWriterOptions constructs sstable.WriterOptions for the specified level
2631 : // using the current DB options and format.
2632 1 : func (d *DB) makeWriterOptions(level int) sstable.WriterOptions {
2633 1 : return d.opts.MakeWriterOptions(level, d.TableFormat())
2634 1 : }
2635 :
2636 : // makeBlobWriterOptions constructs blob.FileWriterOptions using the current DB
2637 : // options and format.
2638 1 : func (d *DB) makeBlobWriterOptions(level int) blob.FileWriterOptions {
2639 1 : lo := &d.opts.Levels[level]
2640 1 : return blob.FileWriterOptions{
2641 1 : Format: d.BlobFileFormat(),
2642 1 : Compression: lo.Compression(),
2643 1 : ChecksumType: block.ChecksumTypeCRC32c,
2644 1 : FlushGovernor: block.MakeFlushGovernor(
2645 1 : lo.BlockSize,
2646 1 : lo.BlockSizeThreshold,
2647 1 : base.SizeClassAwareBlockSizeThreshold,
2648 1 : d.opts.AllocatorSizeClasses,
2649 1 : ),
2650 1 : }
2651 1 : }
2652 :
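     : // MakeObjStorageProviderSettings constructs objstorageprovider.Settings for
     : // the store directory dirname from the corresponding options in the receiver.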
2653 1 : func (o *Options) MakeObjStorageProviderSettings(dirname string) objstorageprovider.Settings {
2654 1 : s := objstorageprovider.Settings{
2655 1 : Logger: o.Logger,
2656 1 : FS: o.FS,
2657 1 : FSDirName: dirname,
2658 1 : FSCleaner: o.Cleaner,
2659 1 : NoSyncOnClose: o.NoSyncOnClose,
2660 1 : BytesPerSync: o.BytesPerSync,
2661 1 : }
2662 1 : s.Local.ReadaheadConfig = o.Local.ReadaheadConfig
2663 1 : s.Remote.StorageFactory = o.Experimental.RemoteStorage
2664 1 : s.Remote.CreateOnShared = o.Experimental.CreateOnShared
2665 1 : s.Remote.CreateOnSharedLocator = o.Experimental.CreateOnSharedLocator
2666 1 : s.Remote.CacheSizeBytes = o.Experimental.SecondaryCacheSizeBytes
2667 1 : return s
2668 1 : }
2669 :
2670 : // UserKeyCategories describes a partitioning of the user key space. Each
2671 : // partition is a category with a name. The categories are used for informative
2672 : // purposes only (like pprof labels). Pebble does not treat keys differently
2673 : // based on the UserKeyCategories.
2674 : //
2675 : // The partitions are defined by their upper bounds. The last partition
2676 : // extends to the end of the keyspace; its UpperBound must be nil. The rest
2677 : // of the partitions are ordered by their UpperBound.
2678 : type UserKeyCategories struct {
2679 : categories []UserKeyCategory
2680 : cmp base.Compare
2681 : // rangeNames[i][j] contains the string referring to the categories in the
2682 : // range [i, j], with j > i.
2683 : rangeNames [][]string
2684 : }
2685 :
2686 : // UserKeyCategory describes a partition of the user key space.
2687 : //
2688 : // User keys >= the previous category's UpperBound and < this category's
2689 : // UpperBound are part of this category.
2690 : type UserKeyCategory struct {
2691 : Name string
2692 : // UpperBound is the exclusive upper bound of the category. All user keys >= the
2693 : // previous category's UpperBound and < this UpperBound are part of this
2694 : // category.
2695 : UpperBound []byte
2696 : }
2697 :
2698 : // MakeUserKeyCategories creates a UserKeyCategories object with the given
2699 : // categories. The object is immutable and can be reused across different
2700 : // stores.
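     : //
     : // A hypothetical example, where cmp is the store's Comparer.Compare, keys
     : // below "m" belong to "meta", and all remaining keys belong to "user":
     : //
     : //	kc := MakeUserKeyCategories(cmp,
     : //		UserKeyCategory{Name: "meta", UpperBound: []byte("m")},
     : //		UserKeyCategory{Name: "user"}, // last category: UpperBound must be nil
     : //	)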
2701 0 : func MakeUserKeyCategories(cmp base.Compare, categories ...UserKeyCategory) UserKeyCategories {
2702 0 : n := len(categories)
2703 0 : if n == 0 {
2704 0 : return UserKeyCategories{}
2705 0 : }
2706 0 : if categories[n-1].UpperBound != nil {
2707 0 : panic("last category UpperBound must be nil")
2708 : }
2709 : // Verify that the partitions are ordered as expected.
2710 0 : for i := 1; i < n-1; i++ {
2711 0 : if cmp(categories[i-1].UpperBound, categories[i].UpperBound) >= 0 {
2712 0 : panic("invalid UserKeyCategories: category upper bounds must be sorted")
2713 : }
2714 : }
2715 :
2716 : // Precalculate a table of range names to avoid allocations in the
2717 : // categorization path.
2718 0 : rangeNamesBuf := make([]string, n*n)
2719 0 : rangeNames := make([][]string, n)
2720 0 : for i := range rangeNames {
2721 0 : rangeNames[i] = rangeNamesBuf[:n]
2722 0 : rangeNamesBuf = rangeNamesBuf[n:]
2723 0 : for j := i + 1; j < n; j++ {
2724 0 : rangeNames[i][j] = categories[i].Name + "-" + categories[j].Name
2725 0 : }
2726 : }
2727 0 : return UserKeyCategories{
2728 0 : categories: categories,
2729 0 : cmp: cmp,
2730 0 : rangeNames: rangeNames,
2731 0 : }
2732 : }
2733 :
2734 : // Len returns the number of categories defined.
2735 1 : func (kc *UserKeyCategories) Len() int {
2736 1 : return len(kc.categories)
2737 1 : }
2738 :
2739 : // CategorizeKey returns the name of the category containing the key.
2740 0 : func (kc *UserKeyCategories) CategorizeKey(userKey []byte) string {
2741 0 : idx := sort.Search(len(kc.categories)-1, func(i int) bool {
2742 0 : return kc.cmp(userKey, kc.categories[i].UpperBound) < 0
2743 0 : })
2744 0 : return kc.categories[idx].Name
2745 : }
2746 :
2747 : // CategorizeKeyRange returns the name of the category containing the key range.
2748 : // If the key range spans multiple categories, the result shows the first and
2749 : // last category separated by a dash, e.g. `cat1-cat5`.
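     : //
     : // For example, with the hypothetical "meta" and "user" categories above,
     : // CategorizeKeyRange([]byte("a"), []byte("z")) returns "meta-user".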
2750 0 : func (kc *UserKeyCategories) CategorizeKeyRange(startUserKey, endUserKey []byte) string {
2751 0 : n := len(kc.categories)
2752 0 : p := sort.Search(n-1, func(i int) bool {
2753 0 : return kc.cmp(startUserKey, kc.categories[i].UpperBound) < 0
2754 0 : })
2755 0 : if p == n-1 || kc.cmp(endUserKey, kc.categories[p].UpperBound) < 0 {
2756 0 : // Fast path for a single category.
2757 0 : return kc.categories[p].Name
2758 0 : }
2759 : // Binary search among the remaining categories.
2760 0 : q := p + 1 + sort.Search(n-2-p, func(i int) bool {
2761 0 : return kc.cmp(endUserKey, kc.categories[p+1+i].UpperBound) < 0
2762 0 : })
2763 0 : return kc.rangeNames[p][q]
2764 : }
2765 :
2766 : const storePathIdentifier = "{store_path}"
2767 :
2768 : // MakeStoreRelativePath takes a path that is relative to the store directory
2769 : // and creates a path that can be used for Options.WALDir and wal.Dir.Dirname.
2770 : //
2771 : // This is used in metamorphic tests, so that the test run directory can be
2772 : // copied or moved.
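     : //
     : // For example, MakeStoreRelativePath(fs, "wals") returns
     : // "{store_path}/wals", which resolveStorePath later expands to the WAL
     : // directory nested under the store directory.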
2773 0 : func MakeStoreRelativePath(fs vfs.FS, relativePath string) string {
2774 0 : if relativePath == "" {
2775 0 : return storePathIdentifier
2776 0 : }
2777 0 : return fs.PathJoin(storePathIdentifier, relativePath)
2778 : }
2779 :
2780 : // resolveStorePath is the inverse of MakeStoreRelativePath(). It replaces any
2781 : // storePathIdentifier prefix with the store dir.
2782 1 : func resolveStorePath(storeDir, path string) string {
2783 1 : if remainder, ok := strings.CutPrefix(path, storePathIdentifier); ok {
2784 1 : return storeDir + remainder
2785 1 : }
2786 1 : return path
2787 : }
|