1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "io"
11 : "runtime"
12 : "strconv"
13 : "strings"
14 : "time"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/cache"
19 : "github.com/cockroachdb/pebble/internal/humanize"
20 : "github.com/cockroachdb/pebble/internal/keyspan"
21 : "github.com/cockroachdb/pebble/internal/manifest"
22 : "github.com/cockroachdb/pebble/objstorage/remote"
23 : "github.com/cockroachdb/pebble/rangekey"
24 : "github.com/cockroachdb/pebble/sstable"
25 : "github.com/cockroachdb/pebble/vfs"
26 : )
27 :
28 : const (
29 : cacheDefaultSize = 8 << 20 // 8 MB
30 : defaultLevelMultiplier = 10
31 : )
32 :
33 : // Compression exports the sstable.Compression type.
34 : type Compression = sstable.Compression
35 :
36 : // Exported Compression constants.
37 : const (
38 : DefaultCompression = sstable.DefaultCompression
39 : NoCompression = sstable.NoCompression
40 : SnappyCompression = sstable.SnappyCompression
41 : ZstdCompression = sstable.ZstdCompression
42 : )
43 :
44 : // FilterType exports the base.FilterType type.
45 : type FilterType = base.FilterType
46 :
47 : // Exported TableFilter constants.
48 : const (
49 : TableFilter = base.TableFilter
50 : )
51 :
52 : // FilterWriter exports the base.FilterWriter type.
53 : type FilterWriter = base.FilterWriter
54 :
55 : // FilterPolicy exports the base.FilterPolicy type.
56 : type FilterPolicy = base.FilterPolicy
57 :
58 : // TablePropertyCollector exports the sstable.TablePropertyCollector type.
59 : type TablePropertyCollector = sstable.TablePropertyCollector
60 :
61 : // BlockPropertyCollector exports the sstable.BlockPropertyCollector type.
62 : type BlockPropertyCollector = sstable.BlockPropertyCollector
63 :
64 : // BlockPropertyFilter exports the base.BlockPropertyFilter type.
65 : type BlockPropertyFilter = base.BlockPropertyFilter
66 :
67 : // ShortAttributeExtractor exports the base.ShortAttributeExtractor type.
68 : type ShortAttributeExtractor = base.ShortAttributeExtractor
69 :
70 : // UserKeyPrefixBound exports the sstable.UserKeyPrefixBound type.
71 : type UserKeyPrefixBound = sstable.UserKeyPrefixBound
72 :
73 : // IterKeyType configures which types of keys an iterator should surface.
74 : type IterKeyType int8
75 :
76 : const (
77 : // IterKeyTypePointsOnly configures an iterator to iterate over point keys
78 : // only.
79 : IterKeyTypePointsOnly IterKeyType = iota
80 : // IterKeyTypeRangesOnly configures an iterator to iterate over range keys
81 : // only.
82 : IterKeyTypeRangesOnly
83 : // IterKeyTypePointsAndRanges configures an iterator to iterate over both point
84 : // keys and range keys simultaneously.
85 : IterKeyTypePointsAndRanges
86 : )
87 :
88 : // String implements fmt.Stringer.
89 0 : func (t IterKeyType) String() string {
90 0 : switch t {
91 0 : case IterKeyTypePointsOnly:
92 0 : return "points-only"
93 0 : case IterKeyTypeRangesOnly:
94 0 : return "ranges-only"
95 0 : case IterKeyTypePointsAndRanges:
96 0 : return "points-and-ranges"
97 0 : default:
98 0 : panic(fmt.Sprintf("unknown key type %d", t))
99 : }
100 : }
101 :
102 : // IterOptions holds the optional per-query parameters for NewIter.
103 : //
104 : // Like Options, a nil *IterOptions is valid and means to use the default
105 : // values.
106 : type IterOptions struct {
107 : // LowerBound specifies the smallest key (inclusive) that the iterator will
108 : // return during iteration. If the iterator is seeked or iterated past this
109 : // boundary the iterator will return Valid()==false. Setting LowerBound
110 : // effectively truncates the key space visible to the iterator.
111 : LowerBound []byte
112 : // UpperBound specifies the largest key (exclusive) that the iterator will
113 : // return during iteration. If the iterator is seeked or iterated past this
114 : // boundary the iterator will return Valid()==false. Setting UpperBound
115 : // effectively truncates the key space visible to the iterator.
116 : UpperBound []byte
117 : // TableFilter can be used to filter the tables that are scanned during
118 : // iteration based on the user properties. Return true to scan the table and
119 : // false to skip scanning. This function must be thread-safe since the same
120 : // function can be used by multiple iterators, if the iterator is cloned.
121 : TableFilter func(userProps map[string]string) bool
122 : // PointKeyFilters can be used to avoid scanning tables and blocks in tables
123 : // when iterating over point keys. This slice represents an intersection
124 : // across all filters, i.e., all filters must indicate that the block is
125 : // relevant.
126 : //
127 : // Performance note: When len(PointKeyFilters) > 0, the caller should ensure
128 : // that cap(PointKeyFilters) is at least len(PointKeyFilters)+1. This helps
129 : // avoid allocations in Pebble internal code that mutates the slice.
130 : PointKeyFilters []BlockPropertyFilter
131 : // RangeKeyFilters can be used to avoid scanning tables and blocks in tables
132 : // when iterating over range keys. The same requirements that apply to
133 : // PointKeyFilters apply here too.
134 : RangeKeyFilters []BlockPropertyFilter
135 : // KeyTypes configures which types of keys to iterate over: point keys,
136 : // range keys, or both.
137 : KeyTypes IterKeyType
138 : // RangeKeyMasking can be used to enable automatic masking of point keys by
139 : // range keys. Range key masking is only supported during combined range key
140 : // and point key iteration mode (IterKeyTypePointsAndRanges).
141 : RangeKeyMasking RangeKeyMasking
142 :
143 : // OnlyReadGuaranteedDurable is an advanced option that is only supported by
144 : // the Reader implemented by DB. When set to true, only state that is
145 : // guaranteed to be durable is visible in the iterator.
146 : // - This definition is made under the assumption that the FS implementation
147 : // is providing a durability guarantee when data is synced.
148 : // - The visible state represents a consistent point in the history of the
149 : // DB.
150 : // - The implementation is free to choose a conservative definition of what
151 : // is guaranteed durable. For simplicity, the current implementation
152 : // ignores memtables. A more sophisticated implementation could track the
153 : // highest seqnum that is synced to the WAL and published and use that as
154 : // the visible seqnum for an iterator. Note that the latter approach is
155 : // not strictly better than the former since we can have DBs that are (a)
156 : // synced more rarely than memtable flushes, (b) have no WAL. (a) is
157 : // likely to be true in a future CockroachDB context where the DB
158 : // containing the state machine may be rarely synced.
159 : // NB: this current implementation relies on the fact that memtables are
160 : // flushed in seqnum order, and any ingested sstables that happen to have a
161 : // lower seqnum than a non-flushed memtable don't have any overlapping keys.
162 : // This is the fundamental level invariant used in other code too, like when
163 : // merging iterators.
164 : //
165 : // Semantically, using this option provides the caller a "snapshot" as of
166 : // the time the most recent memtable was flushed. An alternate interface
167 : // would be to add a NewSnapshot variant. Creating a snapshot is heavier
168 : // weight than creating an iterator, so we have opted to support this
169 : // iterator option.
170 : OnlyReadGuaranteedDurable bool
171 : // UseL6Filters allows the caller to opt into reading filter blocks for L6
172 : // sstables. This is helpful if many SeekPrefixGEs are expected in quick
173 : // succession that are unlikely to yield even a single key. Filter blocks in
174 : // L6 can be relatively large, often larger than data blocks, so the benefit of
175 : // loading them in the cache is minimized if the probability of the key
176 : // existing is not low or if we just expect a one-time Seek (where loading the
177 : // data block directly is better).
178 : UseL6Filters bool
179 :
180 : // Internal options.
181 :
182 : logger Logger
183 : // Level corresponding to this file. Only passed in if constructed by a
184 : // levelIter.
185 : level manifest.Level
186 : // disableLazyCombinedIteration is an internal testing option.
187 : disableLazyCombinedIteration bool
188 : // snapshotForHideObsoletePoints is specified for/by levelIter when opening
189 : // files and is used to decide whether to hide obsolete points. A value of 0
190 : // implies obsolete points should not be hidden.
191 : snapshotForHideObsoletePoints uint64
192 :
193 : // NB: If adding new Options, you must account for them in iterator
194 : // construction and Iterator.SetOptions.
195 : }
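// What follows is an illustrative sketch (not part of the original source)
// showing how a caller might construct an IterOptions that bounds iteration
// and surfaces both point and range keys. The NewIter signature has varied
// across Pebble versions; the two-value form is assumed here:
//
//	iterOpts := &pebble.IterOptions{
//		LowerBound: []byte("a"), // inclusive
//		UpperBound: []byte("z"), // exclusive
//		KeyTypes:   pebble.IterKeyTypePointsAndRanges,
//	}
//	iter, err := db.NewIter(iterOpts)
//	if err != nil {
//		return err
//	}
//	defer iter.Close()
//	for iter.First(); iter.Valid(); iter.Next() {
//		// Process iter.Key()/iter.Value() here.
//	}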
196 :
197 : // GetLowerBound returns the LowerBound or nil if the receiver is nil.
198 1 : func (o *IterOptions) GetLowerBound() []byte {
199 1 : if o == nil {
200 1 : return nil
201 1 : }
202 1 : return o.LowerBound
203 : }
204 :
205 : // GetUpperBound returns the UpperBound or nil if the receiver is nil.
206 1 : func (o *IterOptions) GetUpperBound() []byte {
207 1 : if o == nil {
208 1 : return nil
209 1 : }
210 1 : return o.UpperBound
211 : }
212 :
213 1 : func (o *IterOptions) pointKeys() bool {
214 1 : if o == nil {
215 0 : return true
216 0 : }
217 1 : return o.KeyTypes == IterKeyTypePointsOnly || o.KeyTypes == IterKeyTypePointsAndRanges
218 : }
219 :
220 1 : func (o *IterOptions) rangeKeys() bool {
221 1 : if o == nil {
222 1 : return false
223 1 : }
224 1 : return o.KeyTypes == IterKeyTypeRangesOnly || o.KeyTypes == IterKeyTypePointsAndRanges
225 : }
226 :
227 1 : func (o *IterOptions) getLogger() Logger {
228 1 : if o == nil || o.logger == nil {
229 1 : return DefaultLogger
230 1 : }
231 1 : return o.logger
232 : }
233 :
234 : // SpanIterOptions creates a SpanIterOptions from this IterOptions.
235 1 : func (o *IterOptions) SpanIterOptions() keyspan.SpanIterOptions {
236 1 : if o == nil {
237 0 : return keyspan.SpanIterOptions{}
238 0 : }
239 1 : return keyspan.SpanIterOptions{
240 1 : RangeKeyFilters: o.RangeKeyFilters,
241 1 : }
242 : }
243 :
244 : // scanInternalOptions is similar to IterOptions, meant for use with
245 : // scanInternalIterator.
246 : type scanInternalOptions struct {
247 : IterOptions
248 :
249 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error
250 : visitRangeDel func(start, end []byte, seqNum uint64) error
251 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error
252 : visitSharedFile func(sst *SharedSSTMeta) error
253 :
254 : // skipSharedLevels skips levels that are shareable (level >=
255 : // sharedLevelStart).
256 : skipSharedLevels bool
257 :
258 : // includeObsoleteKeys specifies whether keys shadowed by newer internal keys
259 : // are exposed. If false, only one internal key per user key is exposed.
260 : includeObsoleteKeys bool
261 :
262 : // rateLimitFunc is used to limit the amount of bytes read per second.
263 : rateLimitFunc func(key *InternalKey, value LazyValue) error
264 : }
265 :
266 : // RangeKeyMasking configures automatic hiding of point keys by range keys. A
267 : // non-nil Suffix enables range-key masking. When enabled, range keys with
268 : // suffixes ≥ Suffix behave as masks. All point keys that are contained within a
269 : // masking range key's bounds and have suffixes greater than the range key's
270 : // suffix are automatically skipped.
271 : //
272 : // Specifically, when configured with a RangeKeyMasking.Suffix _s_, and there
273 : // exists a range key with suffix _r_ covering a point key with suffix _p_, and
274 : //
275 : // _s_ ≤ _r_ < _p_
276 : //
277 : // then the point key is elided.
278 : //
279 : // Range-key masking may only be used when iterating over both point keys and
280 : // range keys with IterKeyTypePointsAndRanges.
281 : type RangeKeyMasking struct {
282 : // Suffix configures which range keys may mask point keys. Only range keys
283 : // that are defined at suffixes greater than or equal to Suffix will mask
284 : // point keys.
285 : Suffix []byte
286 : // Filter is an optional field that may be used to improve performance of
287 : // range-key masking through a block-property filter defined over key
288 : // suffixes. If non-nil, Filter is called by Pebble to construct a
289 : // block-property filter mask at iterator creation. The filter is used to
290 : // skip whole point-key blocks containing point keys with suffixes greater
291 : // than a covering range-key's suffix.
292 : //
293 : // To use this functionality, the caller must create and configure (through
294 : // Options.BlockPropertyCollectors) a block-property collector that records
295 : // the maximum suffix contained within a block. The caller then must write
296 : // and provide a BlockPropertyFilterMask implementation on that same
297 : // property. See the BlockPropertyFilterMask type for more information.
298 : Filter func() BlockPropertyFilterMask
299 : }
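// A hedged sketch of enabling range-key masking at iterator creation. The
// "@5" suffix encoding is purely illustrative and presumes a Comparer whose
// Split function understands such suffixes:
//
//	opts := &pebble.IterOptions{
//		KeyTypes: pebble.IterKeyTypePointsAndRanges,
//		RangeKeyMasking: pebble.RangeKeyMasking{
//			// Range keys with suffixes >= "@5" act as masks.
//			Suffix: []byte("@5"),
//		},
//	}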
300 :
301 : // BlockPropertyFilterMask extends the BlockPropertyFilter interface for use
302 : // with range-key masking. Unlike an ordinary block property filter, a
303 : // BlockPropertyFilterMask's filtering criteria is allowed to change when Pebble
304 : // invokes its SetSuffix method.
305 : //
306 : // When a Pebble iterator steps into a range key's bounds and the range key has
307 : // a suffix greater than or equal to RangeKeyMasking.Suffix, the range key acts
308 : // as a mask. The masking range key hides all point keys that fall within the
309 : // range key's bounds and have suffixes > the range key's suffix. Without a
310 : // filter mask configured, Pebble performs this hiding by stepping through point
311 : // keys and comparing suffixes. If large numbers of point keys are masked, this
312 : // requires Pebble to load, iterate through and discard a large number of
313 : // sstable blocks containing masked point keys.
314 : //
315 : // If a block-property collector and a filter mask are configured, Pebble may
316 : // skip loading some point-key blocks altogether. If a block's keys are known to
317 : // all fall within the bounds of the masking range key and the block was
318 : // annotated by a block-property collector with the maximal suffix, Pebble can
319 : // ask the filter mask to compare the property to the current masking range
320 : // key's suffix. If the mask reports no intersection, the block may be skipped.
321 : //
322 : // If unsuffixed and suffixed keys are written to the database, care must be
323 : // taken to avoid unintentionally masking unsuffixed keys located in the same
324 : // block as suffixed keys. One solution is to interpret unsuffixed keys as
325 : // containing the maximal suffix value, ensuring that blocks containing
326 : // unsuffixed keys are always loaded.
327 : type BlockPropertyFilterMask interface {
328 : BlockPropertyFilter
329 :
330 : // SetSuffix configures the mask with the suffix of a range key. The filter
331 : // should return false from Intersects whenever it's provided with a
332 : // property encoding a block's minimum suffix that's greater (according to
333 : // Compare) than the provided suffix.
334 : SetSuffix(suffix []byte) error
335 : }
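// A minimal skeleton of a BlockPropertyFilterMask implementation, shown only
// as an illustration: the property is assumed to hold the block's minimum
// suffix verbatim, which is an assumption rather than a prescribed encoding.
//
//	type suffixMask struct {
//		cmp    base.Compare
//		suffix []byte // suffix of the current masking range key
//	}
//
//	func (m *suffixMask) Name() string { return "suffix-mask" }
//
//	func (m *suffixMask) SetSuffix(suffix []byte) error {
//		m.suffix = append(m.suffix[:0], suffix...)
//		return nil
//	}
//
//	func (m *suffixMask) Intersects(prop []byte) (bool, error) {
//		if len(m.suffix) == 0 {
//			return true, nil // no mask configured; never skip a block
//		}
//		// Skip the block only when its minimum suffix is greater than the
//		// masking range key's suffix.
//		return m.cmp(prop, m.suffix) <= 0, nil
//	}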
336 :
337 : // WriteOptions holds the optional per-query parameters for Set and Delete
338 : // operations.
339 : //
340 : // Like Options, a nil *WriteOptions is valid and means to use the default
341 : // values.
342 : type WriteOptions struct {
343 : // Sync is whether to sync writes through the OS buffer cache and down onto
344 : // the actual disk, if applicable. Setting Sync is required for durability of
345 : // individual write operations but can result in slower writes.
346 : //
347 : // If false, and the process or machine crashes, then a recent write may be
348 : // lost. This is due to the recently written data being buffered inside the
349 : // process running Pebble. This differs from the semantics of a write system
350 : // call in which the data is buffered in the OS buffer cache and would thus
351 : // survive a process crash.
352 : //
353 : // The default value is true.
354 : Sync bool
355 : }
356 :
357 : // Sync specifies the default write options for writes which synchronize to
358 : // disk.
359 : var Sync = &WriteOptions{Sync: true}
360 :
361 : // NoSync specifies the default write options for writes which do not
362 : // synchronize to disk.
363 : var NoSync = &WriteOptions{Sync: false}
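// Example (sketch): choosing durability per write using the WriteOptions
// values above. A write issued with NoSync may be lost if the process
// crashes before the WAL is synced:
//
//	_ = db.Set([]byte("k1"), []byte("v1"), pebble.Sync)   // durable on return
//	_ = db.Set([]byte("k2"), []byte("v2"), pebble.NoSync) // may be lost on crash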
364 :
365 : // GetSync returns the Sync value or true if the receiver is nil.
366 1 : func (o *WriteOptions) GetSync() bool {
367 1 : return o == nil || o.Sync
368 1 : }
369 :
370 : // LevelOptions holds the optional per-level parameters.
371 : type LevelOptions struct {
372 : // BlockRestartInterval is the number of keys between restart points
373 : // for delta encoding of keys.
374 : //
375 : // The default value is 16.
376 : BlockRestartInterval int
377 :
378 : // BlockSize is the target uncompressed size in bytes of each table block.
379 : //
380 : // The default value is 4096.
381 : BlockSize int
382 :
383 : // BlockSizeThreshold finishes a block if the block size is larger than the
384 : // specified percentage of the target block size and adding the next entry
385 : // would cause the block to be larger than the target block size.
386 : //
387 : // The default value is 90.
388 : BlockSizeThreshold int
389 :
390 : // Compression defines the per-block compression to use.
391 : //
392 : // The default value (DefaultCompression) uses snappy compression.
393 : Compression Compression
394 :
395 : // FilterPolicy defines a filter algorithm (such as a Bloom filter) that can
396 : // reduce disk reads for Get calls.
397 : //
398 : // One such implementation is bloom.FilterPolicy(10) from the pebble/bloom
399 : // package.
400 : //
401 : // The default value means to use no filter.
402 : FilterPolicy FilterPolicy
403 :
404 : // FilterType defines whether an existing filter policy is applied at a
405 : // block-level or table-level. Block-level filters use less memory to create,
406 : // but are slower to access as a check for the key in the index must first be
407 : // performed to locate the filter block. A table-level filter will require
408 : // memory proportional to the number of keys in an sstable to create, but
409 : // avoids the index lookup when determining if a key is present. Table-level
410 : // filters should be preferred except under constrained memory situations.
411 : FilterType FilterType
412 :
413 : // IndexBlockSize is the target uncompressed size in bytes of each index
414 : // block. When the index block size is larger than this target, two-level
415 : // indexes are automatically enabled. Setting this option to a large value
416 : // (such as math.MaxInt32) disables the automatic creation of two-level
417 : // indexes.
418 : //
419 : // The default value is the value of BlockSize.
420 : IndexBlockSize int
421 :
422 : // The target file size for the level.
423 : TargetFileSize int64
424 : }
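// A hedged example of configuring the first level with a bloom filter and a
// larger block size; bloom.FilterPolicy is the implementation from the
// pebble/bloom package mentioned above:
//
//	opts := &pebble.Options{
//		Levels: []pebble.LevelOptions{{
//			BlockSize:    32 << 10, // 32 KB data blocks
//			FilterPolicy: bloom.FilterPolicy(10),
//			FilterType:   pebble.TableFilter,
//		}},
//	}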
425 :
426 : // EnsureDefaults ensures that the default values for all of the options have
427 : // been initialized. It is valid to call EnsureDefaults on a nil receiver. A
428 : // non-nil result will always be returned.
429 1 : func (o *LevelOptions) EnsureDefaults() *LevelOptions {
430 1 : if o == nil {
431 0 : o = &LevelOptions{}
432 0 : }
433 1 : if o.BlockRestartInterval <= 0 {
434 0 : o.BlockRestartInterval = base.DefaultBlockRestartInterval
435 0 : }
436 1 : if o.BlockSize <= 0 {
437 0 : o.BlockSize = base.DefaultBlockSize
438 1 : } else if o.BlockSize > sstable.MaximumBlockSize {
439 0 : panic(errors.Errorf("BlockSize %d exceeds MaximumBlockSize", o.BlockSize))
440 : }
441 1 : if o.BlockSizeThreshold <= 0 {
442 0 : o.BlockSizeThreshold = base.DefaultBlockSizeThreshold
443 0 : }
444 1 : if o.Compression <= DefaultCompression || o.Compression >= sstable.NCompression {
445 0 : o.Compression = SnappyCompression
446 0 : }
447 1 : if o.IndexBlockSize <= 0 {
448 0 : o.IndexBlockSize = o.BlockSize
449 0 : }
450 1 : if o.TargetFileSize <= 0 {
451 0 : o.TargetFileSize = 2 << 20 // 2 MB
452 0 : }
453 1 : return o
454 : }
455 :
456 : // Options holds the optional parameters for configuring pebble. These options
457 : // apply to the DB at large; per-query options are defined by the IterOptions
458 : // and WriteOptions types.
459 : type Options struct {
460 : // Sync sstables periodically in order to smooth out writes to disk. This
461 : // option does not provide any persistence guarantee, but is used to avoid
462 : // latency spikes if the OS automatically decides to write out a large chunk
463 : // of dirty filesystem buffers. This option only controls SSTable syncs; WAL
464 : // syncs are controlled by WALBytesPerSync.
465 : //
466 : // The default value is 512KB.
467 : BytesPerSync int
468 :
469 : // Cache is used to cache uncompressed blocks from sstables.
470 : //
471 : // The default cache size is 8 MB.
472 : Cache *cache.Cache
473 :
474 : // Cleaner cleans obsolete files.
475 : //
476 : // The default cleaner uses the DeleteCleaner.
477 : Cleaner Cleaner
478 :
479 : // Comparer defines a total ordering over the space of []byte keys: a 'less
480 : // than' relationship. The same comparison algorithm must be used for reads
481 : // and writes over the lifetime of the DB.
482 : //
483 : // The default value uses the same ordering as bytes.Compare.
484 : Comparer *Comparer
485 :
486 : // DebugCheck is invoked, if non-nil, whenever a new version is being
487 : // installed. Typically, this is set to pebble.DebugCheckLevels in tests
488 : // or tools only, to check invariants over all the data in the database.
489 : DebugCheck func(*DB) error
490 :
491 : // Disable the write-ahead log (WAL). Disabling the write-ahead log prohibits
492 : // crash recovery, but can improve performance if crash recovery is not
493 : // needed (e.g. when only temporary state is being stored in the database).
494 : //
495 : // TODO(peter): untested
496 : DisableWAL bool
497 :
498 : // ErrorIfExists causes an error on Open if the database already exists.
499 : // The error can be checked with errors.Is(err, ErrDBAlreadyExists).
500 : //
501 : // The default value is false.
502 : ErrorIfExists bool
503 :
504 : // ErrorIfNotExists causes an error on Open if the database does not already
505 : // exist. The error can be checked with errors.Is(err, ErrDBDoesNotExist).
506 : //
507 : // The default value is false which will cause a database to be created if it
508 : // does not already exist.
509 : ErrorIfNotExists bool
510 :
511 : // ErrorIfNotPristine causes an error on Open if the database already exists
512 : // and any operations have been performed on the database. The error can be
513 : // checked with errors.Is(err, ErrDBNotPristine).
514 : //
515 : // Note that a database that contained keys that were all subsequently deleted
516 : // may or may not trigger the error. Currently, we check if there are any live
517 : // SSTs or log records to replay.
518 : ErrorIfNotPristine bool
519 :
520 : // EventListener provides hooks to listening to significant DB events such as
521 : // flushes, compactions, and table deletion.
522 : EventListener *EventListener
523 :
524 : // Experimental contains experimental options which are off by default.
525 : // These options are temporary and will eventually either be deleted, moved
526 : // out of the experimental group, or made the non-adjustable default. These
527 : // options may change at any time, so do not rely on them.
528 : Experimental struct {
529 : // The threshold of L0 read-amplification at which compaction concurrency
530 : // is enabled (if CompactionDebtConcurrency was not already exceeded).
531 : // Every multiple of this value enables another concurrent
532 : // compaction up to MaxConcurrentCompactions.
533 : L0CompactionConcurrency int
534 :
535 : // CompactionDebtConcurrency controls the threshold of compaction debt
536 : // at which additional compaction concurrency slots are added. For every
537 : // multiple of this value in compaction debt bytes, an additional
538 : // concurrent compaction is added. This works "on top" of
539 : // L0CompactionConcurrency, so the higher of the count of compaction
540 : // concurrency slots as determined by the two options is chosen.
541 : CompactionDebtConcurrency uint64
542 :
543 : // ReadCompactionRate controls the frequency of read triggered
544 : // compactions by adjusting `AllowedSeeks` in manifest.FileMetadata:
545 : //
546 : // AllowedSeeks = FileSize / ReadCompactionRate
547 : //
548 : // From LevelDB:
549 : // ```
550 : // We arrange to automatically compact this file after
551 : // a certain number of seeks. Let's assume:
552 : // (1) One seek costs 10ms
553 : // (2) Writing or reading 1MB costs 10ms (100MB/s)
554 : // (3) A compaction of 1MB does 25MB of IO:
555 : // 1MB read from this level
556 : // 10-12MB read from next level (boundaries may be misaligned)
557 : // 10-12MB written to next level
558 : // This implies that 25 seeks cost the same as the compaction
559 : // of 1MB of data. I.e., one seek costs approximately the
560 : // same as the compaction of 40KB of data. We are a little
561 : // conservative and allow approximately one seek for every 16KB
562 : // of data before triggering a compaction.
563 : // ```
564 : ReadCompactionRate int64
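// For example (illustrative arithmetic, not additional semantics): with
// the default ReadCompactionRate of 16000, a 2 MB file is assigned
// AllowedSeeks = (2 << 20) / 16000 ≈ 131 seeks before a read-triggered
// compaction is considered.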
565 :
566 : // ReadSamplingMultiplier is a multiplier for the readSamplingPeriod in
567 : // iterator.maybeSampleRead() to control the frequency of read sampling
568 : // to trigger a read triggered compaction. A value of -1 prevents sampling
569 : // and disables read triggered compactions. The default is 1 << 4, which
570 : // gets multiplied with a constant of 1 << 16 to yield 1 << 20 (1MB).
571 : ReadSamplingMultiplier int64
572 :
573 : // TableCacheShards is the number of shards per table cache.
574 : // Reducing the value can reduce the number of idle goroutines per DB
575 : // instance which can be useful in scenarios with a lot of DB instances
576 : // and a large number of CPUs, but doing so can lead to higher contention
577 : // in the table cache and reduced performance.
578 : //
579 : // The default value is the number of logical CPUs, which can be
580 : // limited by runtime.GOMAXPROCS.
581 : TableCacheShards int
582 :
583 : // KeyValidationFunc is a function to validate a user key in an SSTable.
584 : //
585 : // Currently, this function is used to validate the smallest and largest
586 : // keys in an SSTable undergoing compaction. In this case, returning an
587 : // error from the validation function will result in a panic at runtime,
588 : // given that there is rarely any way of recovering from malformed keys
589 : // present in compacted files. By default, validation is not performed.
590 : //
591 : // Additional use-cases may be added in the future.
592 : //
593 : // NOTE: callers should take care to not mutate the key being validated.
594 : KeyValidationFunc func(userKey []byte) error
595 :
596 : // ValidateOnIngest schedules validation of sstables after they have
597 : // been ingested.
598 : //
599 : // By default, this value is false.
600 : ValidateOnIngest bool
601 :
602 : // LevelMultiplier configures the size multiplier used to determine the
603 : // desired size of each level of the LSM. Defaults to 10.
604 : LevelMultiplier int
605 :
606 : // MultiLevelCompactionHeuristic determines whether to add an additional
607 : // level to a conventional two level compaction. If nil, a multilevel
608 : // compaction will never get triggered.
609 : MultiLevelCompactionHeuristic MultiLevelHeuristic
610 :
611 : // MaxWriterConcurrency is used to indicate the maximum number of
612 : // compression workers the compression queue is allowed to use. If
613 : // MaxWriterConcurrency > 0, then the Writer will use parallelism, to
614 : // compress and write blocks to disk. Otherwise, the writer will
615 : // compress and write blocks to disk synchronously.
616 : MaxWriterConcurrency int
617 :
618 : // ForceWriterParallelism is used to force parallelism in the sstable
619 : // Writer for the metamorphic tests. Even with the MaxWriterConcurrency
620 : // option set, we only enable parallelism in the sstable Writer if there
621 : // is enough CPU available, and this option bypasses that.
622 : ForceWriterParallelism bool
623 :
624 : // CPUWorkPermissionGranter should be set if Pebble should be given the
625 : // ability to optionally schedule additional CPU. See the documentation
626 : // for CPUWorkPermissionGranter for more details.
627 : CPUWorkPermissionGranter CPUWorkPermissionGranter
628 :
629 : // EnableValueBlocks is used to decide whether to enable writing
630 : // TableFormatPebblev3 sstables. This setting is only respected by a
631 : // specific subset of format major versions: FormatSSTableValueBlocks,
632 : // FormatFlushableIngest and FormatPrePebblev1MarkedCompacted. In lower
633 : // format major versions, value blocks are never enabled. In higher
634 : // format major versions, value blocks are always enabled.
635 : EnableValueBlocks func() bool
636 :
637 : // ShortAttributeExtractor is used iff EnableValueBlocks() returns true
638 : // (else ignored). If non-nil, a ShortAttribute can be extracted from the
639 : // value and stored with the key, when the value is stored elsewhere.
640 : ShortAttributeExtractor ShortAttributeExtractor
641 :
642 : // RequiredInPlaceValueBound specifies an optional span of user key
643 : // prefixes that are not-MVCC, but have a suffix. For these the values
644 : // must be stored with the key, since the concept of "older versions" is
645 : // not defined. It is also useful for statically known exclusions to value
646 : // separation. In CockroachDB, this will be used for the lock table key
647 : // space that has non-empty suffixes, but those locks don't represent
648 : // actual MVCC versions (the suffix ordering is arbitrary). We will also
649 : // need to add support for dynamically configured exclusions (we want the
650 : // default to be to allow Pebble to decide whether to separate the value
651 : // or not, hence this is structured as exclusions), for example, for users
652 : // of CockroachDB to dynamically exclude certain tables.
653 : //
654 : // Any change in exclusion behavior takes effect only on future written
655 : // sstables, and does not start rewriting existing sstables.
656 : //
657 : // Even ignoring changes in this setting, exclusions are interpreted as a
658 : // guidance by Pebble, and not necessarily honored. Specifically, user
659 : // keys with multiple Pebble-versions *may* have the older versions stored
660 : // in value blocks.
661 : RequiredInPlaceValueBound UserKeyPrefixBound
662 :
663 : // DisableIngestAsFlushable disables lazy ingestion of sstables through
664 : // a WAL write and memtable rotation. Only effective if the format
665 : // major version is at least `FormatFlushableIngest`.
666 : DisableIngestAsFlushable func() bool
667 :
668 : // RemoteStorage enables use of remote storage (e.g. S3) for storing
669 : // sstables. Setting this option enables use of CreateOnShared option and
670 : // allows ingestion of external files.
671 : RemoteStorage remote.StorageFactory
672 :
673 : // If CreateOnShared is true, any new sstables are created on remote storage
674 : // (using CreateOnSharedLocator). These sstables can be shared between
675 : // different Pebble instances; the lifecycle of such objects is managed by
676 : // the cluster.
677 : //
678 : // Can only be used when RemoteStorage is set (and recognizes
679 : // CreateOnSharedLocator).
680 : CreateOnShared bool
681 : CreateOnSharedLocator remote.Locator
682 :
683 : // SecondaryCacheSizeBytes is the size of the on-disk secondary cache for
684 : // objects on shared storage, in bytes. If it is 0, no cache is used.
685 : SecondaryCacheSizeBytes int64
686 : }
687 :
688 : // Filters is a map from filter policy name to filter policy. It is used for
689 : // debugging tools which may be used on multiple databases configured with
690 : // different filter policies. It is not necessary to populate this filters
691 : // map during normal usage of a DB.
692 : Filters map[string]FilterPolicy
693 :
694 : // FlushDelayDeleteRange configures how long the database should wait before
695 : // forcing a flush of a memtable that contains a range deletion. Disk space
696 : // cannot be reclaimed until the range deletion is flushed. No automatic
697 : // flush occurs if zero.
698 : FlushDelayDeleteRange time.Duration
699 :
700 : // FlushDelayRangeKey configures how long the database should wait before
701 : // forcing a flush of a memtable that contains a range key. Range keys in
702 : // the memtable prevent lazy combined iteration, so it's desirable to flush
703 : // range keys promptly. No automatic flush occurs if zero.
704 : FlushDelayRangeKey time.Duration
705 :
706 : // FlushSplitBytes denotes the target number of bytes per sublevel in
707 : // each flush split interval (i.e. range between two flush split keys)
708 : // in L0 sstables. When set to zero, only a single sstable is generated
709 : // by each flush. When set to a non-zero value, flushes are split at
710 : // points to meet L0's TargetFileSize, any grandparent-related overlap
711 : // options, and at boundary keys of L0 flush split intervals (which are
712 : // targeted to contain around FlushSplitBytes bytes in each sublevel
713 : // between pairs of boundary keys). Splitting sstables during flush
714 : // allows increased compaction flexibility and concurrency when those
715 : // tables are compacted to lower levels.
716 : FlushSplitBytes int64
717 :
718 : // FormatMajorVersion sets the format of on-disk files. It is
719 : // recommended to set the format major version to an explicit
720 : // version, as the default may change over time.
721 : //
722 : // At Open if the existing database is formatted using a later
723 : // format major version that is known to this version of Pebble,
724 : // Pebble will continue to use the later format major version. If
725 : // the existing database's version is unknown, the caller may use
726 : // FormatMostCompatible and will be able to open the database
727 : // regardless of its actual version.
728 : //
729 : // If the existing database is formatted using a format major
730 : // version earlier than the one specified, Open will automatically
731 : // ratchet the database to the specified format major version.
732 : FormatMajorVersion FormatMajorVersion
733 :
734 : // FS provides the interface for persistent file storage.
735 : //
736 : // The default value uses the underlying operating system's file system.
737 : FS vfs.FS
738 :
739 : // Lock, if set, must be a database lock acquired through LockDirectory for
740 : // the same directory passed to Open. If provided, Open will skip locking
741 : // the directory. Closing the database will not release the lock, and it's
742 : // the responsibility of the caller to release the lock after closing the
743 : // database.
744 : //
745 : // Open will enforce that the Lock passed locks the same directory passed to
746 : // Open. Concurrent calls to Open using the same Lock are detected and
747 : // prohibited.
748 : Lock *Lock
749 :
750 : // The count of L0 files necessary to trigger an L0 compaction.
751 : L0CompactionFileThreshold int
752 :
753 : // The amount of L0 read-amplification necessary to trigger an L0 compaction.
754 : L0CompactionThreshold int
755 :
756 : // Hard limit on L0 read-amplification, computed as the number of L0
757 : // sublevels. Writes are stopped when this threshold is reached.
758 : L0StopWritesThreshold int
759 :
760 : // The maximum number of bytes for LBase. The base level is the level which
761 : // L0 is compacted into. The base level is determined dynamically based on
762 : // the existing data in the LSM. The maximum number of bytes for other levels
763 : // is computed dynamically based on the base level's maximum size. When the
764 : // maximum number of bytes for a level is exceeded, compaction is requested.
765 : LBaseMaxBytes int64
766 :
767 : // Per-level options. Options for at least one level must be specified. The
768 : // options for the last level are used for all subsequent levels.
769 : Levels []LevelOptions
770 :
771 : // If LoggerAndTracer is non-nil, it is used for logging and tracing;
772 : // otherwise Logger is used and tracing is a noop.
773 :
774 : // Logger used to write log messages.
775 : //
776 : // The default logger uses the Go standard library log package.
777 : Logger Logger
778 : // LoggerAndTracer is used for writing log messages and traces.
779 : LoggerAndTracer LoggerAndTracer
780 :
781 : // MaxManifestFileSize is the maximum size the MANIFEST file is allowed to
782 : // become. When the MANIFEST exceeds this size it is rolled over and a new
783 : // MANIFEST is created.
784 : MaxManifestFileSize int64
785 :
786 : // MaxOpenFiles is a soft limit on the number of open files that can be
787 : // used by the DB.
788 : //
789 : // The default value is 1000.
790 : MaxOpenFiles int
791 :
792 : // The size of a MemTable in steady state. The actual MemTable size starts at
793 : // min(256KB, MemTableSize) and doubles for each subsequent MemTable up to
794 : // MemTableSize. This reduces the memory pressure caused by MemTables for
795 : // short lived (test) DB instances. Note that more than one MemTable can be
796 : // in existence since flushing a MemTable involves creating a new one and
797 : // writing the contents of the old one in the
798 : // background. MemTableStopWritesThreshold places a hard limit on the size of
799 : // the queued MemTables.
800 : //
801 : // The default value is 4MB.
802 : MemTableSize uint64
803 :
804 : // Hard limit on the number of queued MemTables. Writes are stopped when
805 : // the sum of the queued memtable sizes exceeds:
806 : // MemTableStopWritesThreshold * MemTableSize.
807 : //
808 : // This value should be at least 2 or writes will stop whenever a MemTable is
809 : // being flushed.
810 : //
811 : // The default value is 2.
812 : MemTableStopWritesThreshold int
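// For example (illustrative): with the defaults of MemTableSize = 4 MB and
// MemTableStopWritesThreshold = 2, writes stall once the queued memtables
// exceed 8 MB in aggregate.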
813 :
814 : // Merger defines the associative merge operation to use for merging values
815 : // written with {Batch,DB}.Merge.
816 : //
817 : // The default merger concatenates values.
818 : Merger *Merger
819 :
820 : // MaxConcurrentCompactions specifies the maximum number of concurrent
821 : // compactions. The default is 1. Concurrent compactions are performed
822 : // - when L0 read-amplification passes the L0CompactionConcurrency threshold
823 : // - for automatic background compactions
824 : // - when a manual compaction for a level is split and parallelized
825 : // MaxConcurrentCompactions must be greater than 0.
826 : MaxConcurrentCompactions func() int
827 :
828 : // DisableAutomaticCompactions dictates whether automatic compactions are
829 : // scheduled or not. The default is false (enabled). This option is only used
830 : // externally when running a manual compaction, and internally for tests.
831 : DisableAutomaticCompactions bool
832 :
833 : // NoSyncOnClose decides whether the Pebble instance will enforce a
834 : // close-time synchronization (e.g., fdatasync() or sync_file_range())
835 : // on files it writes to. Setting this to true removes the guarantee for a
836 : // sync on close. Some implementations can still issue a non-blocking sync.
837 : NoSyncOnClose bool
838 :
839 : // NumPrevManifest is the number of non-current or older manifests which
840 : // we want to keep around for debugging purposes. By default, one older
841 : // manifest is kept.
842 : NumPrevManifest int
843 :
844 : // ReadOnly indicates that the DB should be opened in read-only mode. Writes
845 : // to the DB will return an error, background compactions are disabled, and
846 : // the flush that normally occurs after replaying the WAL at startup is
847 : // disabled.
848 : ReadOnly bool
849 :
850 : // TableCache is an initialized TableCache which should be set as an
851 : // option if the DB needs to be initialized with a pre-existing table cache.
852 : // If TableCache is nil, then a table cache which is unique to the DB instance
853 : // is created. A TableCache can be shared between DB instances by setting it
854 : // here. The TableCache set here must use the same underlying cache as
855 : // Options.Cache; Pebble will panic otherwise.
856 : TableCache *TableCache
857 :
858 : // TablePropertyCollectors is a list of TablePropertyCollector creation
859 : // functions. A new TablePropertyCollector is created for each sstable built
860 : // and lives for the lifetime of the table.
861 : TablePropertyCollectors []func() TablePropertyCollector
862 :
863 : // BlockPropertyCollectors is a list of BlockPropertyCollector creation
864 : // functions. A new BlockPropertyCollector is created for each sstable
865 : // built and lives for the lifetime of writing that table.
866 : BlockPropertyCollectors []func() BlockPropertyCollector
867 :
868 : // WALBytesPerSync sets the number of bytes to write to a WAL before calling
869 : // Sync on it in the background. Just like with BytesPerSync above, this
870 : // helps smooth out disk write latencies, and avoids cases where the OS
871 : // writes a lot of buffered data to disk at once. However, this is less
872 : // necessary with WALs, as many write operations already pass in
873 : // Sync = true.
874 : //
875 : // The default value is 0, i.e. no background syncing. This matches the
876 : // default behaviour in RocksDB.
877 : WALBytesPerSync int
878 :
879 : // WALDir specifies the directory to store write-ahead logs (WALs) in. If
880 : // empty (the default), WALs will be stored in the same directory as sstables
881 : // (i.e. the directory passed to pebble.Open).
882 : WALDir string
883 :
884 : // WALMinSyncInterval is the minimum duration between syncs of the WAL. If
885 : // WAL syncs are requested faster than this interval, they will be
886 : // artificially delayed. Introducing a small artificial delay (500us) between
887 : // WAL syncs can allow more operations to arrive and reduce IO operations
888 : // while having a minimal impact on throughput. This option is supplied as a
889 : // closure in order to allow the value to be changed dynamically. The default
890 : // value is 0.
891 : //
892 : // TODO(peter): rather than a closure, should there be another mechanism for
893 : // changing options dynamically?
894 : WALMinSyncInterval func() time.Duration
895 :
896 : // TargetByteDeletionRate is the rate (in bytes per second) to which sstable
897 : // file deletions are limited (under normal circumstances).
898 : //
899 : // Deletion pacing is used to slow down deletions when compactions finish up
900 : // or readers close and newly-obsolete files need cleaning up. Deleting lots
901 : // of files at once can cause disk latency to go up on some SSDs, which this
902 : // functionality guards against.
903 : //
904 : // This value is only a best-effort target; the effective rate can be
905 : // higher if deletions are falling behind or disk space is running low.
906 : //
907 : // Setting this to 0 disables deletion pacing, which is also the default.
908 : TargetByteDeletionRate int
909 :
910 : // private options are only used by internal tests or are used internally
911 : // for facilitating upgrade paths of unconfigurable functionality.
912 : private struct {
913 : // strictWALTail configures whether or not a database's WALs created
914 : // prior to the most recent one should be interpreted strictly,
915 : // requiring a clean EOF. RocksDB 6.2.1 and the version of Pebble
916 : // included in CockroachDB 20.1 do not guarantee that closed WALs end
917 : // cleanly. If this option is set within an OPTIONS file, Pebble
918 : // interprets previous WALs strictly, requiring a clean EOF.
919 : // Otherwise, it interprets them permissively in the same manner as
920 : // RocksDB 6.2.1.
921 : strictWALTail bool
922 :
923 : // disableDeleteOnlyCompactions prevents the scheduling of delete-only
924 : // compactions that drop sstables wholly covered by range tombstones or
925 : // range key tombstones.
926 : disableDeleteOnlyCompactions bool
927 :
928 : // disableElisionOnlyCompactions prevents the scheduling of elision-only
929 : // compactions that rewrite sstables in place in order to elide obsolete
930 : // keys.
931 : disableElisionOnlyCompactions bool
932 :
933 : // disableLazyCombinedIteration is a private option used by the
934 : // metamorphic tests to test equivalence between lazy-combined iteration
935 : // and constructing the range-key iterator upfront. It's a private
936 : // option to avoid littering the public interface with options that we
937 : // do not want to allow users to actually configure.
938 : disableLazyCombinedIteration bool
939 :
940 : // A private option to disable stats collection.
941 : disableTableStats bool
942 :
943 : // fsCloser holds a closer that should be invoked after a DB using these
944 : // Options is closed. This is used to automatically stop the
945 : // long-running goroutine associated with the disk-health-checking FS.
946 : // See the initialization of FS in EnsureDefaults. Note that care has
947 : // been taken to ensure that it is still safe to continue using the FS
948 : // after this closer has been invoked. However, if write operations
949 : // against the FS are made after the DB is closed, the FS may leak a
950 : // goroutine indefinitely.
951 : fsCloser io.Closer
952 : }
953 : }
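// A minimal, hedged example of opening a DB with a few of the options above
// (the sizes are illustrative, not recommendations):
//
//	cache := pebble.NewCache(128 << 20) // 128 MB block cache
//	defer cache.Unref()
//	db, err := pebble.Open("demo-db", &pebble.Options{
//		Cache:                    cache,
//		MemTableSize:             64 << 20,
//		MaxConcurrentCompactions: func() int { return 2 },
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer db.Close()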
954 :
955 : // DebugCheckLevels calls CheckLevels on the provided database.
956 : // It may be set in the DebugCheck field of Options to check
957 : // level invariants whenever a new version is installed.
958 1 : func DebugCheckLevels(db *DB) error {
959 1 : return db.CheckLevels(nil)
960 1 : }
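// Example (sketch): wiring DebugCheckLevels into Options for a test:
//
//	opts := &pebble.Options{DebugCheck: pebble.DebugCheckLevels}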
961 :
962 : // EnsureDefaults ensures that the default values for all options are set if a
963 : // valid value was not already specified. Returns the new options.
964 1 : func (o *Options) EnsureDefaults() *Options {
965 1 : if o == nil {
966 0 : o = &Options{}
967 0 : }
968 1 : if o.BytesPerSync <= 0 {
969 0 : o.BytesPerSync = 512 << 10 // 512 KB
970 0 : }
971 1 : if o.Cleaner == nil {
972 0 : o.Cleaner = DeleteCleaner{}
973 0 : }
974 1 : if o.Comparer == nil {
975 0 : o.Comparer = DefaultComparer
976 0 : }
977 1 : if o.Experimental.DisableIngestAsFlushable == nil {
978 1 : o.Experimental.DisableIngestAsFlushable = func() bool { return false }
979 : }
980 1 : if o.Experimental.L0CompactionConcurrency <= 0 {
981 0 : o.Experimental.L0CompactionConcurrency = 10
982 0 : }
983 1 : if o.Experimental.CompactionDebtConcurrency <= 0 {
984 0 : o.Experimental.CompactionDebtConcurrency = 1 << 30 // 1 GB
985 0 : }
986 1 : if o.Experimental.KeyValidationFunc == nil {
987 1 : o.Experimental.KeyValidationFunc = func([]byte) error { return nil }
988 : }
989 1 : if o.L0CompactionThreshold <= 0 {
990 0 : o.L0CompactionThreshold = 4
991 0 : }
992 1 : if o.L0CompactionFileThreshold <= 0 {
993 0 : // Some justification for the default of 500:
994 0 : // Why not smaller?:
995 0 : // - The default target file size for L0 is 2MB, so 500 files is <= 1GB
996 0 : // of data. At observed compaction speeds of > 20MB/s, L0 can be
997 0 : // cleared of all files in < 1min, so this backlog is not huge.
998 0 : // - 500 files is low overhead for instantiating L0 sublevels from
999 0 : // scratch.
1000 0 : // - Lower values were observed to cause excessive and inefficient
1001 0 : // compactions out of L0 in a TPCC import benchmark.
1002 0 : // Why not larger?:
1003 0 : // - More than 1min to compact everything out of L0.
1004 0 : // - CockroachDB's admission control system uses a threshold of 1000
1005 0 : // files to start throttling writes to Pebble. Using 500 here gives
1006 0 : // us headroom between when Pebble should start compacting L0 and
1007 0 : // when the admission control threshold is reached.
1008 0 : //
1009 0 : // We can revisit this default in the future based on better
1010 0 : // experimental understanding.
1011 0 : //
1012 0 : // TODO(jackson): Experiment with slightly lower thresholds [or higher
1013 0 : // admission control thresholds] to see whether a higher L0 score at the
1014 0 : // threshold (currently 2.0) is necessary for some workloads to avoid
1015 0 : // starving L0 in favor of lower-level compactions.
1016 0 : o.L0CompactionFileThreshold = 500
1017 0 : }
1018 1 : if o.L0StopWritesThreshold <= 0 {
1019 0 : o.L0StopWritesThreshold = 12
1020 0 : }
1021 1 : if o.LBaseMaxBytes <= 0 {
1022 0 : o.LBaseMaxBytes = 64 << 20 // 64 MB
1023 0 : }
1024 1 : if o.Levels == nil {
1025 0 : o.Levels = make([]LevelOptions, 1)
1026 0 : for i := range o.Levels {
1027 0 : if i > 0 {
1028 0 : l := &o.Levels[i]
1029 0 : if l.TargetFileSize <= 0 {
1030 0 : l.TargetFileSize = o.Levels[i-1].TargetFileSize * 2
1031 0 : }
1032 : }
1033 0 : o.Levels[i].EnsureDefaults()
1034 : }
1035 1 : } else {
1036 1 : for i := range o.Levels {
1037 1 : o.Levels[i].EnsureDefaults()
1038 1 : }
1039 : }
1040 1 : if o.Logger == nil {
1041 1 : o.Logger = DefaultLogger
1042 1 : }
1043 1 : if o.EventListener == nil {
1044 1 : o.EventListener = &EventListener{}
1045 1 : }
1046 1 : o.EventListener.EnsureDefaults(o.Logger)
1047 1 : if o.MaxManifestFileSize == 0 {
1048 0 : o.MaxManifestFileSize = 128 << 20 // 128 MB
1049 0 : }
1050 1 : if o.MaxOpenFiles == 0 {
1051 0 : o.MaxOpenFiles = 1000
1052 0 : }
1053 1 : if o.MemTableSize <= 0 {
1054 0 : o.MemTableSize = 4 << 20 // 4 MB
1055 0 : }
1056 1 : if o.MemTableStopWritesThreshold <= 0 {
1057 0 : o.MemTableStopWritesThreshold = 2
1058 0 : }
1059 1 : if o.Merger == nil {
1060 0 : o.Merger = DefaultMerger
1061 0 : }
1062 1 : o.private.strictWALTail = true
1063 1 : if o.MaxConcurrentCompactions == nil {
1064 0 : o.MaxConcurrentCompactions = func() int { return 1 }
1065 : }
1066 1 : if o.NumPrevManifest <= 0 {
1067 1 : o.NumPrevManifest = 1
1068 1 : }
1069 :
1070 1 : if o.FormatMajorVersion == FormatDefault {
1071 0 : o.FormatMajorVersion = FormatMostCompatible
1072 0 : }
1073 :
1074 1 : if o.FS == nil {
1075 0 : o.WithFSDefaults()
1076 0 : }
1077 1 : if o.FlushSplitBytes <= 0 {
1078 0 : o.FlushSplitBytes = 2 * o.Levels[0].TargetFileSize
1079 0 : }
1080 1 : if o.Experimental.LevelMultiplier <= 0 {
1081 1 : o.Experimental.LevelMultiplier = defaultLevelMultiplier
1082 1 : }
1083 1 : if o.Experimental.ReadCompactionRate == 0 {
1084 0 : o.Experimental.ReadCompactionRate = 16000
1085 0 : }
1086 1 : if o.Experimental.ReadSamplingMultiplier == 0 {
1087 0 : o.Experimental.ReadSamplingMultiplier = 1 << 4
1088 0 : }
1089 1 : if o.Experimental.TableCacheShards <= 0 {
1090 0 : o.Experimental.TableCacheShards = runtime.GOMAXPROCS(0)
1091 0 : }
1092 1 : if o.Experimental.CPUWorkPermissionGranter == nil {
1093 1 : o.Experimental.CPUWorkPermissionGranter = defaultCPUWorkGranter{}
1094 1 : }
1095 1 : if o.Experimental.MultiLevelCompactionHeuristic == nil {
1096 1 : o.Experimental.MultiLevelCompactionHeuristic = WriteAmpHeuristic{}
1097 1 : }
1098 :
1099 1 : o.initMaps()
1100 1 : return o
1101 : }
1102 :
1103 : // WithFSDefaults configures the Options to wrap the configured filesystem with
1104 : // the default virtual file system middleware, like disk-health checking.
1105 1 : func (o *Options) WithFSDefaults() *Options {
1106 1 : if o.FS == nil {
1107 0 : o.FS = vfs.Default
1108 0 : }
1109 1 : o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(o.FS, 5*time.Second,
1110 1 : func(info vfs.DiskSlowInfo) {
1111 0 : o.EventListener.DiskSlow(info)
1112 0 : })
1113 1 : return o
1114 : }
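// Example (sketch): wrapping a custom filesystem with the default middleware
// before Open; vfs.NewMem is the in-memory filesystem from pebble/vfs:
//
//	opts := (&pebble.Options{FS: vfs.NewMem()}).WithFSDefaults()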
1115 :
1116 : // AddEventListener adds the provided event listener to the Options, in addition
1117 : // to any existing event listener.
1118 0 : func (o *Options) AddEventListener(l EventListener) {
1119 0 : if o.EventListener != nil {
1120 0 : l = TeeEventListener(l, *o.EventListener)
1121 0 : }
1122 0 : o.EventListener = &l
1123 : }
1124 :
1125 1 : func (o *Options) equal() Equal {
1126 1 : if o.Comparer.Equal == nil {
1127 0 : return bytes.Equal
1128 0 : }
1129 1 : return o.Comparer.Equal
1130 : }
1131 :
1132 : // initMaps initializes the Comparers, Filters, and Mergers maps.
1133 1 : func (o *Options) initMaps() {
1134 1 : for i := range o.Levels {
1135 1 : l := &o.Levels[i]
1136 1 : if l.FilterPolicy != nil {
1137 1 : if o.Filters == nil {
1138 1 : o.Filters = make(map[string]FilterPolicy)
1139 1 : }
1140 1 : name := l.FilterPolicy.Name()
1141 1 : if _, ok := o.Filters[name]; !ok {
1142 1 : o.Filters[name] = l.FilterPolicy
1143 1 : }
1144 : }
1145 : }
1146 : }
1147 :
1148 : // Level returns the LevelOptions for the specified level.
1149 1 : func (o *Options) Level(level int) LevelOptions {
1150 1 : if level < len(o.Levels) {
1151 1 : return o.Levels[level]
1152 1 : }
1153 1 : n := len(o.Levels) - 1
1154 1 : l := o.Levels[n]
1155 1 : for i := n; i < level; i++ {
1156 1 : l.TargetFileSize *= 2
1157 1 : }
1158 1 : return l
1159 : }
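// For example (sketch): with a single configured level whose TargetFileSize
// is 2 MB, Level(3) extrapolates by doubling once per level beyond the last
// configured one, returning a TargetFileSize of 2 MB << 3 = 16 MB.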
1160 :
1161 : // Clone creates a shallow-copy of the supplied options.
1162 1 : func (o *Options) Clone() *Options {
1163 1 : n := &Options{}
1164 1 : if o != nil {
1165 1 : *n = *o
1166 1 : }
1167 1 : return n
1168 : }
1169 :
1170 1 : func filterPolicyName(p FilterPolicy) string {
1171 1 : if p == nil {
1172 1 : return "none"
1173 1 : }
1174 1 : return p.Name()
1175 : }
1176 :
1177 1 : func (o *Options) String() string {
1178 1 : var buf bytes.Buffer
1179 1 :
1180 1 : cacheSize := int64(cacheDefaultSize)
1181 1 : if o.Cache != nil {
1182 1 : cacheSize = o.Cache.MaxSize()
1183 1 : }
1184 :
1185 1 : fmt.Fprintf(&buf, "[Version]\n")
1186 1 : fmt.Fprintf(&buf, " pebble_version=0.1\n")
1187 1 : fmt.Fprintf(&buf, "\n")
1188 1 : fmt.Fprintf(&buf, "[Options]\n")
1189 1 : fmt.Fprintf(&buf, " bytes_per_sync=%d\n", o.BytesPerSync)
1190 1 : fmt.Fprintf(&buf, " cache_size=%d\n", cacheSize)
1191 1 : fmt.Fprintf(&buf, " cleaner=%s\n", o.Cleaner)
1192 1 : fmt.Fprintf(&buf, " compaction_debt_concurrency=%d\n", o.Experimental.CompactionDebtConcurrency)
1193 1 : fmt.Fprintf(&buf, " comparer=%s\n", o.Comparer.Name)
1194 1 : fmt.Fprintf(&buf, " disable_wal=%t\n", o.DisableWAL)
1195 1 : if o.Experimental.DisableIngestAsFlushable != nil && o.Experimental.DisableIngestAsFlushable() {
1196 1 : fmt.Fprintf(&buf, " disable_ingest_as_flushable=%t\n", true)
1197 1 : }
1198 1 : fmt.Fprintf(&buf, " flush_delay_delete_range=%s\n", o.FlushDelayDeleteRange)
1199 1 : fmt.Fprintf(&buf, " flush_delay_range_key=%s\n", o.FlushDelayRangeKey)
1200 1 : fmt.Fprintf(&buf, " flush_split_bytes=%d\n", o.FlushSplitBytes)
1201 1 : fmt.Fprintf(&buf, " format_major_version=%d\n", o.FormatMajorVersion)
1202 1 : fmt.Fprintf(&buf, " l0_compaction_concurrency=%d\n", o.Experimental.L0CompactionConcurrency)
1203 1 : fmt.Fprintf(&buf, " l0_compaction_file_threshold=%d\n", o.L0CompactionFileThreshold)
1204 1 : fmt.Fprintf(&buf, " l0_compaction_threshold=%d\n", o.L0CompactionThreshold)
1205 1 : fmt.Fprintf(&buf, " l0_stop_writes_threshold=%d\n", o.L0StopWritesThreshold)
1206 1 : fmt.Fprintf(&buf, " lbase_max_bytes=%d\n", o.LBaseMaxBytes)
1207 1 : if o.Experimental.LevelMultiplier != defaultLevelMultiplier {
1208 1 : fmt.Fprintf(&buf, " level_multiplier=%d\n", o.Experimental.LevelMultiplier)
1209 1 : }
1210 1 : fmt.Fprintf(&buf, " max_concurrent_compactions=%d\n", o.MaxConcurrentCompactions())
1211 1 : fmt.Fprintf(&buf, " max_manifest_file_size=%d\n", o.MaxManifestFileSize)
1212 1 : fmt.Fprintf(&buf, " max_open_files=%d\n", o.MaxOpenFiles)
1213 1 : fmt.Fprintf(&buf, " mem_table_size=%d\n", o.MemTableSize)
1214 1 : fmt.Fprintf(&buf, " mem_table_stop_writes_threshold=%d\n", o.MemTableStopWritesThreshold)
1215 1 : fmt.Fprintf(&buf, " min_deletion_rate=%d\n", o.TargetByteDeletionRate)
1216 1 : fmt.Fprintf(&buf, " merger=%s\n", o.Merger.Name)
1217 1 : fmt.Fprintf(&buf, " read_compaction_rate=%d\n", o.Experimental.ReadCompactionRate)
1218 1 : fmt.Fprintf(&buf, " read_sampling_multiplier=%d\n", o.Experimental.ReadSamplingMultiplier)
1219 1 : fmt.Fprintf(&buf, " strict_wal_tail=%t\n", o.private.strictWALTail)
1220 1 : fmt.Fprintf(&buf, " table_cache_shards=%d\n", o.Experimental.TableCacheShards)
1221 1 : fmt.Fprintf(&buf, " table_property_collectors=[")
1222 1 : for i := range o.TablePropertyCollectors {
1223 0 : if i > 0 {
1224 0 : fmt.Fprintf(&buf, ",")
1225 0 : }
1226 : // NB: This creates a new TablePropertyCollector, but Options.String() is
1227 : // called rarely so the overhead of doing so is not consequential.
1228 0 : fmt.Fprintf(&buf, "%s", o.TablePropertyCollectors[i]().Name())
1229 : }
1230 1 : fmt.Fprintf(&buf, "]\n")
1231 1 : fmt.Fprintf(&buf, " validate_on_ingest=%t\n", o.Experimental.ValidateOnIngest)
1232 1 : fmt.Fprintf(&buf, " wal_dir=%s\n", o.WALDir)
1233 1 : fmt.Fprintf(&buf, " wal_bytes_per_sync=%d\n", o.WALBytesPerSync)
1234 1 : fmt.Fprintf(&buf, " max_writer_concurrency=%d\n", o.Experimental.MaxWriterConcurrency)
1235 1 : fmt.Fprintf(&buf, " force_writer_parallelism=%t\n", o.Experimental.ForceWriterParallelism)
1236 1 : fmt.Fprintf(&buf, " secondary_cache_size_bytes=%d\n", o.Experimental.SecondaryCacheSizeBytes)
1237 1 :
1238 1 : // Private options.
1239 1 : //
1240 1 : // These testing-only options are serialized only when true, because we do
1241 1 : // not want them to appear in production OPTIONS files. Serializing them
1242 1 : // only when true still ensures that the metamorphic tests can propagate
1243 1 : // them to subprocesses.
1244 1 : if o.private.disableDeleteOnlyCompactions {
1245 1 : fmt.Fprintln(&buf, " disable_delete_only_compactions=true")
1246 1 : }
1247 1 : if o.private.disableElisionOnlyCompactions {
1248 1 : fmt.Fprintln(&buf, " disable_elision_only_compactions=true")
1249 1 : }
1250 1 : if o.private.disableLazyCombinedIteration {
1251 1 : fmt.Fprintln(&buf, " disable_lazy_combined_iteration=true")
1252 1 : }
1253 :
1254 1 : for i := range o.Levels {
1255 1 : l := &o.Levels[i]
1256 1 : fmt.Fprintf(&buf, "\n")
1257 1 : fmt.Fprintf(&buf, "[Level \"%d\"]\n", i)
1258 1 : fmt.Fprintf(&buf, " block_restart_interval=%d\n", l.BlockRestartInterval)
1259 1 : fmt.Fprintf(&buf, " block_size=%d\n", l.BlockSize)
1260 1 : fmt.Fprintf(&buf, " block_size_threshold=%d\n", l.BlockSizeThreshold)
1261 1 : fmt.Fprintf(&buf, " compression=%s\n", l.Compression)
1262 1 : fmt.Fprintf(&buf, " filter_policy=%s\n", filterPolicyName(l.FilterPolicy))
1263 1 : fmt.Fprintf(&buf, " filter_type=%s\n", l.FilterType)
1264 1 : fmt.Fprintf(&buf, " index_block_size=%d\n", l.IndexBlockSize)
1265 1 : fmt.Fprintf(&buf, " target_file_size=%d\n", l.TargetFileSize)
1266 1 : }
1267 :
1268 1 : return buf.String()
1269 : }
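 : // For reference, a sketch of the INI-style text that String produces. The
 : // values below are illustrative, not exact defaults:
 : //
 : //	[Version]
 : //	  pebble_version=0.1
 : //
 : //	[Options]
 : //	  bytes_per_sync=524288
 : //	  cache_size=8388608
 : //	  comparer=leveldb.BytewiseComparator
 : //	  max_open_files=1000
 : //	  ...
 : //
 : //	[Level "0"]
 : //	  block_size=4096
 : //	  compression=Snappy
 : //	  target_file_size=2097152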
1270 :
1271 1 : func parseOptions(s string, fn func(section, key, value string) error) error {
1272 1 : var section string
1273 1 : for _, line := range strings.Split(s, "\n") {
1274 1 : line = strings.TrimSpace(line)
1275 1 : if len(line) == 0 {
1276 1 : // Skip blank lines.
1277 1 : continue
1278 : }
1279 1 : if line[0] == ';' || line[0] == '#' {
1280 0 : // Skip comments.
1281 0 : continue
1282 : }
1283 1 : n := len(line)
1284 1 : if line[0] == '[' && line[n-1] == ']' {
1285 1 : // Parse section.
1286 1 : section = line[1 : n-1]
1287 1 : continue
1288 : }
1289 :
1290 1 : pos := strings.Index(line, "=")
1291 1 : if pos < 0 {
1292 0 : const maxLen = 50
1293 0 : if len(line) > maxLen {
1294 0 : line = line[:maxLen-3] + "..."
1295 0 : }
1296 0 : return base.CorruptionErrorf("invalid key=value syntax: %q", errors.Safe(line))
1297 : }
1298 :
1299 1 : key := strings.TrimSpace(line[:pos])
1300 1 : value := strings.TrimSpace(line[pos+1:])
1301 1 :
1302 1 : // RocksDB uses a similar (INI-style) syntax for the OPTIONS file, but
1303 1 : // different section names and keys. The "CFOptions ..." sections are the
1304 1 : // RocksDB names, which we map to their Pebble equivalents.
1305 1 : mappedSection := section
1306 1 : if section == `CFOptions "default"` {
1307 0 : mappedSection = "Options"
1308 0 : switch key {
1309 0 : case "comparator":
1310 0 : key = "comparer"
1311 0 : case "merge_operator":
1312 0 : key = "merger"
1313 : }
1314 : }
1315 :
1316 1 : if err := fn(mappedSection, key, value); err != nil {
1317 0 : return err
1318 0 : }
1319 : }
1320 1 : return nil
1321 : }
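 : // A minimal sketch of the callback contract above (hypothetical caller):
 : // every non-blank, non-comment line is delivered as (section, key, value).
 : //
 : //	_ = parseOptions("[Options]\nmax_open_files=1000\n",
 : //		func(section, key, value string) error {
 : //			// Invoked with section="Options", key="max_open_files", value="1000".
 : //			return nil
 : //		})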
1322 :
1323 : // ParseHooks contains callbacks to create options fields which can have
1324 : // user-defined implementations.
1325 : type ParseHooks struct {
1326 : NewCache func(size int64) *Cache
1327 : NewCleaner func(name string) (Cleaner, error)
1328 : NewComparer func(name string) (*Comparer, error)
1329 : NewFilterPolicy func(name string) (FilterPolicy, error)
1330 : NewMerger func(name string) (*Merger, error)
1331 : SkipUnknown func(name, value string) bool
1332 : }
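 : // A sketch of typical hook wiring (illustrative; assumes the
 : // github.com/cockroachdb/pebble/bloom package):
 : //
 : //	hooks := &pebble.ParseHooks{
 : //		NewCache: pebble.NewCache,
 : //		NewFilterPolicy: func(name string) (pebble.FilterPolicy, error) {
 : //			// A production hook would switch on name.
 : //			return bloom.FilterPolicy(10), nil
 : //		},
 : //	}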
1333 :
1334 : // Parse parses the options from the specified string. Note that certain
1335 : // options, such as the comparer and merger, cannot be constructed from their
1336 : // serialized names alone; supply the corresponding ParseHooks to instantiate them.
1337 1 : func (o *Options) Parse(s string, hooks *ParseHooks) error {
1338 1 : return parseOptions(s, func(section, key, value string) error {
1339 1 : // WARNING: DO NOT remove entries from the switches below because doing so
1340 1 : // causes a key previously written to the OPTIONS file to be considered
1341 1 : // unknown, which is a backwards-incompatible change. Instead, keep support for
1342 1 : // parsing the key but simply don't parse the value.
1343 1 :
1344 1 : switch {
1345 1 : case section == "Version":
1346 1 : switch key {
1347 1 : case "pebble_version":
1348 0 : default:
1349 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
1350 0 : return nil
1351 0 : }
1352 0 : return errors.Errorf("pebble: unknown option: %s.%s",
1353 0 : errors.Safe(section), errors.Safe(key))
1354 : }
1355 1 : return nil
1356 :
1357 1 : case section == "Options":
1358 1 : var err error
1359 1 : switch key {
1360 1 : case "bytes_per_sync":
1361 1 : o.BytesPerSync, err = strconv.Atoi(value)
1362 1 : case "cache_size":
1363 1 : var n int64
1364 1 : n, err = strconv.ParseInt(value, 10, 64)
1365 1 : if err == nil && hooks != nil && hooks.NewCache != nil {
1366 1 : if o.Cache != nil {
1367 0 : o.Cache.Unref()
1368 0 : }
1369 1 : o.Cache = hooks.NewCache(n)
1370 : }
1371 : // We avoid calling cache.New in parsing because it makes it
1372 : // too easy to leak a cache.
1373 1 : case "cleaner":
1374 1 : switch value {
1375 0 : case "archive":
1376 0 : o.Cleaner = ArchiveCleaner{}
1377 1 : case "delete":
1378 1 : o.Cleaner = DeleteCleaner{}
1379 0 : default:
1380 0 : if hooks != nil && hooks.NewCleaner != nil {
1381 0 : o.Cleaner, err = hooks.NewCleaner(value)
1382 0 : }
1383 : }
1384 1 : case "comparer":
1385 1 : switch value {
1386 0 : case "leveldb.BytewiseComparator":
1387 0 : o.Comparer = DefaultComparer
1388 1 : default:
1389 1 : if hooks != nil && hooks.NewComparer != nil {
1390 0 : o.Comparer, err = hooks.NewComparer(value)
1391 0 : }
1392 : }
1393 1 : case "compaction_debt_concurrency":
1394 1 : o.Experimental.CompactionDebtConcurrency, err = strconv.ParseUint(value, 10, 64)
1395 0 : case "delete_range_flush_delay":
1396 0 : // NB: This is a deprecated serialization of
1397 0 : // `flush_delay_delete_range`.
1398 0 : o.FlushDelayDeleteRange, err = time.ParseDuration(value)
1399 1 : case "disable_delete_only_compactions":
1400 1 : o.private.disableDeleteOnlyCompactions, err = strconv.ParseBool(value)
1401 1 : case "disable_elision_only_compactions":
1402 1 : o.private.disableElisionOnlyCompactions, err = strconv.ParseBool(value)
1403 1 : case "disable_ingest_as_flushable":
1404 1 : var v bool
1405 1 : v, err = strconv.ParseBool(value)
1406 1 : if err == nil {
1407 1 : o.Experimental.DisableIngestAsFlushable = func() bool { return v }
1408 : }
1409 1 : case "disable_lazy_combined_iteration":
1410 1 : o.private.disableLazyCombinedIteration, err = strconv.ParseBool(value)
1411 1 : case "disable_wal":
1412 1 : o.DisableWAL, err = strconv.ParseBool(value)
1413 1 : case "flush_delay_delete_range":
1414 1 : o.FlushDelayDeleteRange, err = time.ParseDuration(value)
1415 1 : case "flush_delay_range_key":
1416 1 : o.FlushDelayRangeKey, err = time.ParseDuration(value)
1417 1 : case "flush_split_bytes":
1418 1 : o.FlushSplitBytes, err = strconv.ParseInt(value, 10, 64)
1419 1 : case "format_major_version":
1420 1 : // NB: The version written here may be stale. Open does
1421 1 : // not use the format major version encoded in the
1422 1 : // OPTIONS file except to validate here that the
1423 1 : // encoded version is valid.
1424 1 : var v uint64
1425 1 : v, err = strconv.ParseUint(value, 10, 64)
1426 1 : if vers := FormatMajorVersion(v); vers > internalFormatNewest || vers == FormatDefault {
1427 0 : err = errors.Newf("unknown format major version %d", v)
1428 0 : }
1429 1 : if err == nil {
1430 1 : o.FormatMajorVersion = FormatMajorVersion(v)
1431 1 : }
1432 1 : case "l0_compaction_concurrency":
1433 1 : o.Experimental.L0CompactionConcurrency, err = strconv.Atoi(value)
1434 1 : case "l0_compaction_file_threshold":
1435 1 : o.L0CompactionFileThreshold, err = strconv.Atoi(value)
1436 1 : case "l0_compaction_threshold":
1437 1 : o.L0CompactionThreshold, err = strconv.Atoi(value)
1438 1 : case "l0_stop_writes_threshold":
1439 1 : o.L0StopWritesThreshold, err = strconv.Atoi(value)
1440 0 : case "l0_sublevel_compactions":
1441 : // Do nothing; option existed in older versions of pebble.
1442 1 : case "lbase_max_bytes":
1443 1 : o.LBaseMaxBytes, err = strconv.ParseInt(value, 10, 64)
1444 1 : case "level_multiplier":
1445 1 : o.Experimental.LevelMultiplier, err = strconv.Atoi(value)
1446 1 : case "max_concurrent_compactions":
1447 1 : var concurrentCompactions int
1448 1 : concurrentCompactions, err = strconv.Atoi(value)
1449 1 : if err == nil && concurrentCompactions <= 0 {
1450 0 : err = errors.New("max_concurrent_compactions cannot be <= 0")
1451 1 : } else if err == nil {
1452 1 : o.MaxConcurrentCompactions = func() int { return concurrentCompactions }
1453 : }
1454 1 : case "max_manifest_file_size":
1455 1 : o.MaxManifestFileSize, err = strconv.ParseInt(value, 10, 64)
1456 1 : case "max_open_files":
1457 1 : o.MaxOpenFiles, err = strconv.Atoi(value)
1458 1 : case "mem_table_size":
1459 1 : o.MemTableSize, err = strconv.ParseUint(value, 10, 64)
1460 1 : case "mem_table_stop_writes_threshold":
1461 1 : o.MemTableStopWritesThreshold, err = strconv.Atoi(value)
1462 0 : case "min_compaction_rate":
1463 : // Do nothing; option existed in older versions of pebble, and
1464 : // may be meaningful again eventually.
1465 1 : case "min_deletion_rate":
1466 1 : o.TargetByteDeletionRate, err = strconv.Atoi(value)
1467 0 : case "min_flush_rate":
1468 : // Do nothing; option existed in older versions of pebble, and
1469 : // may be meaningful again eventually.
1470 0 : case "point_tombstone_weight":
1471 : // Do nothing; deprecated.
1472 1 : case "strict_wal_tail":
1473 1 : o.private.strictWALTail, err = strconv.ParseBool(value)
1474 1 : case "merger":
1475 1 : switch value {
1476 0 : case "nullptr":
1477 0 : o.Merger = nil
1478 1 : case "pebble.concatenate":
1479 1 : o.Merger = DefaultMerger
1480 0 : default:
1481 0 : if hooks != nil && hooks.NewMerger != nil {
1482 0 : o.Merger, err = hooks.NewMerger(value)
1483 0 : }
1484 : }
1485 1 : case "read_compaction_rate":
1486 1 : o.Experimental.ReadCompactionRate, err = strconv.ParseInt(value, 10, 64)
1487 1 : case "read_sampling_multiplier":
1488 1 : o.Experimental.ReadSamplingMultiplier, err = strconv.ParseInt(value, 10, 64)
1489 1 : case "table_cache_shards":
1490 1 : o.Experimental.TableCacheShards, err = strconv.Atoi(value)
1491 0 : case "table_format":
1492 0 : switch value {
1493 0 : case "leveldb":
1494 0 : case "rocksdbv2":
1495 0 : default:
1496 0 : return errors.Errorf("pebble: unknown table format: %q", errors.Safe(value))
1497 : }
1498 1 : case "table_property_collectors":
1499 : // TODO(peter): set o.TablePropertyCollectors
1500 1 : case "validate_on_ingest":
1501 1 : o.Experimental.ValidateOnIngest, err = strconv.ParseBool(value)
1502 1 : case "wal_dir":
1503 1 : o.WALDir = value
1504 1 : case "wal_bytes_per_sync":
1505 1 : o.WALBytesPerSync, err = strconv.Atoi(value)
1506 1 : case "max_writer_concurrency":
1507 1 : o.Experimental.MaxWriterConcurrency, err = strconv.Atoi(value)
1508 1 : case "force_writer_parallelism":
1509 1 : o.Experimental.ForceWriterParallelism, err = strconv.ParseBool(value)
1510 1 : case "secondary_cache_size_bytes":
1511 1 : o.Experimental.SecondaryCacheSizeBytes, err = strconv.ParseInt(value, 10, 64)
1512 0 : default:
1513 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
1514 0 : return nil
1515 0 : }
1516 0 : return errors.Errorf("pebble: unknown option: %s.%s",
1517 0 : errors.Safe(section), errors.Safe(key))
1518 : }
1519 1 : return err
1520 :
1521 1 : case strings.HasPrefix(section, "Level "):
1522 1 : var index int
1523 1 : if n, err := fmt.Sscanf(section, `Level "%d"`, &index); err != nil {
1524 0 : return err
1525 1 : } else if n != 1 {
1526 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section, value) {
1527 0 : return nil
1528 0 : }
1529 0 : return errors.Errorf("pebble: unknown section: %q", errors.Safe(section))
1530 : }
1531 :
1532 1 : if len(o.Levels) <= index {
1533 1 : newLevels := make([]LevelOptions, index+1)
1534 1 : copy(newLevels, o.Levels)
1535 1 : o.Levels = newLevels
1536 1 : }
1537 1 : l := &o.Levels[index]
1538 1 :
1539 1 : var err error
1540 1 : switch key {
1541 1 : case "block_restart_interval":
1542 1 : l.BlockRestartInterval, err = strconv.Atoi(value)
1543 1 : case "block_size":
1544 1 : l.BlockSize, err = strconv.Atoi(value)
1545 1 : case "block_size_threshold":
1546 1 : l.BlockSizeThreshold, err = strconv.Atoi(value)
1547 1 : case "compression":
1548 1 : switch value {
1549 0 : case "Default":
1550 0 : l.Compression = DefaultCompression
1551 1 : case "NoCompression":
1552 1 : l.Compression = NoCompression
1553 1 : case "Snappy":
1554 1 : l.Compression = SnappyCompression
1555 1 : case "ZSTD":
1556 1 : l.Compression = ZstdCompression
1557 0 : default:
1558 0 : return errors.Errorf("pebble: unknown compression: %q", errors.Safe(value))
1559 : }
1560 1 : case "filter_policy":
1561 1 : if hooks != nil && hooks.NewFilterPolicy != nil {
1562 1 : l.FilterPolicy, err = hooks.NewFilterPolicy(value)
1563 1 : }
1564 1 : case "filter_type":
1565 1 : switch value {
1566 1 : case "table":
1567 1 : l.FilterType = TableFilter
1568 0 : default:
1569 0 : return errors.Errorf("pebble: unknown filter type: %q", errors.Safe(value))
1570 : }
1571 1 : case "index_block_size":
1572 1 : l.IndexBlockSize, err = strconv.Atoi(value)
1573 1 : case "target_file_size":
1574 1 : l.TargetFileSize, err = strconv.ParseInt(value, 10, 64)
1575 0 : default:
1576 0 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
1577 0 : return nil
1578 0 : }
1579 0 : return errors.Errorf("pebble: unknown option: %s.%s", errors.Safe(section), errors.Safe(key))
1580 : }
1581 1 : return err
1582 : }
1583 1 : if hooks != nil && hooks.SkipUnknown != nil && hooks.SkipUnknown(section+"."+key, value) {
1584 1 : return nil
1585 1 : }
1586 0 : return errors.Errorf("pebble: unknown section: %q", errors.Safe(section))
1587 : })
1588 : }
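 : // String and Parse are designed to round-trip. A hedged sketch:
 : //
 : //	opts := (&pebble.Options{}).EnsureDefaults()
 : //	var parsed pebble.Options
 : //	// With nil hooks, fields such as Cache and FilterPolicy are left
 : //	// unpopulated rather than reconstructed.
 : //	err := parsed.Parse(opts.String(), nil)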
1589 :
1590 1 : func (o *Options) checkOptions(s string) (strictWALTail bool, err error) {
1591 1 : // TODO(jackson): Refactor to avoid awkwardness of the strictWALTail return value.
1592 1 : return strictWALTail, parseOptions(s, func(section, key, value string) error {
1593 1 : switch section + "." + key {
1594 1 : case "Options.comparer":
1595 1 : if value != o.Comparer.Name {
1596 0 : return errors.Errorf("pebble: comparer name from file %q != comparer name from options %q",
1597 0 : errors.Safe(value), errors.Safe(o.Comparer.Name))
1598 0 : }
1599 1 : case "Options.merger":
1600 1 : // RocksDB allows the merge operator to be unspecified, in which case it
1601 1 : // shows up as "nullptr".
1602 1 : if value != "nullptr" && value != o.Merger.Name {
1603 0 : return errors.Errorf("pebble: merger name from file %q != merger name from options %q",
1604 0 : errors.Safe(value), errors.Safe(o.Merger.Name))
1605 0 : }
1606 1 : case "Options.strict_wal_tail":
1607 1 : strictWALTail, err = strconv.ParseBool(value)
1608 1 : if err != nil {
1609 0 : return errors.Errorf("pebble: error parsing strict_wal_tail value %q: %w", value, err)
1610 0 : }
1611 : }
1612 1 : return nil
1613 : })
1614 : }
1615 :
1616 : // Check verifies the options are compatible with the previous options
1617 : // serialized by Options.String(). For example, the Comparer and Merger must be
1618 : // the same, or data will not be readable from the DB.
1619 0 : func (o *Options) Check(s string) error {
1620 0 : _, err := o.checkOptions(s)
1621 0 : return err
1622 0 : }
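 : // A sketch of the intended use when reopening a store: read the previously
 : // written OPTIONS file and verify compatibility (the path and error handling
 : // here are hypothetical):
 : //
 : //	b, _ := os.ReadFile(filepath.Join(dir, "OPTIONS-000001"))
 : //	if err := opts.Check(string(b)); err != nil {
 : //		// e.g. the stored comparer name differs from opts.Comparer.Name.
 : //	}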
1623 :
1624 : // Validate verifies that the options are mutually consistent. For example,
1625 : // L0StopWritesThreshold must be >= L0CompactionThreshold, otherwise a write
1626 : // stall would persist indefinitely.
1627 1 : func (o *Options) Validate() error {
1628 1 : // Note that we can presume Options.EnsureDefaults has been called, so there
1629 1 : // is no need to check for zero values.
1630 1 :
1631 1 : var buf strings.Builder
1632 1 : if o.Experimental.L0CompactionConcurrency < 1 {
1633 0 : fmt.Fprintf(&buf, "L0CompactionConcurrency (%d) must be >= 1\n",
1634 0 : o.Experimental.L0CompactionConcurrency)
1635 0 : }
1636 1 : if o.L0StopWritesThreshold < o.L0CompactionThreshold {
1637 0 : fmt.Fprintf(&buf, "L0StopWritesThreshold (%d) must be >= L0CompactionThreshold (%d)\n",
1638 0 : o.L0StopWritesThreshold, o.L0CompactionThreshold)
1639 0 : }
1640 1 : if uint64(o.MemTableSize) >= maxMemTableSize {
1641 0 : fmt.Fprintf(&buf, "MemTableSize (%s) must be < %s\n",
1642 0 : humanize.Bytes.Uint64(uint64(o.MemTableSize)), humanize.Bytes.Uint64(maxMemTableSize))
1643 0 : }
1644 1 : if o.MemTableStopWritesThreshold < 2 {
1645 0 : fmt.Fprintf(&buf, "MemTableStopWritesThreshold (%d) must be >= 2\n",
1646 0 : o.MemTableStopWritesThreshold)
1647 0 : }
1648 1 : if o.FormatMajorVersion > internalFormatNewest {
1649 0 : fmt.Fprintf(&buf, "FormatMajorVersion (%d) must be <= %d\n",
1650 0 : o.FormatMajorVersion, internalFormatNewest)
1651 0 : }
1652 1 : if o.TableCache != nil && o.Cache != o.TableCache.cache {
1653 0 : fmt.Fprintf(&buf, "underlying cache in the TableCache and the Cache dont match\n")
1654 0 : }
1655 1 : if buf.Len() == 0 {
1656 1 : return nil
1657 1 : }
1658 0 : return errors.New(buf.String())
1659 : }
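 : // For example, a configuration that would fail validation (hypothetical
 : // values):
 : //
 : //	o := &pebble.Options{
 : //		L0CompactionThreshold: 4,
 : //		L0StopWritesThreshold: 2, // < L0CompactionThreshold
 : //	}
 : //	o.EnsureDefaults()
 : //	err := o.Validate() // reports the L0StopWritesThreshold violation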
1660 :
1661 : // MakeReaderOptions constructs sstable.ReaderOptions from the corresponding
1662 : // options in the receiver.
1663 1 : func (o *Options) MakeReaderOptions() sstable.ReaderOptions {
1664 1 : var readerOpts sstable.ReaderOptions
1665 1 : if o != nil {
1666 1 : readerOpts.Cache = o.Cache
1667 1 : readerOpts.Comparer = o.Comparer
1668 1 : readerOpts.Filters = o.Filters
1669 1 : if o.Merger != nil {
1670 1 : readerOpts.Merge = o.Merger.Merge
1671 1 : readerOpts.MergerName = o.Merger.Name
1672 1 : }
1673 1 : readerOpts.LoggerAndTracer = o.LoggerAndTracer
1674 : }
1675 1 : return readerOpts
1676 : }
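 : // A hedged usage sketch (the sstable.NewReader call is illustrative; its
 : // exact signature varies across Pebble versions):
 : //
 : //	ro := opts.MakeReaderOptions()
 : //	r, err := sstable.NewReader(readable, ro)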
1677 :
1678 : // MakeWriterOptions constructs sstable.WriterOptions for the specified level
1679 : // from the corresponding options in the receiver.
1680 1 : func (o *Options) MakeWriterOptions(level int, format sstable.TableFormat) sstable.WriterOptions {
1681 1 : var writerOpts sstable.WriterOptions
1682 1 : writerOpts.TableFormat = format
1683 1 : if o != nil {
1684 1 : writerOpts.Cache = o.Cache
1685 1 : writerOpts.Comparer = o.Comparer
1686 1 : if o.Merger != nil {
1687 1 : writerOpts.MergerName = o.Merger.Name
1688 1 : }
1689 1 : writerOpts.TablePropertyCollectors = o.TablePropertyCollectors
1690 1 : writerOpts.BlockPropertyCollectors = o.BlockPropertyCollectors
1691 : }
1692 1 : if format >= sstable.TableFormatPebblev3 {
1693 1 : writerOpts.ShortAttributeExtractor = o.Experimental.ShortAttributeExtractor
1694 1 : writerOpts.RequiredInPlaceValueBound = o.Experimental.RequiredInPlaceValueBound
1695 1 : if format >= sstable.TableFormatPebblev4 && level == numLevels-1 {
1696 1 : writerOpts.WritingToLowestLevel = true
1697 1 : }
1698 : }
1699 1 : levelOpts := o.Level(level)
1700 1 : writerOpts.BlockRestartInterval = levelOpts.BlockRestartInterval
1701 1 : writerOpts.BlockSize = levelOpts.BlockSize
1702 1 : writerOpts.BlockSizeThreshold = levelOpts.BlockSizeThreshold
1703 1 : writerOpts.Compression = levelOpts.Compression
1704 1 : writerOpts.FilterPolicy = levelOpts.FilterPolicy
1705 1 : writerOpts.FilterType = levelOpts.FilterType
1706 1 : writerOpts.IndexBlockSize = levelOpts.IndexBlockSize
1707 1 : return writerOpts
1708 : }
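 : // A usage sketch: for the bottommost level with a v4 table format, the
 : // writer options mark the output as being written to the lowest level:
 : //
 : //	wo := opts.MakeWriterOptions(numLevels-1, sstable.TableFormatPebblev4)
 : //	// wo.WritingToLowestLevel == true; block sizes, compression, and filter
 : //	// policy come from opts.Level(numLevels-1).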