Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package pebble provides an ordered key/value store.
6 : package pebble // import "github.com/cockroachdb/pebble"
7 :
8 : import (
9 : "context"
10 : "fmt"
11 : "io"
12 : "slices"
13 : "sync"
14 : "sync/atomic"
15 : "time"
16 : "unsafe"
17 :
18 : "github.com/cockroachdb/crlib/crtime"
19 : "github.com/cockroachdb/errors"
20 : "github.com/cockroachdb/pebble/internal/arenaskl"
21 : "github.com/cockroachdb/pebble/internal/base"
22 : "github.com/cockroachdb/pebble/internal/cache"
23 : "github.com/cockroachdb/pebble/internal/invalidating"
24 : "github.com/cockroachdb/pebble/internal/invariants"
25 : "github.com/cockroachdb/pebble/internal/keyspan"
26 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
27 : "github.com/cockroachdb/pebble/internal/manifest"
28 : "github.com/cockroachdb/pebble/internal/manual"
29 : "github.com/cockroachdb/pebble/internal/problemspans"
30 : "github.com/cockroachdb/pebble/objstorage"
31 : "github.com/cockroachdb/pebble/objstorage/remote"
32 : "github.com/cockroachdb/pebble/rangekey"
33 : "github.com/cockroachdb/pebble/record"
34 : "github.com/cockroachdb/pebble/sstable"
35 : "github.com/cockroachdb/pebble/sstable/block"
36 : "github.com/cockroachdb/pebble/vfs"
37 : "github.com/cockroachdb/pebble/vfs/atomicfs"
38 : "github.com/cockroachdb/pebble/wal"
39 : "github.com/cockroachdb/tokenbucket"
40 : "github.com/prometheus/client_golang/prometheus"
41 : )
42 :
43 : const (
44 : // minFileCacheSize is the minimum size of the file cache, for a single db.
45 : minFileCacheSize = 64
46 :
47 : // numNonFileCacheFiles is an approximation for the number of files
48 : // that we don't account for in the file cache, for a given db.
49 : numNonFileCacheFiles = 10
50 : )
51 :
52 : var (
53 : // ErrNotFound is returned when a get operation does not find the requested
54 : // key.
55 : ErrNotFound = base.ErrNotFound
56 : // ErrClosed is panicked when an operation is performed on a closed snapshot or
57 : // DB. Use errors.Is(err, ErrClosed) to check for this error.
58 : ErrClosed = errors.New("pebble: closed")
59 : // ErrReadOnly is returned when a write operation is performed on a read-only
60 : // database.
61 : ErrReadOnly = errors.New("pebble: read-only")
62 : // errNoSplit indicates that the user is trying to perform a range key
63 : // operation but the configured Comparer does not provide a Split
64 : // implementation.
65 : errNoSplit = errors.New("pebble: Comparer.Split required for range key operations")
66 : )
67 :
68 : // Reader is a readable key/value store.
69 : //
70 : // It is safe to call Get and NewIter from concurrent goroutines.
71 : type Reader interface {
72 : // Get gets the value for the given key. It returns ErrNotFound if the DB
73 : // does not contain the key.
74 : //
75 : // The caller should not modify the contents of the returned slice, but it is
76 : // safe to modify the contents of the argument after Get returns. The
77 : // returned slice will remain valid until the returned Closer is closed. On
78 : // success, the caller MUST call closer.Close() or a memory leak will occur.
79 : Get(key []byte) (value []byte, closer io.Closer, err error)
80 :
81 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
82 : // return false). The iterator can be positioned via a call to SeekGE,
83 : // SeekLT, First or Last.
84 : NewIter(o *IterOptions) (*Iterator, error)
85 :
86 : // NewIterWithContext is like NewIter, and additionally accepts a context
87 : // for tracing.
88 : NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error)
89 :
90 : // Close closes the Reader. It may or may not close any underlying io.Reader
91 : // or io.Writer, depending on how the DB was created.
92 : //
93 : // It is not safe to close a DB until all outstanding iterators are closed.
94 : // It is valid to call Close multiple times. Other methods should not be
95 : // called after the DB has been closed.
96 : Close() error
97 : }
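
A minimal usage sketch of the Get contract described above: because the returned slice is only valid until the Closer is closed, a caller that wants to keep the value should copy it first. The helper name getCopy, the imports ("errors" and "github.com/cockroachdb/pebble"), and the choice to map ErrNotFound to a nil result are illustrative assumptions, not part of the API.

	// getCopy returns a copy of the value stored under key, or nil if the key
	// is not present. r may be any Reader, e.g. a *pebble.DB or *pebble.Snapshot.
	func getCopy(r pebble.Reader, key []byte) ([]byte, error) {
		value, closer, err := r.Get(key)
		if err != nil {
			if errors.Is(err, pebble.ErrNotFound) {
				return nil, nil // illustrative choice: a missing key yields nil
			}
			return nil, err
		}
		// The returned slice is only valid until closer.Close(), so copy it out.
		out := append([]byte(nil), value...)
		return out, closer.Close()
	}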
98 :
99 : // Writer is a writable key/value store.
100 : //
101 : // Goroutine safety is dependent on the specific implementation.
102 : type Writer interface {
103 : // Apply the operations contained in the batch to the DB.
104 : //
105 : // It is safe to modify the contents of the arguments after Apply returns.
106 : Apply(batch *Batch, o *WriteOptions) error
107 :
108 : // Delete deletes the value for the given key. Deletes are blind and will
109 : // succeed even if the given key does not exist.
110 : //
111 : // It is safe to modify the contents of the arguments after Delete returns.
112 : Delete(key []byte, o *WriteOptions) error
113 :
114 : // DeleteSized behaves identically to Delete, but takes an additional
115 : // argument indicating the size of the value being deleted. DeleteSized
116 : // should be preferred when the caller has the expectation that there exists
117 : // a single internal KV pair for the key (eg, the key has not been
118 : // overwritten recently), and the caller knows the size of its value.
119 : //
120 : // DeleteSized will record the value size within the tombstone and use it to
121 : // inform compaction-picking heuristics which strive to reduce space
122 : // amplification in the LSM. This "calling your shot" mechanic allows the
123 : // storage engine to more accurately estimate and reduce space
124 : // amplification.
125 : //
126 : // It is safe to modify the contents of the arguments after DeleteSized
127 : // returns.
128 : DeleteSized(key []byte, valueSize uint32, _ *WriteOptions) error
129 :
130 : // SingleDelete is similar to Delete in that it deletes the value for the given key. Like Delete,
131 : // it is a blind operation that will succeed even if the given key does not exist.
132 : //
133 : // WARNING: Undefined (non-deterministic) behavior will result if a key is overwritten and
134 : // then deleted using SingleDelete. The record may appear deleted immediately, but be
135 : // resurrected at a later time after compactions have been performed. Or the record may
136 : // be deleted permanently. A Delete operation lays down a "tombstone" which shadows all
137 : // previous versions of a key. The SingleDelete operation is akin to "anti-matter" and will
138 : // only delete the most recently written version for a key. These different semantics allow
139 : // the DB to avoid propagating a SingleDelete operation during a compaction as soon as the
140 : // corresponding Set operation is encountered. These semantics require extreme care to handle
141 : // properly. Only use if you have a workload where the performance gain is critical and you
142 : // can guarantee that a record is written once and then deleted once.
143 : //
144 : // Note that SINGLEDEL, SET, SINGLEDEL, SET, DEL/RANGEDEL, ... from most
145 : // recent to older will work as intended since there is a single SET
146 : // sandwiched between SINGLEDEL/DEL/RANGEDEL.
147 : //
148 : // IMPLEMENTATION WARNING: By offering SingleDelete, Pebble must guarantee
149 : // that there is no duplication of writes inside Pebble. That is, idempotent
150 : // application of writes is insufficient. For example, if a SET operation
151 : // gets duplicated inside Pebble, resulting in say SET#20 and SET#17, the
152 : // caller may issue a SINGLEDEL#25 and it will not have the desired effect.
153 : // A duplication where a SET#20 is duplicated across two sstables will have
154 : // the same correctness problem, since the SINGLEDEL may meet one of the
155 : // SETs. This guarantee is partially achieved by ensuring that a WAL and a
156 : // flushable are usually in one-to-one correspondence, and atomically
157 : // updating the MANIFEST when the flushable is flushed (which ensures the
158 : // WAL will never be replayed). There is one exception: a flushableBatch (a
159 : // batch too large to fit in a memtable) is written to the end of the WAL
160 : // that it shares with the preceding memtable. This is safe because the
161 : // memtable and the flushableBatch are part of the same flush (see DB.flush1
162 : // where this invariant is maintained). If the memtable were to be flushed
163 : // without the flushableBatch, the WAL cannot yet be deleted and if a crash
164 : // happened, the WAL would be replayed despite the memtable already being
165 : // flushed.
166 : //
167 : // It is safe to modify the contents of the arguments after SingleDelete returns.
168 : SingleDelete(key []byte, o *WriteOptions) error
169 :
170 : // DeleteRange deletes all of the point keys (and values) in the range
171 : // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
172 : // delete overlapping range keys (eg, keys set via RangeKeySet).
173 : //
174 : // It is safe to modify the contents of the arguments after DeleteRange
175 : // returns.
176 : DeleteRange(start, end []byte, o *WriteOptions) error
177 :
178 : // LogData adds the specified data to the batch. The data will be written to the
179 : // WAL, but not added to memtables or sstables. Log data is never indexed,
180 : // which makes it useful for testing WAL performance.
181 : //
182 : // It is safe to modify the contents of the argument after LogData returns.
183 : LogData(data []byte, opts *WriteOptions) error
184 :
185 : // Merge merges the value for the given key. The details of the merge are
186 : // dependent upon the configured merge operation.
187 : //
188 : // It is safe to modify the contents of the arguments after Merge returns.
189 : Merge(key, value []byte, o *WriteOptions) error
190 :
191 : // Set sets the value for the given key. It overwrites any previous value
192 : // for that key; a DB is not a multi-map.
193 : //
194 : // It is safe to modify the contents of the arguments after Set returns.
195 : Set(key, value []byte, o *WriteOptions) error
196 :
197 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
198 : // timestamp suffix to value. The suffix is optional. If any portion of the key
199 : // range [start, end) is already set by a range key with the same suffix value,
200 : // RangeKeySet overrides it.
201 : //
202 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
203 : RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error
204 :
205 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
206 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
207 : // range key. RangeKeyUnset only removes portions of range keys that fall within
208 : // the [start, end) key span, and only range keys with suffixes that exactly
209 : // match the unset suffix.
210 : //
211 : // It is safe to modify the contents of the arguments after RangeKeyUnset
212 : // returns.
213 : RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error
214 :
215 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
216 : // (inclusive on start, exclusive on end). It does not delete point keys (for
217 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
218 : // bounds, including those with or without suffixes.
219 : //
220 : // It is safe to modify the contents of the arguments after RangeKeyDelete
221 : // returns.
222 : RangeKeyDelete(start, end []byte, opts *WriteOptions) error
223 : }
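
A Batch is the usual way to exercise several of the Writer operations above atomically. The fragment below is a sketch that assumes an already-open db *pebble.DB inside a function returning error; the keys are placeholders, and durability is chosen at Apply time via pebble.Sync rather than per operation.

	b := db.NewBatch()
	_ = b.Set([]byte("a"), []byte("1"), nil)
	_ = b.Merge([]byte("counter"), []byte("1"), nil) // result depends on the configured Merger
	_ = b.DeleteRange([]byte("b"), []byte("c"), nil)
	if err := db.Apply(b, pebble.Sync); err != nil {
		return err
	}
	return b.Close()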
224 :
225 : // DB provides a concurrent, persistent ordered key/value store.
226 : //
227 : // A DB's basic operations (Get, Set, Delete) should be self-explanatory. Get
228 : // returns ErrNotFound if the requested key is not in the store; callers are
229 : // free to ignore this error. Deletes are blind and succeed even if the key does not exist.
230 : //
231 : // A DB also allows for iterating over the key/value pairs in key order. If d
232 : // is a DB, the code below prints all key/value pairs whose keys are 'greater
233 : // than or equal to' k:
234 : //
235 : // iter, _ := d.NewIter(readOptions)
236 : // for iter.SeekGE(k); iter.Valid(); iter.Next() {
237 : // fmt.Printf("key=%q value=%q\n", iter.Key(), iter.Value())
238 : // }
239 : // return iter.Close()
240 : //
241 : // The Options struct holds the optional parameters for the DB, including a
242 : // Comparer to define a 'less than' relationship over keys. It is always valid
243 : // to pass a nil *Options, which means to use the default parameter values. Any
244 : // zero field of a non-nil *Options also means to use the default value for
245 : // that parameter. Thus, the code below uses a custom Comparer, but the default
246 : // values for every other parameter:
247 : //
248 : // db, err := pebble.Open(dirname, &Options{
249 : // Comparer: myComparer,
250 : // })
251 : type DB struct {
252 : // The count and size of referenced memtables. This includes memtables
253 : // present in DB.mu.mem.queue, memtables that have been flushed but are
254 : // still referenced by an in-use readState, and up to one memTable waiting
255 : // to be reused and stored in d.memTableRecycle.
256 : memTableCount atomic.Int64
257 : memTableReserved atomic.Int64 // number of bytes reserved in the cache for memtables
258 : // memTableRecycle holds a pointer to an obsolete memtable. The next
259 : // memtable allocation will reuse this memtable if it has not already been
260 : // recycled.
261 : memTableRecycle atomic.Pointer[memTable]
262 :
263 : // The logical size of the current WAL.
264 : logSize atomic.Uint64
265 : // The number of input bytes to the log. This is the raw size of the
266 : // batches written to the WAL, without the overhead of the record
267 : // envelopes.
268 : logBytesIn atomic.Uint64
269 :
270 : // The number of bytes available on disk.
271 : diskAvailBytes atomic.Uint64
272 : lowDiskSpaceReporter lowDiskSpaceReporter
273 :
274 : cacheHandle *cache.Handle
275 : dirname string
276 : opts *Options
277 : cmp Compare
278 : equal Equal
279 : merge Merge
280 : split Split
281 : abbreviatedKey AbbreviatedKey
282 : // The threshold for determining when a batch is "large" and will skip being
283 : // inserted into a memtable.
284 : largeBatchThreshold uint64
285 : // The current OPTIONS file number.
286 : optionsFileNum base.DiskFileNum
287 : // The on-disk size of the current OPTIONS file.
288 : optionsFileSize uint64
289 :
290 : // objProvider is used to access and manage SSTs.
291 : objProvider objstorage.Provider
292 :
293 : dataDirLock *base.DirLock
294 : dataDir vfs.File
295 :
296 : fileCache *fileCacheHandle
297 : newIters tableNewIters
298 : tableNewRangeKeyIter keyspanimpl.TableNewSpanIter
299 :
300 : commit *commitPipeline
301 :
302 : // readState provides access to the state needed for reading without needing
303 : // to acquire DB.mu.
304 : readState struct {
305 : sync.RWMutex
306 : val *readState
307 : }
308 :
309 : closed *atomic.Value
310 : closedCh chan struct{}
311 :
312 : cleanupManager *cleanupManager
313 :
314 : // During an iterator close, we may asynchronously schedule read compactions.
315 : // We want to wait for those goroutines to finish before closing the DB.
316 : // compactionSchedulers.Wait() should not be called while DB.mu is held.
317 : compactionSchedulers sync.WaitGroup
318 :
319 : // The main mutex protecting internal DB state. This mutex encompasses many
320 : // fields because those fields need to be accessed and updated atomically. In
321 : // particular, the current version, log.*, mem.*, and snapshot list need to
322 : // be accessed and updated atomically during compaction.
323 : //
324 : // Care is taken to avoid holding DB.mu during IO operations. Accomplishing
325 : // this sometimes requires releasing DB.mu in a method that was called with
326 : // it held. See versionSet.UpdateVersionLocked() and DB.makeRoomForWrite() for
327 : // examples. This is a common pattern, so be careful about expectations that
328 : // DB.mu will be held continuously across a set of calls.
329 : mu struct {
330 : sync.Mutex
331 :
332 : formatVers struct {
333 : // vers is the database's current format major version.
334 : // Backwards-incompatible features are gated behind new
335 : // format major versions and not enabled until a database's
336 : // version is ratcheted upwards.
337 : //
338 : // Although this is under the `mu` prefix, readers may read vers
339 : // atomically without holding d.mu. Writers must only write to this
340 : // value through finalizeFormatVersUpgrade which requires d.mu is
341 : // held.
342 : vers atomic.Uint64
343 : // marker is the atomic marker for the format major version.
344 : // When a database's version is ratcheted upwards, the
345 : // marker is moved in order to atomically record the new
346 : // version.
347 : marker *atomicfs.Marker
348 : // ratcheting when set to true indicates that the database is
349 : // currently in the process of ratcheting the format major version
350 : // to vers + 1. As a part of ratcheting the format major version,
351 : // migrations may drop and re-acquire the mutex.
352 : ratcheting bool
353 : }
354 :
355 : // The ID of the next job. Job IDs are passed to event listener
356 : // notifications and act as a mechanism for tying together the events and
357 : // log messages for a single job such as a flush, compaction, or file
358 : // ingestion. Job IDs are not serialized to disk or used for correctness.
359 : nextJobID JobID
360 :
361 : // The collection of immutable versions and state about the log and visible
362 : // sequence numbers. Use the pointer here to ensure the atomic fields in
363 : // version set are aligned properly.
364 : versions *versionSet
365 :
366 : log struct {
367 : // manager is not protected by mu, but calls to Create must be
368 : // serialized, and happen after the previous writer is closed.
369 : manager wal.Manager
370 : // The Writer is protected by commitPipeline.mu. This allows log writes
371 : // to be performed without holding DB.mu, but requires both
372 : // commitPipeline.mu and DB.mu to be held when rotating the WAL/memtable
373 : // (i.e. makeRoomForWrite). Can be nil.
374 : writer wal.Writer
375 : metrics struct {
376 : // fsyncLatency has its own internal synchronization, and is not
377 : // protected by mu.
378 : fsyncLatency prometheus.Histogram
379 : // Updated whenever a wal.Writer is closed.
380 : record.LogWriterMetrics
381 : }
382 : }
383 :
384 : mem struct {
385 : // The current mutable memTable. Readers of the pointer may hold
386 : // either DB.mu or commitPipeline.mu.
387 : //
388 : // Its internal fields are protected by commitPipeline.mu. This
389 : // allows batch commits to be performed without DB.mu as long as no
390 : // memtable rotation is required.
391 : //
392 : // Both commitPipeline.mu and DB.mu must be held when rotating the
393 : // memtable.
394 : mutable *memTable
395 : // Queue of flushables (the mutable memtable is at the end). Elements are
396 : // added to the end of the slice and removed from the beginning. Once an
397 : // index is set it is never modified, making a fixed slice immutable and
398 : // safe for concurrent reads.
399 : queue flushableList
400 : // nextSize is the size of the next memtable. The memtable size starts at
401 : // min(256KB,Options.MemTableSize) and doubles each time a new memtable
402 : // is allocated up to Options.MemTableSize. This reduces the memory
403 : // footprint of memtables when lots of DB instances are used concurrently
404 : // in test environments.
405 : nextSize uint64
406 : }
407 :
408 : compact struct {
409 : // Condition variable used to signal when a flush or compaction has
410 : // completed. Used by the write-stall mechanism to wait for the stall
411 : // condition to clear. See DB.makeRoomForWrite().
412 : cond sync.Cond
413 : // True when a flush is in progress.
414 : flushing bool
415 : // The number of ongoing non-download compactions.
416 : compactingCount int
417 : // The number of download compactions.
418 : downloadingCount int
419 : // The list of deletion hints, suggesting ranges for delete-only
420 : // compactions.
421 : deletionHints []deleteCompactionHint
422 : // The list of manual compactions. The next manual compaction to perform
423 : // is at the start of the list. New entries are added to the end.
424 : manual []*manualCompaction
425 : manualLen atomic.Int32
426 : // manualID is used to identify manualCompactions in the manual slice.
427 : manualID uint64
428 : // downloads is the list of pending download tasks. The next download to
429 : // perform is at the start of the list. New entries are added to the end.
430 : downloads []*downloadSpanTask
431 : // inProgress is the set of in-progress flushes and compactions.
432 : // It's used in the calculation of some metrics and to initialize L0
433 : // sublevels' state. Some of the compactions contained within this
434 : // map may have already committed an edit to the version but are
435 : // lingering performing cleanup, like deleting obsolete files.
436 : inProgress map[compaction]struct{}
437 :
438 : // rescheduleReadCompaction indicates to an iterator that a read compaction
439 : // should be scheduled.
440 : rescheduleReadCompaction bool
441 :
442 : // readCompactions is a readCompactionQueue which keeps track of the
443 : // compactions which we might have to perform.
444 : readCompactions readCompactionQueue
445 :
446 : // The cumulative duration of all completed compactions since Open.
447 : // Does not include flushes.
448 : duration time.Duration
449 : // Flush throughput metric.
450 : flushWriteThroughput ThroughputMetric
451 : // The idle start time for the flush "loop", i.e., when the flushing
452 : // bool above transitions to false.
453 : noOngoingFlushStartTime crtime.Mono
454 : }
455 :
456 : fileDeletions struct {
457 : // disableCount is non-zero when file cleaning is disabled. It acts as
458 : // a reference count to prohibit file cleaning. See
459 : // DB.{disable,enable}FileDeletions().
460 : disableCount int
461 : // queuedStats holds cumulative stats for files that have been
462 : // queued for deletion by the cleanup manager. These stats are
463 : // monotonically increasing for the *DB's lifetime.
464 : queuedStats obsoleteObjectStats
465 : }
466 :
467 : snapshots struct {
468 : // The list of active snapshots.
469 : snapshotList
470 :
471 : // The cumulative count and size of snapshot-pinned keys written to
472 : // sstables.
473 : cumulativePinnedCount uint64
474 : cumulativePinnedSize uint64
475 : }
476 :
477 : tableStats struct {
478 : // Condition variable used to signal the completion of a
479 : // job to collect table stats.
480 : cond sync.Cond
481 : // True when a stat collection operation is in progress.
482 : loading bool
483 : // True if stat collection has loaded statistics for all tables
484 : // other than those listed explicitly in pending. This flag starts
485 : // as false when a database is opened and flips to true once stat
486 : // collection has caught up.
487 : loadedInitial bool
488 : // A slice of files for which stats have not been computed.
489 : // Compactions, ingests, flushes append files to be processed. An
490 : // active stat collection goroutine clears the list and processes
491 : // them.
492 : pending []manifest.NewTableEntry
493 : }
494 :
495 : tableValidation struct {
496 : // cond is a condition variable used to signal the completion of a
497 : // job to validate one or more sstables.
498 : cond sync.Cond
499 : // pending is a slice of metadata for sstables waiting to be
500 : // validated. Only physical sstables should be added to the pending
501 : // queue.
502 : pending []manifest.NewTableEntry
503 : // validating is set to true when validation is running.
504 : validating bool
505 : }
506 :
507 : // annotators contains various instances of manifest.Annotator which
508 : // should be protected from concurrent access.
509 : annotators struct {
510 : // totalFileSize is the sum of the size of all files in the
511 : // database. This includes local, remote, and external sstables --
512 : // along with blob files.
513 : totalFileSize *manifest.Annotator[uint64]
514 : remoteSize *manifest.Annotator[uint64]
515 : externalSize *manifest.Annotator[uint64]
516 : }
517 : }
518 :
519 : // problemSpans keeps track of spans of keys within LSM levels where
520 : // compactions have failed; used to avoid retrying these compactions too
521 : // quickly.
522 : problemSpans problemspans.ByLevel
523 :
524 : // Normally equal to time.Now() but may be overridden in tests.
525 : timeNow func() time.Time
526 : // The time at database Open; may be used to compute metrics like effective
527 : // compaction concurrency.
528 : openedAt time.Time
529 : }
530 :
531 : var _ Reader = (*DB)(nil)
532 : var _ Writer = (*DB)(nil)
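
For orientation, a self-contained open/write/read/close round trip might look like the sketch below; the directory name and keys are placeholders and error handling is deliberately blunt.

	package main

	import (
		"fmt"
		"log"

		"github.com/cockroachdb/pebble"
	)

	func main() {
		db, err := pebble.Open("demo-db", &pebble.Options{})
		if err != nil {
			log.Fatal(err)
		}
		defer db.Close()

		if err := db.Set([]byte("hello"), []byte("world"), pebble.Sync); err != nil {
			log.Fatal(err)
		}
		value, closer, err := db.Get([]byte("hello"))
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("hello=%s\n", value)
		if err := closer.Close(); err != nil {
			log.Fatal(err)
		}
	}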
533 :
534 : // TestOnlyWaitForCleaning MUST only be used in tests.
535 0 : func (d *DB) TestOnlyWaitForCleaning() {
536 0 : d.cleanupManager.Wait()
537 0 : }
538 :
539 : // Get gets the value for the given key. It returns ErrNotFound if the DB does
540 : // not contain the key.
541 : //
542 : // The caller should not modify the contents of the returned slice, but it is
543 : // safe to modify the contents of the argument after Get returns. The returned
544 : // slice will remain valid until the returned Closer is closed. On success, the
545 : // caller MUST call closer.Close() or a memory leak will occur.
546 1 : func (d *DB) Get(key []byte) ([]byte, io.Closer, error) {
547 1 : return d.getInternal(key, nil /* batch */, nil /* snapshot */)
548 1 : }
549 :
550 : type getIterAlloc struct {
551 : dbi Iterator
552 : keyBuf []byte
553 : get getIter
554 : }
555 :
556 : var getIterAllocPool = sync.Pool{
557 1 : New: func() interface{} {
558 1 : return &getIterAlloc{}
559 1 : },
560 : }
561 :
562 1 : func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer, error) {
563 1 : if err := d.closed.Load(); err != nil {
564 0 : panic(err)
565 : }
566 :
567 : // Grab and reference the current readState. This prevents the underlying
568 : // files in the associated version from being deleted if there is a current
569 : // compaction. The readState is unref'd by Iterator.Close().
570 1 : readState := d.loadReadState()
571 1 :
572 1 : // Determine the seqnum to read at after grabbing the read state (current and
573 1 : // memtables) above.
574 1 : var seqNum base.SeqNum
575 1 : if s != nil {
576 1 : seqNum = s.seqNum
577 1 : } else {
578 1 : seqNum = d.mu.versions.visibleSeqNum.Load()
579 1 : }
580 :
581 1 : buf := getIterAllocPool.Get().(*getIterAlloc)
582 1 :
583 1 : get := &buf.get
584 1 : *get = getIter{
585 1 : comparer: d.opts.Comparer,
586 1 : newIters: d.newIters,
587 1 : snapshot: seqNum,
588 1 : iterOpts: IterOptions{
589 1 : // TODO(sumeer): replace with a parameter provided by the caller.
590 1 : Category: categoryGet,
591 1 : logger: d.opts.Logger,
592 1 : snapshotForHideObsoletePoints: seqNum,
593 1 : },
594 1 : key: key,
595 1 : // Compute the key prefix for bloom filtering.
596 1 : prefix: key[:d.opts.Comparer.Split(key)],
597 1 : batch: b,
598 1 : mem: readState.memtables,
599 1 : l0: readState.current.L0SublevelFiles,
600 1 : version: readState.current,
601 1 : }
602 1 :
603 1 : // Strip off memtables which cannot possibly contain the seqNum being read
604 1 : // at.
605 1 : for len(get.mem) > 0 {
606 1 : n := len(get.mem)
607 1 : if logSeqNum := get.mem[n-1].logSeqNum; logSeqNum < seqNum {
608 1 : break
609 : }
610 1 : get.mem = get.mem[:n-1]
611 : }
612 :
613 1 : i := &buf.dbi
614 1 : pointIter := get
615 1 : *i = Iterator{
616 1 : ctx: context.Background(),
617 1 : getIterAlloc: buf,
618 1 : iter: pointIter,
619 1 : pointIter: pointIter,
620 1 : merge: d.merge,
621 1 : comparer: *d.opts.Comparer,
622 1 : readState: readState,
623 1 : keyBuf: buf.keyBuf,
624 1 : }
625 1 : // Set up a blob value fetcher to use for retrieving values from blob files.
626 1 : i.blobValueFetcher.Init(&readState.current.BlobFiles, d.fileCache, block.NoReadEnv)
627 1 : get.iiopts.blobValueFetcher = &i.blobValueFetcher
628 1 :
629 1 : if !i.First() {
630 1 : err := i.Close()
631 1 : if err != nil {
632 0 : return nil, nil, err
633 0 : }
634 1 : return nil, nil, ErrNotFound
635 : }
636 1 : val, err := i.ValueAndErr()
637 1 : if err != nil {
638 0 : return nil, nil, errors.CombineErrors(err, i.Close())
639 0 : }
640 1 : return val, i, nil
641 : }
642 :
643 : // Set sets the value for the given key. It overwrites any previous value
644 : // for that key; a DB is not a multi-map.
645 : //
646 : // It is safe to modify the contents of the arguments after Set returns.
647 1 : func (d *DB) Set(key, value []byte, opts *WriteOptions) error {
648 1 : b := newBatch(d)
649 1 : _ = b.Set(key, value, opts)
650 1 : if err := d.Apply(b, opts); err != nil {
651 0 : return err
652 0 : }
653 : // Only release the batch on success.
654 1 : return b.Close()
655 : }
656 :
657 : // Delete deletes the value for the given key. Deletes are blind and will
658 : // succeed even if the given key does not exist.
659 : //
660 : // It is safe to modify the contents of the arguments after Delete returns.
661 1 : func (d *DB) Delete(key []byte, opts *WriteOptions) error {
662 1 : b := newBatch(d)
663 1 : _ = b.Delete(key, opts)
664 1 : if err := d.Apply(b, opts); err != nil {
665 0 : return err
666 0 : }
667 : // Only release the batch on success.
668 1 : return b.Close()
669 : }
670 :
671 : // DeleteSized behaves identically to Delete, but takes an additional
672 : // argument indicating the size of the value being deleted. DeleteSized
673 : // should be preferred when the caller has the expectation that there exists
674 : // a single internal KV pair for the key (eg, the key has not been
675 : // overwritten recently), and the caller knows the size of its value.
676 : //
677 : // DeleteSized will record the value size within the tombstone and use it to
678 : // inform compaction-picking heuristics which strive to reduce space
679 : // amplification in the LSM. This "calling your shot" mechanic allows the
680 : // storage engine to more accurately estimate and reduce space amplification.
681 : //
682 : // It is safe to modify the contents of the arguments after DeleteSized
683 : // returns.
684 1 : func (d *DB) DeleteSized(key []byte, valueSize uint32, opts *WriteOptions) error {
685 1 : b := newBatch(d)
686 1 : _ = b.DeleteSized(key, valueSize, opts)
687 1 : if err := d.Apply(b, opts); err != nil {
688 0 : return err
689 0 : }
690 : // Only release the batch on success.
691 1 : return b.Close()
692 : }
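
To illustrate the "calling your shot" mechanic documented above: a caller that tracked the approximate size of the value it previously wrote can pass that size along with the delete. The key and the 4 KiB estimate below are hypothetical, and db is assumed to be an open *pebble.DB inside a function returning error.

	// The caller believes a single ~4 KiB value exists under this key.
	if err := db.DeleteSized([]byte("blob-0001"), 4<<10, pebble.Sync); err != nil {
		return err
	}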
693 :
694 : // SingleDelete adds an action to the batch that single deletes the entry for key.
695 : // See Writer.SingleDelete for more details on the semantics of SingleDelete.
696 : //
697 : // WARNING: See the detailed warning in Writer.SingleDelete before using this.
698 : //
699 : // It is safe to modify the contents of the arguments after SingleDelete returns.
700 1 : func (d *DB) SingleDelete(key []byte, opts *WriteOptions) error {
701 1 : b := newBatch(d)
702 1 : _ = b.SingleDelete(key, opts)
703 1 : if err := d.Apply(b, opts); err != nil {
704 0 : return err
705 0 : }
706 : // Only release the batch on success.
707 1 : return b.Close()
708 : }
709 :
710 : // DeleteRange deletes all of the point keys (and values) in the range [start,end)
711 : // (inclusive on start, exclusive on end). It does not delete overlapping range keys.
712 : //
713 : // It is safe to modify the contents of the arguments after DeleteRange
714 : // returns.
715 1 : func (d *DB) DeleteRange(start, end []byte, opts *WriteOptions) error {
716 1 : b := newBatch(d)
717 1 : _ = b.DeleteRange(start, end, opts)
718 1 : if err := d.Apply(b, opts); err != nil {
719 0 : return err
720 0 : }
721 : // Only release the batch on success.
722 1 : return b.Close()
723 : }
724 :
725 : // Merge adds an action to the DB that merges the value at key with the new
726 : // value. The details of the merge are dependent upon the configured merge
727 : // operator.
728 : //
729 : // It is safe to modify the contents of the arguments after Merge returns.
730 1 : func (d *DB) Merge(key, value []byte, opts *WriteOptions) error {
731 1 : b := newBatch(d)
732 1 : _ = b.Merge(key, value, opts)
733 1 : if err := d.Apply(b, opts); err != nil {
734 0 : return err
735 0 : }
736 : // Only release the batch on success.
737 1 : return b.Close()
738 : }
739 :
740 : // LogData adds the specified data to the batch. The data will be written to the
741 : // WAL, but not added to memtables or sstables. Log data is never indexed,
742 : // which makes it useful for testing WAL performance.
743 : //
744 : // It is safe to modify the contents of the argument after LogData returns.
745 1 : func (d *DB) LogData(data []byte, opts *WriteOptions) error {
746 1 : b := newBatch(d)
747 1 : _ = b.LogData(data, opts)
748 1 : if err := d.Apply(b, opts); err != nil {
749 0 : return err
750 0 : }
751 : // Only release the batch on success.
752 1 : return b.Close()
753 : }
754 :
755 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
756 : // timestamp suffix to value. The suffix is optional. If any portion of the key
757 : // range [start, end) is already set by a range key with the same suffix value,
758 : // RangeKeySet overrides it.
759 : //
760 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
761 1 : func (d *DB) RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error {
762 1 : b := newBatch(d)
763 1 : _ = b.RangeKeySet(start, end, suffix, value, opts)
764 1 : if err := d.Apply(b, opts); err != nil {
765 0 : return err
766 0 : }
767 : // Only release the batch on success.
768 1 : return b.Close()
769 : }
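
A sketch of setting and later clearing a range key over [a, z), assuming an open db *pebble.DB inside a function returning error. The suffix encoding is entirely Comparer-specific, so the nil suffix below (an unsuffixed range key) is only one possible choice.

	// A nil suffix sets an unsuffixed range key; a non-nil suffix must use
	// whatever encoding the configured Comparer expects.
	if err := db.RangeKeySet([]byte("a"), []byte("z"), nil, []byte("v"), pebble.Sync); err != nil {
		return err
	}
	// Later, remove every range key (suffixed or not) within the same bounds.
	return db.RangeKeyDelete([]byte("a"), []byte("z"), pebble.Sync)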
770 :
771 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
772 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
773 : // range key. RangeKeyUnset only removes portions of range keys that fall within
774 : // the [start, end) key span, and only range keys with suffixes that exactly
775 : // match the unset suffix.
776 : //
777 : // It is safe to modify the contents of the arguments after RangeKeyUnset
778 : // returns.
779 1 : func (d *DB) RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error {
780 1 : b := newBatch(d)
781 1 : _ = b.RangeKeyUnset(start, end, suffix, opts)
782 1 : if err := d.Apply(b, opts); err != nil {
783 0 : return err
784 0 : }
785 : // Only release the batch on success.
786 1 : return b.Close()
787 : }
788 :
789 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
790 : // (inclusive on start, exclusive on end). It does not delete point keys (for
791 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
792 : // bounds, including those with or without suffixes.
793 : //
794 : // It is safe to modify the contents of the arguments after RangeKeyDelete
795 : // returns.
796 1 : func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
797 1 : b := newBatch(d)
798 1 : _ = b.RangeKeyDelete(start, end, opts)
799 1 : if err := d.Apply(b, opts); err != nil {
800 0 : return err
801 0 : }
802 : // Only release the batch on success.
803 1 : return b.Close()
804 : }
805 :
806 : // Apply the operations contained in the batch to the DB. If the batch is large
807 : // the contents of the batch may be retained by the database. If that occurs
808 : // the batch contents will be cleared preventing the caller from attempting to
809 : // reuse them.
810 : //
811 : // It is safe to modify the contents of the arguments after Apply returns.
812 : //
813 : // Apply returns ErrInvalidBatch if the provided batch is invalid in any way.
814 1 : func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
815 1 : return d.applyInternal(batch, opts, false)
816 1 : }
817 :
818 : // ApplyNoSyncWait must only be used when opts.Sync is true and the caller
819 : // does not want to wait for the WAL fsync to happen. The method will return
820 : // once the mutation is applied to the memtable and is visible (note that a
821 : // mutation is visible before the WAL sync even in the wait case, so we have
822 : // not weakened the durability semantics). The caller must call Batch.SyncWait
823 : // to wait for the WAL fsync. The caller must not Close the batch without
824 : // first calling Batch.SyncWait.
825 : //
826 : // RECOMMENDATION: Prefer using Apply unless you really understand why you
827 : // need ApplyNoSyncWait.
828 : // EXPERIMENTAL: API/feature subject to change. Do not yet use outside
829 : // CockroachDB.
830 1 : func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
831 1 : if !opts.Sync {
832 0 : return errors.Errorf("cannot request asynchronous apply when WriteOptions.Sync is false")
833 0 : }
834 1 : return d.applyInternal(batch, opts, true)
835 : }
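
Given the contract above (the mutation is visible once ApplyNoSyncWait returns, durable only after SyncWait, and SyncWait must be called before Close), a caller might sequence the calls roughly as in this sketch; db, key and value are assumed to exist in the surrounding function, which returns error.

	b := db.NewBatch()
	_ = b.Set(key, value, nil)
	if err := db.ApplyNoSyncWait(b, pebble.Sync); err != nil {
		return err
	}
	// The write is already visible to readers here. Wait for WAL durability
	// before closing the batch.
	if err := b.SyncWait(); err != nil {
		return err
	}
	return b.Close()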
836 :
837 : // REQUIRES: noSyncWait => opts.Sync
838 1 : func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
839 1 : if err := d.closed.Load(); err != nil {
840 0 : panic(err)
841 : }
842 1 : if batch.committing {
843 0 : panic("pebble: batch already committing")
844 : }
845 1 : if batch.applied.Load() {
846 0 : panic("pebble: batch already applied")
847 : }
848 1 : if d.opts.ReadOnly {
849 0 : return ErrReadOnly
850 0 : }
851 1 : if batch.db != nil && batch.db != d {
852 0 : panic(fmt.Sprintf("pebble: batch db mismatch: %p != %p", batch.db, d))
853 : }
854 :
855 1 : sync := opts.GetSync()
856 1 : if sync && d.opts.DisableWAL {
857 0 : return errors.New("pebble: WAL disabled")
858 0 : }
859 :
860 1 : if fmv := d.FormatMajorVersion(); fmv < batch.minimumFormatMajorVersion {
861 0 : panic(fmt.Sprintf(
862 0 : "pebble: batch requires at least format major version %d (current: %d)",
863 0 : batch.minimumFormatMajorVersion, fmv,
864 0 : ))
865 : }
866 :
867 1 : if batch.countRangeKeys > 0 {
868 1 : if d.split == nil {
869 0 : return errNoSplit
870 0 : }
871 : }
872 1 : batch.committing = true
873 1 :
874 1 : if batch.db == nil {
875 0 : if err := batch.refreshMemTableSize(); err != nil {
876 0 : return err
877 0 : }
878 : }
879 1 : if batch.memTableSize >= d.largeBatchThreshold {
880 1 : var err error
881 1 : batch.flushable, err = newFlushableBatch(batch, d.opts.Comparer)
882 1 : if err != nil {
883 0 : return err
884 0 : }
885 : }
886 1 : if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
887 0 : // There isn't much we can do on an error here. The commit pipeline will be
888 0 : // horked at this point.
889 0 : d.opts.Logger.Fatalf("pebble: fatal commit error: %v", err)
890 0 : }
891 : // If this is a large batch, we need to clear the batch contents as the
892 : // flushable batch may still be present in the flushables queue.
893 : //
894 : // TODO(peter): Currently large batches are written to the WAL. We could
895 : // skip the WAL write and instead wait for the large batch to be flushed to
896 : // an sstable. For a 100 MB batch, this might actually be faster. For a 1
897 : // GB batch this is almost certainly faster.
898 1 : if batch.flushable != nil {
899 1 : batch.data = nil
900 1 : }
901 1 : return nil
902 : }
903 :
904 1 : func (d *DB) commitApply(b *Batch, mem *memTable) error {
905 1 : if b.flushable != nil {
906 1 : // This is a large batch which was already added to the immutable queue.
907 1 : return nil
908 1 : }
909 1 : err := mem.apply(b, b.SeqNum())
910 1 : if err != nil {
911 0 : return err
912 0 : }
913 :
914 : // If the batch contains range tombstones and the database is configured
915 : // to flush range deletions, schedule a delayed flush so that disk space
916 : // may be reclaimed without additional writes or an explicit flush.
917 1 : if b.countRangeDels > 0 && d.opts.FlushDelayDeleteRange > 0 {
918 1 : d.mu.Lock()
919 1 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayDeleteRange)
920 1 : d.mu.Unlock()
921 1 : }
922 :
923 : // If the batch contains range keys and the database is configured to flush
924 : // range keys, schedule a delayed flush so that the range keys are cleared
925 : // from the memtable.
926 1 : if b.countRangeKeys > 0 && d.opts.FlushDelayRangeKey > 0 {
927 1 : d.mu.Lock()
928 1 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayRangeKey)
929 1 : d.mu.Unlock()
930 1 : }
931 :
932 1 : if mem.writerUnref() {
933 1 : d.mu.Lock()
934 1 : d.maybeScheduleFlush()
935 1 : d.mu.Unlock()
936 1 : }
937 1 : return nil
938 : }
939 :
940 1 : func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
941 1 : var size int64
942 1 : repr := b.Repr()
943 1 :
944 1 : if b.flushable != nil {
945 1 : // We have a large batch. Such batches are special in that they don't get
946 1 : // added to the memtable, and are instead inserted into the queue of
947 1 : // memtables. The call to makeRoomForWrite with this batch will force the
948 1 : // current memtable to be flushed. We want the large batch to be part of
949 1 : // the same log, so we add it to the WAL here, rather than after the call
950 1 : // to makeRoomForWrite().
951 1 : //
952 1 : // Set the sequence number since it was not set to the correct value earlier
953 1 : // (see comment in newFlushableBatch()).
954 1 : b.flushable.setSeqNum(b.SeqNum())
955 1 : if !d.opts.DisableWAL {
956 1 : var err error
957 1 : size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b)
958 1 : if err != nil {
959 0 : panic(err)
960 : }
961 : }
962 : }
963 :
964 1 : var err error
965 1 : // Grab a reference to the memtable. We don't hold DB.mu, but we do hold
966 1 : // d.commit.mu. It's okay for readers of d.mu.mem.mutable to only hold one of
967 1 : // d.commit.mu or d.mu, because memtable rotations require holding both.
968 1 : mem := d.mu.mem.mutable
969 1 : // Batches which contain keys of kind InternalKeyKindIngestSST will
970 1 : // never be applied to the memtable, so we don't need to make room for
971 1 : // write.
972 1 : if !b.ingestedSSTBatch {
973 1 : // Flushable batches will require a rotation of the memtable regardless,
974 1 : // so only attempt an optimistic reservation of space in the current
975 1 : // memtable if this batch is not a large flushable batch.
976 1 : if b.flushable == nil {
977 1 : err = d.mu.mem.mutable.prepare(b)
978 1 : }
979 1 : if b.flushable != nil || err == arenaskl.ErrArenaFull {
980 1 : // Slow path.
981 1 : // We need to acquire DB.mu and rotate the memtable.
982 1 : func() {
983 1 : d.mu.Lock()
984 1 : defer d.mu.Unlock()
985 1 : err = d.makeRoomForWrite(b)
986 1 : mem = d.mu.mem.mutable
987 1 : }()
988 : }
989 : }
990 1 : if err != nil {
991 0 : return nil, err
992 0 : }
993 1 : if d.opts.DisableWAL {
994 1 : return mem, nil
995 1 : }
996 1 : d.logBytesIn.Add(uint64(len(repr)))
997 1 :
998 1 : if b.flushable == nil {
999 1 : size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b)
1000 1 : if err != nil {
1001 0 : panic(err)
1002 : }
1003 : }
1004 :
1005 1 : d.logSize.Store(uint64(size))
1006 1 : return mem, err
1007 : }
1008 :
1009 : type iterAlloc struct {
1010 : dbi Iterator
1011 : keyBuf []byte
1012 : boundsBuf [2][]byte
1013 : prefixOrFullSeekKey []byte
1014 : merging mergingIter
1015 : mlevels [3 + numLevels]mergingIterLevel
1016 : levels [3 + numLevels]levelIter
1017 : levelsPositioned [3 + numLevels]bool
1018 : }
1019 :
1020 : var iterAllocPool = sync.Pool{
1021 1 : New: func() interface{} {
1022 1 : return &iterAlloc{}
1023 1 : },
1024 : }
1025 :
1026 : // snapshotIterOpts denotes snapshot-related iterator options when calling
1027 : // newIter. These are the possible cases for a snapshotIterOpts:
1028 : // - No snapshot: All fields are zero values.
1029 : // - Classic snapshot: Only `seqNum` is set. The latest readState will be used
1030 : // and the specified seqNum will be used as the snapshot seqNum.
1031 : // - EventuallyFileOnlySnapshot (EFOS) behaving as a classic snapshot. Only
1032 : // the `seqNum` is set. The latest readState will be used
1033 : // and the specified seqNum will be used as the snapshot seqNum.
1034 : // - EFOS in file-only state: Only `seqNum` and `vers` are set. All the
1035 : // relevant SSTs are referenced by the *version.
1036 : // - EFOS that has been excised but is in alwaysCreateIters mode (tests only).
1037 : // Only `seqNum` and `readState` are set.
1038 : type snapshotIterOpts struct {
1039 : seqNum base.SeqNum
1040 : vers *manifest.Version
1041 : readState *readState
1042 : }
1043 :
1044 : type batchIterOpts struct {
1045 : batchOnly bool
1046 : }
1047 : type newIterOpts struct {
1048 : snapshot snapshotIterOpts
1049 : batch batchIterOpts
1050 : }
1051 :
1052 : // newIter constructs a new iterator, merging in batch iterators as an extra
1053 : // level.
1054 : func (d *DB) newIter(
1055 : ctx context.Context, batch *Batch, newIterOpts newIterOpts, o *IterOptions,
1056 1 : ) *Iterator {
1057 1 : if newIterOpts.batch.batchOnly {
1058 0 : if batch == nil {
1059 0 : panic("batchOnly is true, but batch is nil")
1060 : }
1061 0 : if newIterOpts.snapshot.vers != nil {
1062 0 : panic("batchOnly is true, but snapshotIterOpts is initialized")
1063 : }
1064 : }
1065 1 : if err := d.closed.Load(); err != nil {
1066 0 : panic(err)
1067 : }
1068 1 : seqNum := newIterOpts.snapshot.seqNum
1069 1 : if o != nil && o.RangeKeyMasking.Suffix != nil && o.KeyTypes != IterKeyTypePointsAndRanges {
1070 0 : panic("pebble: range key masking requires IterKeyTypePointsAndRanges")
1071 : }
1072 1 : if (batch != nil || seqNum != 0) && (o != nil && o.OnlyReadGuaranteedDurable) {
1073 0 : // We could add support for OnlyReadGuaranteedDurable on snapshots if
1074 0 : // there was a need: this would require checking that the sequence number
1075 0 : // of the snapshot has been flushed, by comparing with
1076 0 : // DB.mem.queue[0].logSeqNum.
1077 0 : panic("OnlyReadGuaranteedDurable is not supported for batches or snapshots")
1078 : }
1079 1 : var readState *readState
1080 1 : var newIters tableNewIters
1081 1 : var newIterRangeKey keyspanimpl.TableNewSpanIter
1082 1 : if !newIterOpts.batch.batchOnly {
1083 1 : // Grab and reference the current readState. This prevents the underlying
1084 1 : // files in the associated version from being deleted if there is a current
1085 1 : // compaction. The readState is unref'd by Iterator.Close().
1086 1 : if newIterOpts.snapshot.vers == nil {
1087 1 : if newIterOpts.snapshot.readState != nil {
1088 0 : readState = newIterOpts.snapshot.readState
1089 0 : readState.ref()
1090 1 : } else {
1091 1 : // NB: loadReadState() calls readState.ref().
1092 1 : readState = d.loadReadState()
1093 1 : }
1094 1 : } else {
1095 1 : // vers != nil
1096 1 : newIterOpts.snapshot.vers.Ref()
1097 1 : }
1098 :
1099 : // Determine the seqnum to read at after grabbing the read state (current and
1100 : // memtables) above.
1101 1 : if seqNum == 0 {
1102 1 : seqNum = d.mu.versions.visibleSeqNum.Load()
1103 1 : }
1104 1 : newIters = d.newIters
1105 1 : newIterRangeKey = d.tableNewRangeKeyIter
1106 : }
1107 :
1108 : // Bundle various structures under a single umbrella in order to allocate
1109 : // them together.
1110 1 : buf := iterAllocPool.Get().(*iterAlloc)
1111 1 : dbi := &buf.dbi
1112 1 : *dbi = Iterator{
1113 1 : ctx: ctx,
1114 1 : alloc: buf,
1115 1 : merge: d.merge,
1116 1 : comparer: *d.opts.Comparer,
1117 1 : readState: readState,
1118 1 : version: newIterOpts.snapshot.vers,
1119 1 : keyBuf: buf.keyBuf,
1120 1 : prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
1121 1 : boundsBuf: buf.boundsBuf,
1122 1 : batch: batch,
1123 1 : fc: d.fileCache,
1124 1 : newIters: newIters,
1125 1 : newIterRangeKey: newIterRangeKey,
1126 1 : seqNum: seqNum,
1127 1 : batchOnlyIter: newIterOpts.batch.batchOnly,
1128 1 : }
1129 1 : if o != nil {
1130 1 : dbi.opts = *o
1131 1 : dbi.processBounds(o.LowerBound, o.UpperBound)
1132 1 : }
1133 1 : dbi.opts.logger = d.opts.Logger
1134 1 : if d.opts.private.disableLazyCombinedIteration {
1135 1 : dbi.opts.disableLazyCombinedIteration = true
1136 1 : }
1137 1 : if batch != nil {
1138 1 : dbi.batchSeqNum = dbi.batch.nextSeqNum()
1139 1 : }
1140 1 : return finishInitializingIter(ctx, buf)
1141 : }
1142 :
1143 : // finishInitializingIter is a helper for doing the non-trivial initialization
1144 : // of an Iterator. It's invoked to perform the initial initialization of an
1145 : // Iterator during NewIter or Clone, and to perform reinitialization due to a
1146 : // change in IterOptions by a call to Iterator.SetOptions.
1147 1 : func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
1148 1 : // Short-hand.
1149 1 : dbi := &buf.dbi
1150 1 : var memtables flushableList
1151 1 : if dbi.readState != nil {
1152 1 : memtables = dbi.readState.memtables
1153 1 : }
1154 1 : if dbi.opts.OnlyReadGuaranteedDurable {
1155 0 : memtables = nil
1156 1 : } else {
1157 1 : // We only need to read from memtables which contain sequence numbers older
1158 1 : // than seqNum. Trim off newer memtables.
1159 1 : for i := len(memtables) - 1; i >= 0; i-- {
1160 1 : if logSeqNum := memtables[i].logSeqNum; logSeqNum < dbi.seqNum {
1161 1 : break
1162 : }
1163 1 : memtables = memtables[:i]
1164 : }
1165 : }
1166 :
1167 1 : if dbi.opts.pointKeys() {
1168 1 : // Construct the point iterator, initializing dbi.pointIter to point to
1169 1 : // dbi.merging. If this is called during a SetOptions call and this
1170 1 : // Iterator has already initialized dbi.merging, constructPointIter is a
1171 1 : // noop and an initialized pointIter already exists in dbi.pointIter.
1172 1 : dbi.constructPointIter(ctx, memtables, buf)
1173 1 : dbi.iter = dbi.pointIter
1174 1 : } else {
1175 1 : dbi.iter = emptyIter
1176 1 : }
1177 :
1178 1 : if dbi.opts.rangeKeys() {
1179 1 : dbi.rangeKeyMasking.init(dbi, &dbi.comparer)
1180 1 :
1181 1 : // When iterating over both point and range keys, don't create the
1182 1 : // range-key iterator stack immediately if we can avoid it. This
1183 1 : // optimization takes advantage of the expected sparseness of range
1184 1 : // keys, and configures the point-key iterator to dynamically switch to
1185 1 : // combined iteration when it observes a file containing range keys.
1186 1 : //
1187 1 : // Lazy combined iteration is not possible if a batch or a memtable
1188 1 : // contains any range keys.
1189 1 : useLazyCombinedIteration := dbi.rangeKey == nil &&
1190 1 : dbi.opts.KeyTypes == IterKeyTypePointsAndRanges &&
1191 1 : (dbi.batch == nil || dbi.batch.countRangeKeys == 0) &&
1192 1 : !dbi.opts.disableLazyCombinedIteration
1193 1 : if useLazyCombinedIteration {
1194 1 : // The user requested combined iteration, and there's no indexed
1195 1 : // batch currently containing range keys that would prevent lazy
1196 1 : // combined iteration. Check the memtables to see if they contain
1197 1 : // any range keys.
1198 1 : for i := range memtables {
1199 1 : if memtables[i].containsRangeKeys() {
1200 1 : useLazyCombinedIteration = false
1201 1 : break
1202 : }
1203 : }
1204 : }
1205 :
1206 1 : if useLazyCombinedIteration {
1207 1 : dbi.lazyCombinedIter = lazyCombinedIter{
1208 1 : parent: dbi,
1209 1 : pointIter: dbi.pointIter,
1210 1 : combinedIterState: combinedIterState{
1211 1 : initialized: false,
1212 1 : },
1213 1 : }
1214 1 : dbi.iter = &dbi.lazyCombinedIter
1215 1 : dbi.iter = invalidating.MaybeWrapIfInvariants(dbi.iter)
1216 1 : } else {
1217 1 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{
1218 1 : initialized: true,
1219 1 : }
1220 1 : if dbi.rangeKey == nil {
1221 1 : dbi.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1222 1 : dbi.constructRangeKeyIter()
1223 1 : } else {
1224 1 : dbi.rangeKey.iterConfig.SetBounds(dbi.opts.LowerBound, dbi.opts.UpperBound)
1225 1 : }
1226 :
1227 : // Wrap the point iterator (currently dbi.iter) with an interleaving
1228 : // iterator that interleaves range keys pulled from
1229 : // dbi.rangeKey.rangeKeyIter.
1230 : //
1231 : // NB: The interleaving iterator is always reinitialized, even if
1232 : // dbi already had an initialized range key iterator, in case the point
1233 : // iterator changed or the range key masking suffix changed.
1234 1 : dbi.rangeKey.iiter.Init(&dbi.comparer, dbi.iter, dbi.rangeKey.rangeKeyIter,
1235 1 : keyspan.InterleavingIterOpts{
1236 1 : Mask: &dbi.rangeKeyMasking,
1237 1 : LowerBound: dbi.opts.LowerBound,
1238 1 : UpperBound: dbi.opts.UpperBound,
1239 1 : })
1240 1 : dbi.iter = &dbi.rangeKey.iiter
1241 : }
1242 1 : } else {
1243 1 : // !dbi.opts.rangeKeys()
1244 1 : //
1245 1 : // Reset the combined iterator state. The initialized=true ensures the
1246 1 : // iterator doesn't unnecessarily try to switch to combined iteration.
1247 1 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{initialized: true}
1248 1 : }
1249 1 : return dbi
1250 : }
1251 :
1252 : // ScanInternal scans all internal keys within the specified bounds, truncating
1253 : // any rangedels and rangekeys to those bounds if they span past them. For use
1254 : // when an external user needs to be aware of all internal keys that make up a
1255 : // key range.
1256 : //
1257 : // Keys deleted by range deletions must not be returned or exposed by this
1258 : // method, while the range deletion deleting that key must be exposed using
1259 : // visitRangeDel. Keys that would be masked by range key masking (if an
1260 : // appropriate prefix were set) should be exposed, alongside the range key
1261 : // that would have masked it. This method also collapses all point keys into
1262 : // one InternalKey; so only one internal key at most per user key is returned
1263 : // to visitPointKey.
1264 : //
1265 : // If visitSharedFile is not nil, ScanInternal iterates in skip-shared iteration
1266 : // mode. In this iteration mode, sstables in levels L5 and L6 are skipped, and
1267 : // their metadatas truncated to [lower, upper) and passed into visitSharedFile.
1268 : // ErrInvalidSkipSharedIteration is returned if visitSharedFile is not nil and an
1269 : // sstable in L5 or L6 is found that is not in shared storage according to
1270 : // provider.IsShared, or an sstable in those levels contains a newer key than the
1271 : // snapshot sequence number (only applicable for snapshot.ScanInternal). Examples
1272 : // of when this could happen could be if Pebble started writing sstables before a
1273 : // creator ID was set (as creator IDs are necessary to enable shared storage)
1274 : // resulting in some lower level SSTs being on non-shared storage. Skip-shared
1275 : // iteration is invalid in those cases.
1276 : func (d *DB) ScanInternal(
1277 : ctx context.Context,
1278 : category block.Category,
1279 : lower, upper []byte,
1280 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
1281 : visitRangeDel func(start, end []byte, seqNum SeqNum) error,
1282 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
1283 : visitSharedFile func(sst *SharedSSTMeta) error,
1284 : visitExternalFile func(sst *ExternalFile) error,
1285 1 : ) error {
1286 1 : scanInternalOpts := &scanInternalOptions{
1287 1 : category: category,
1288 1 : visitPointKey: visitPointKey,
1289 1 : visitRangeDel: visitRangeDel,
1290 1 : visitRangeKey: visitRangeKey,
1291 1 : visitSharedFile: visitSharedFile,
1292 1 : visitExternalFile: visitExternalFile,
1293 1 : IterOptions: IterOptions{
1294 1 : KeyTypes: IterKeyTypePointsAndRanges,
1295 1 : LowerBound: lower,
1296 1 : UpperBound: upper,
1297 1 : },
1298 1 : }
1299 1 : iter, err := d.newInternalIter(ctx, snapshotIterOpts{} /* snapshot */, scanInternalOpts)
1300 1 : if err != nil {
1301 0 : return err
1302 0 : }
1303 1 : defer iter.close()
1304 1 : return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
1305 : }
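
A hedged sketch of a ScanInternal call that only wires up the point, rangedel, and rangekey visitors; ctx, category, lower, upper and db are assumed to exist in the surrounding code, the rangekey package is "github.com/cockroachdb/pebble/rangekey", and passing nil for visitSharedFile and visitExternalFile leaves skip-shared iteration disabled.

	err := db.ScanInternal(ctx, category, lower, upper,
		func(key *pebble.InternalKey, value pebble.LazyValue, info pebble.IteratorLevel) error {
			fmt.Printf("point %s\n", key.UserKey)
			return nil
		},
		func(start, end []byte, seqNum pebble.SeqNum) error {
			fmt.Printf("rangedel [%s, %s) @ %d\n", start, end, seqNum)
			return nil
		},
		func(start, end []byte, keys []rangekey.Key) error {
			fmt.Printf("rangekey [%s, %s) with %d keys\n", start, end, len(keys))
			return nil
		},
		nil, // visitSharedFile
		nil, // visitExternalFile
	)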
1306 :
1307 : // newInternalIter constructs and returns a new scanInternalIterator on this db.
1308 : // If o.skipSharedLevels is true, levels below sharedLevelsStart are *not* added
1309 : // to the internal iterator.
1310 : //
1311 : // TODO(bilal): This method has a lot of similarities with db.newIter as well as
1312 : // finishInitializingIter. Both pairs of methods should be refactored to reduce
1313 : // this duplication.
1314 : func (d *DB) newInternalIter(
1315 : ctx context.Context, sOpts snapshotIterOpts, o *scanInternalOptions,
1316 1 : ) (*scanInternalIterator, error) {
1317 1 : if err := d.closed.Load(); err != nil {
1318 0 : panic(err)
1319 : }
1320 : // Grab and reference the current readState. This prevents the underlying
1321 : // files in the associated version from being deleted if there is a current
1322 : // compaction. The readState is unref'd by Iterator.Close().
1323 1 : var readState *readState
1324 1 : var vers *manifest.Version
1325 1 : if sOpts.vers == nil {
1326 1 : if sOpts.readState != nil {
1327 0 : readState = sOpts.readState
1328 0 : readState.ref()
1329 0 : vers = readState.current
1330 1 : } else {
1331 1 : readState = d.loadReadState()
1332 1 : vers = readState.current
1333 1 : }
1334 0 : } else {
1335 0 : vers = sOpts.vers
1336 0 : sOpts.vers.Ref()
1337 0 : }
1338 :
1339 : // Determine the seqnum to read at after grabbing the read state (current and
1340 : // memtables) above.
1341 1 : seqNum := sOpts.seqNum
1342 1 : if seqNum == 0 {
1343 1 : seqNum = d.mu.versions.visibleSeqNum.Load()
1344 1 : }
1345 :
1346 : // Bundle various structures under a single umbrella in order to allocate
1347 : // them together.
1348 1 : buf := iterAllocPool.Get().(*iterAlloc)
1349 1 : dbi := &scanInternalIterator{
1350 1 : ctx: ctx,
1351 1 : db: d,
1352 1 : comparer: d.opts.Comparer,
1353 1 : merge: d.opts.Merger.Merge,
1354 1 : readState: readState,
1355 1 : version: sOpts.vers,
1356 1 : alloc: buf,
1357 1 : newIters: d.newIters,
1358 1 : newIterRangeKey: d.tableNewRangeKeyIter,
1359 1 : seqNum: seqNum,
1360 1 : mergingIter: &buf.merging,
1361 1 : }
1362 1 : dbi.blobValueFetcher.Init(&vers.BlobFiles, d.fileCache, block.ReadEnv{})
1363 1 :
1364 1 : dbi.opts = *o
1365 1 : dbi.opts.logger = d.opts.Logger
1366 1 : if d.opts.private.disableLazyCombinedIteration {
1367 1 : dbi.opts.disableLazyCombinedIteration = true
1368 1 : }
1369 1 : return finishInitializingInternalIter(buf, dbi)
1370 : }
1371 :
1372 : type internalIterOpts struct {
1373 : // if compaction is set, sstable-level iterators will be created using
1374 : // NewCompactionIter; these iterators have a more constrained interface
1375 : // and are optimized for the sequential scan of a compaction.
1376 : compaction bool
1377 : readEnv sstable.ReadEnv
1378 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter
1379 : // blobValueFetcher is the base.ValueFetcher to use when constructing
1380 : // internal values to represent values stored externally in blob files.
1381 : blobValueFetcher base.ValueFetcher
1382 : }
1383 :
1384 : func finishInitializingInternalIter(
1385 : buf *iterAlloc, i *scanInternalIterator,
1386 1 : ) (*scanInternalIterator, error) {
1387 1 : // Short-hand.
1388 1 : var memtables flushableList
1389 1 : if i.readState != nil {
1390 1 : memtables = i.readState.memtables
1391 1 : }
1392 : // We only need to read from memtables which contain sequence numbers older
1393 : // than seqNum. Trim off newer memtables.
1394 1 : for j := len(memtables) - 1; j >= 0; j-- {
1395 1 : if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
1396 1 : break
1397 : }
1398 1 : memtables = memtables[:j]
1399 : }
1400 1 : i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)
1401 1 :
1402 1 : if err := i.constructPointIter(i.opts.category, memtables, buf); err != nil {
1403 0 : return nil, err
1404 0 : }
1405 :
1406 : // For internal iterators, we skip the lazy combined iteration optimization
1407 : // entirely, and create the range key iterator stack directly.
1408 1 : i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1409 1 : if err := i.constructRangeKeyIter(); err != nil {
1410 0 : return nil, err
1411 0 : }
1412 :
1413 : // Wrap the point iterator (currently i.iter) with an interleaving
1414 : // iterator that interleaves range keys pulled from
1415 : // i.rangeKey.rangeKeyIter.
1416 1 : i.rangeKey.iiter.Init(i.comparer, i.iter, i.rangeKey.rangeKeyIter,
1417 1 : keyspan.InterleavingIterOpts{
1418 1 : LowerBound: i.opts.LowerBound,
1419 1 : UpperBound: i.opts.UpperBound,
1420 1 : })
1421 1 : i.iter = &i.rangeKey.iiter
1422 1 :
1423 1 : return i, nil
1424 : }
1425 :
1426 : func (i *Iterator) constructPointIter(
1427 : ctx context.Context, memtables flushableList, buf *iterAlloc,
1428 1 : ) {
1429 1 : if i.pointIter != nil {
1430 1 : // Already have one.
1431 1 : return
1432 1 : }
1433 1 : readEnv := block.ReadEnv{
1434 1 : Stats: &i.stats.InternalStats,
1435 1 : // If the file cache has a sstable stats collector, ask it for an
1436 1 : // accumulator for this iterator's configured category and QoS. All SSTable
1437 1 : // iterators created by this Iterator will accumulate their stats to it as
1438 1 : // they Close during iteration.
1439 1 : IterStats: i.fc.SSTStatsCollector().Accumulator(
1440 1 : uint64(uintptr(unsafe.Pointer(i))),
1441 1 : i.opts.Category,
1442 1 : ),
1443 1 : }
1444 1 : if i.readState != nil {
1445 1 : i.blobValueFetcher.Init(&i.readState.current.BlobFiles, i.fc, readEnv)
1446 1 : } else if i.version != nil {
1447 1 : i.blobValueFetcher.Init(&i.version.BlobFiles, i.fc, readEnv)
1448 1 : }
1449 1 : internalOpts := internalIterOpts{
1450 1 : readEnv: sstable.ReadEnv{Block: readEnv},
1451 1 : blobValueFetcher: &i.blobValueFetcher,
1452 1 : }
1453 1 : if i.opts.RangeKeyMasking.Filter != nil {
1454 1 : internalOpts.boundLimitedFilter = &i.rangeKeyMasking
1455 1 : }
1456 :
1457 : // Merging levels and levels from iterAlloc.
1458 1 : mlevels := buf.mlevels[:0]
1459 1 : levels := buf.levels[:0]
1460 1 :
1461 1 : // We compute the number of levels needed ahead of time and reallocate a slice if
1462 1 : // the array from the iterAlloc isn't large enough. Doing this allocation once
1463 1 : // should improve the performance.
1464 1 : numMergingLevels := 0
1465 1 : numLevelIters := 0
1466 1 : if i.batch != nil {
1467 1 : numMergingLevels++
1468 1 : }
1469 :
1470 1 : var current *manifest.Version
1471 1 : if !i.batchOnlyIter {
1472 1 : numMergingLevels += len(memtables)
1473 1 :
1474 1 : current = i.version
1475 1 : if current == nil {
1476 1 : current = i.readState.current
1477 1 : }
1478 1 : numMergingLevels += len(current.L0SublevelFiles)
1479 1 : numLevelIters += len(current.L0SublevelFiles)
1480 1 : for level := 1; level < len(current.Levels); level++ {
1481 1 : if current.Levels[level].Empty() {
1482 1 : continue
1483 : }
1484 1 : numMergingLevels++
1485 1 : numLevelIters++
1486 : }
1487 : }
1488 :
1489 1 : if numMergingLevels > cap(mlevels) {
1490 1 : mlevels = make([]mergingIterLevel, 0, numMergingLevels)
1491 1 : }
1492 1 : if numLevelIters > cap(levels) {
1493 1 : levels = make([]levelIter, 0, numLevelIters)
1494 1 : }
1495 :
1496 : // Top-level is the batch, if any.
1497 1 : if i.batch != nil {
1498 1 : if i.batch.index == nil {
1499 0 : // This isn't an indexed batch. We shouldn't have gotten this far.
1500 0 : panic(errors.AssertionFailedf("creating an iterator over an unindexed batch"))
1501 1 : } else {
1502 1 : i.batch.initInternalIter(&i.opts, &i.batchPointIter)
1503 1 : i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, i.batchSeqNum)
1504 1 : // Only include the batch's rangedel iterator if it's non-empty.
1505 1 : // This requires some subtle logic in the case a rangedel is later
1506 1 : // written to the batch and the view of the batch is refreshed
1507 1 : // during a call to SetOptions—in this case, we need to reconstruct
1508 1 : // the point iterator to add the batch rangedel iterator.
1509 1 : var rangeDelIter keyspan.FragmentIterator
1510 1 : if i.batchRangeDelIter.Count() > 0 {
1511 1 : rangeDelIter = &i.batchRangeDelIter
1512 1 : }
1513 1 : mlevels = append(mlevels, mergingIterLevel{
1514 1 : iter: &i.batchPointIter,
1515 1 : rangeDelIter: rangeDelIter,
1516 1 : })
1517 : }
1518 : }
1519 :
1520 1 : if !i.batchOnlyIter {
1521 1 : // Next are the memtables.
1522 1 : for j := len(memtables) - 1; j >= 0; j-- {
1523 1 : mem := memtables[j]
1524 1 : mlevels = append(mlevels, mergingIterLevel{
1525 1 : iter: mem.newIter(&i.opts),
1526 1 : rangeDelIter: mem.newRangeDelIter(&i.opts),
1527 1 : })
1528 1 : }
1529 :
1530 : // Next are the file levels: L0 sub-levels followed by lower levels.
1531 1 : mlevelsIndex := len(mlevels)
1532 1 : levelsIndex := len(levels)
1533 1 : mlevels = mlevels[:numMergingLevels]
1534 1 : levels = levels[:numLevelIters]
1535 1 : i.opts.snapshotForHideObsoletePoints = buf.dbi.seqNum
1536 1 : addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Layer) {
1537 1 : li := &levels[levelsIndex]
1538 1 :
1539 1 : li.init(ctx, i.opts, &i.comparer, i.newIters, files, level, internalOpts)
1540 1 : li.initRangeDel(&mlevels[mlevelsIndex])
1541 1 : li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
1542 1 : mlevels[mlevelsIndex].levelIter = li
1543 1 : mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)
1544 1 :
1545 1 : levelsIndex++
1546 1 : mlevelsIndex++
1547 1 : }
1548 :
1549 : // Add level iterators for the L0 sublevels, iterating from newest to
1550 : // oldest.
1551 1 : for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
1552 1 : addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
1553 1 : }
1554 :
1555 : // Add level iterators for the non-empty non-L0 levels.
1556 1 : for level := 1; level < len(current.Levels); level++ {
1557 1 : if current.Levels[level].Empty() {
1558 1 : continue
1559 : }
1560 1 : addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
1561 : }
1562 : }
1563 1 : buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...)
1564 1 : if len(mlevels) <= cap(buf.levelsPositioned) {
1565 1 : buf.merging.levelsPositioned = buf.levelsPositioned[:len(mlevels)]
1566 1 : }
1567 1 : buf.merging.snapshot = i.seqNum
1568 1 : buf.merging.batchSnapshot = i.batchSeqNum
1569 1 : buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
1570 1 : i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging).(topLevelIterator)
1571 1 : i.merging = &buf.merging
1572 : }
1573 :
1574 : // NewBatch returns a new empty write-only batch. Any reads on the batch will
1575 : // return an error. If the batch is committed it will be applied to the DB.
1576 1 : func (d *DB) NewBatch(opts ...BatchOption) *Batch {
1577 1 : return newBatch(d, opts...)
1578 1 : }
1579 :
1580 : // NewBatchWithSize is mostly identical to NewBatch, but it will allocate the
1581 : // specified memory space for the internal slice in advance.
1582 0 : func (d *DB) NewBatchWithSize(size int, opts ...BatchOption) *Batch {
1583 0 : return newBatchWithSize(d, size, opts...)
1584 0 : }
1585 :
1586 : // NewIndexedBatch returns a new empty read-write batch. Any reads on the batch
1587 : // will read from both the batch and the DB. If the batch is committed it will
1588 : // be applied to the DB. An indexed batch is slower than a non-indexed batch
1589 : // for insert operations. If you do not need to perform reads on the batch, use
1590 : // NewBatch instead.
1591 1 : func (d *DB) NewIndexedBatch() *Batch {
1592 1 : return newIndexedBatch(d, d.opts.Comparer)
1593 1 : }
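
// exampleBatchUsage is an illustrative sketch, not part of the original
// source: it stages writes in a write-only batch and commits them, then
// demonstrates a read through an indexed batch. The keys and values are
// assumptions.
func exampleBatchUsage(d *DB) error {
	// Write-only batch: stage mutations, then apply them atomically.
	b := d.NewBatch()
	if err := b.Set([]byte("k1"), []byte("v1"), nil); err != nil {
		return err
	}
	if err := b.Delete([]byte("k2"), nil); err != nil {
		return err
	}
	if err := b.Commit(Sync); err != nil {
		return err
	}

	// Indexed batch: reads observe both the batch and the DB.
	ib := d.NewIndexedBatch()
	if err := ib.Set([]byte("k3"), []byte("v3"), nil); err != nil {
		return err
	}
	if v, closer, err := ib.Get([]byte("k3")); err == nil {
		_ = v // valid only until closer is closed
		_ = closer.Close()
	}
	return ib.Commit(Sync)
}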
1594 :
1595 : // NewIndexedBatchWithSize is mostly identical to NewIndexedBatch, but it will
1596 : // allocate the specified memory space for the internal slice in advance.
1597 0 : func (d *DB) NewIndexedBatchWithSize(size int) *Batch {
1598 0 : return newIndexedBatchWithSize(d, d.opts.Comparer, size)
1599 0 : }
1600 :
1601 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
1602 : // return false). The iterator can be positioned via a call to SeekGE, SeekLT,
1603 : // First or Last. The iterator provides a point-in-time view of the current DB
1604 : // state. This view is maintained by preventing file deletions and preventing
1605 : // memtables referenced by the iterator from being deleted. Using an iterator
1606 : // to maintain a long-lived point-in-time view of the DB state can lead to an
1607 : // apparent memory and disk usage leak. Use snapshots (see NewSnapshot) for
1608 : // point-in-time snapshots, which avoid these problems.
1609 1 : func (d *DB) NewIter(o *IterOptions) (*Iterator, error) {
1610 1 : return d.NewIterWithContext(context.Background(), o)
1611 1 : }
1612 :
1613 : // NewIterWithContext is like NewIter, and additionally accepts a context for
1614 : // tracing.
1615 1 : func (d *DB) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
1616 1 : return d.newIter(ctx, nil /* batch */, newIterOpts{}, o), nil
1617 1 : }
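
// exampleIterate is an illustrative sketch, not part of the original source:
// it scans all keys in [lower, upper) with a short-lived iterator. The bounds
// are assumed to be supplied by the caller.
func exampleIterate(d *DB, lower, upper []byte) error {
	iter, err := d.NewIter(&IterOptions{LowerBound: lower, UpperBound: upper})
	if err != nil {
		return err
	}
	for iter.First(); iter.Valid(); iter.Next() {
		_ = iter.Key()   // valid only until the next positioning call
		_ = iter.Value() // ditto
	}
	// Closing promptly releases the read state pinned by the iterator.
	return firstError(iter.Error(), iter.Close())
}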
1618 :
1619 : // NewSnapshot returns a point-in-time view of the current DB state. Iterators
1620 : // created with this handle will all observe a stable snapshot of the current
1621 : // DB state. The caller must call Snapshot.Close() when the snapshot is no
1622 : // longer needed. Snapshots are not persisted across DB restarts (close ->
1623 : // open). Unlike the implicit snapshot maintained by an iterator, a snapshot
1624 : // will not prevent memtables from being released or sstables from being
1625 : // deleted. Instead, a snapshot prevents deletion of sequence numbers
1626 : // referenced by the snapshot.
1627 : //
1628 : // There exists one violation of a Snapshot's point-in-time guarantee: An excise
1629 : // (see DB.Excise and DB.IngestAndExcise) that occurs after the snapshot's
1630 : // creation will be observed by iterators created from the snapshot after the
1631 : // excise. See NewEventuallyFileOnlySnapshot for a variant of NewSnapshot that
1632 : // provides a full point-in-time guarantee.
1633 1 : func (d *DB) NewSnapshot() *Snapshot {
1634 1 : // TODO(jackson): Consider removal of regular, non-eventually-file-only
1635 1 : // snapshots given they no longer provide a true point-in-time snapshot of
1636 1 : // the database due to excises. If we had a mechanism to construct a maximal
1637 1 : // key range, we could implement NewSnapshot in terms of
1638 1 : // NewEventuallyFileOnlySnapshot and provide a true point-in-time guarantee.
1639 1 : if err := d.closed.Load(); err != nil {
1640 0 : panic(err)
1641 : }
1642 1 : d.mu.Lock()
1643 1 : s := &Snapshot{
1644 1 : db: d,
1645 1 : seqNum: d.mu.versions.visibleSeqNum.Load(),
1646 1 : }
1647 1 : d.mu.snapshots.pushBack(s)
1648 1 : d.mu.Unlock()
1649 1 : return s
1650 : }
1651 :
1652 : // NewEventuallyFileOnlySnapshot returns a point-in-time view of the current DB
1653 : // state, similar to NewSnapshot, but with consistency constrained to the
1654 : // provided set of key ranges. See the comment at EventuallyFileOnlySnapshot for
1655 : // its semantics.
1656 1 : func (d *DB) NewEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
1657 1 : if err := d.closed.Load(); err != nil {
1658 0 : panic(err)
1659 : }
1660 1 : for i := range keyRanges {
1661 1 : if i > 0 && d.cmp(keyRanges[i-1].End, keyRanges[i].Start) > 0 {
1662 0 : panic("pebble: key ranges for eventually-file-only-snapshot not in order")
1663 : }
1664 : }
1665 1 : return d.makeEventuallyFileOnlySnapshot(keyRanges)
1666 : }
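
// exampleEFOS is an illustrative sketch, not part of the original source: the
// key ranges passed to NewEventuallyFileOnlySnapshot must be ordered and
// non-overlapping, and reads through the snapshot are assumed to stay within
// them. The bounds are assumptions.
func exampleEFOS(d *DB, lo, hi []byte) error {
	efos := d.NewEventuallyFileOnlySnapshot([]KeyRange{{Start: lo, End: hi}})
	defer efos.Close()

	iter, err := efos.NewIter(&IterOptions{LowerBound: lo, UpperBound: hi})
	if err != nil {
		return err
	}
	for iter.First(); iter.Valid(); iter.Next() {
		_ = iter.Key()
	}
	return firstError(iter.Error(), iter.Close())
}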
1667 :
1668 : // Close closes the DB.
1669 : //
1670 : // It is not safe to close a DB until all outstanding iterators are closed
1671 : // or to call Close concurrently with any other DB method. It is not valid
1672 : // to call any of a DB's methods after the DB has been closed.
1673 1 : func (d *DB) Close() error {
1674 1 : if err := d.closed.Load(); err != nil {
1675 0 : panic(err)
1676 : }
1677 1 : d.compactionSchedulers.Wait()
1678 1 : // Compactions can be asynchronously started by the CompactionScheduler
1679 1 : // calling d.Schedule. When this Unregister returns, we know that the
1680 1 : // CompactionScheduler will never again call a method on the DB. Note that
1681 1 : // this must be called without holding d.mu.
1682 1 : d.opts.Experimental.CompactionScheduler.Unregister()
1683 1 : // Lock the commit pipeline for the duration of Close. This prevents a race
1684 1 : // with makeRoomForWrite. Rotating the WAL in makeRoomForWrite requires
1685 1 : // dropping d.mu several times for I/O. If Close only holds d.mu, an
1686 1 : // in-progress WAL rotation may re-acquire d.mu only once the database is
1687 1 : // closed.
1688 1 : //
1689 1 : // Additionally, locking the commit pipeline makes it more likely that
1690 1 : // (illegal) concurrent writes will observe d.closed.Load() != nil, creating
1691 1 : // more understandable panics if the database is improperly used concurrently
1692 1 : // during Close.
1693 1 : d.commit.mu.Lock()
1694 1 : defer d.commit.mu.Unlock()
1695 1 : d.mu.Lock()
1696 1 : defer d.mu.Unlock()
1697 1 : // Check that the DB is not closed again. If there are two concurrent calls
1698 1 : // to DB.Close, the best-effort check at the top of DB.Close may not fire.
1699 1 : // But since this second check happens after mutex acquisition, the two
1700 1 : // concurrent calls will get serialized and the second one will see the
1701 1 : // effect of the d.closed.Store below.
1702 1 : if err := d.closed.Load(); err != nil {
1703 0 : panic(err)
1704 : }
1705 : // Clear the finalizer that is used to check that an unreferenced DB has been
1706 : // closed. We're closing the DB here, so the check performed by that
1707 : // finalizer isn't necessary.
1708 : //
1709 : // Note: this is a no-op if invariants are disabled or race is enabled.
1710 1 : invariants.SetFinalizer(d.closed, nil)
1711 1 :
1712 1 : d.closed.Store(errors.WithStack(ErrClosed))
1713 1 : close(d.closedCh)
1714 1 :
1715 1 : defer d.cacheHandle.Close()
1716 1 :
1717 1 : for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
1718 1 : d.mu.compact.cond.Wait()
1719 1 : }
1720 1 : for d.mu.tableStats.loading {
1721 1 : d.mu.tableStats.cond.Wait()
1722 1 : }
1723 1 : for d.mu.tableValidation.validating {
1724 1 : d.mu.tableValidation.cond.Wait()
1725 1 : }
1726 :
1727 1 : var err error
1728 1 : if n := len(d.mu.compact.inProgress); n > 0 {
1729 0 : err = errors.Errorf("pebble: %d unexpected in-progress compactions", errors.Safe(n))
1730 0 : }
1731 1 : err = firstError(err, d.mu.formatVers.marker.Close())
1732 1 : if !d.opts.ReadOnly {
1733 1 : if d.mu.log.writer != nil {
1734 1 : _, err2 := d.mu.log.writer.Close()
1735 1 : err = firstError(err, err2)
1736 1 : }
1737 0 : } else if d.mu.log.writer != nil {
1738 0 : panic("pebble: log-writer should be nil in read-only mode")
1739 : }
1740 1 : err = firstError(err, d.mu.log.manager.Close())
1741 1 : err = firstError(err, d.dataDirLock.Close())
1742 1 :
1743 1 : // Note that versionSet.close() only closes the MANIFEST. The versions list
1744 1 : // is still valid for the checks below.
1745 1 : err = firstError(err, d.mu.versions.close())
1746 1 :
1747 1 : err = firstError(err, d.dataDir.Close())
1748 1 :
1749 1 : d.readState.val.unrefLocked()
1750 1 :
1751 1 : current := d.mu.versions.currentVersion()
1752 1 : for v := d.mu.versions.versions.Front(); true; v = v.Next() {
1753 1 : refs := v.Refs()
1754 1 : if v == current {
1755 1 : if refs != 1 {
1756 0 : err = firstError(err, errors.Errorf("leaked iterators: current\n%s", v))
1757 0 : }
1758 1 : break
1759 : }
1760 0 : if refs != 0 {
1761 0 : err = firstError(err, errors.Errorf("leaked iterators:\n%s", v))
1762 0 : }
1763 : }
1764 :
1765 1 : for _, mem := range d.mu.mem.queue {
1766 1 : // Usually, we'd want to delete the files returned by readerUnref. But
1767 1 : // in this case, even if we're unreferencing the flushables, the
1768 1 : // flushables aren't obsolete. They will be reconstructed during WAL
1769 1 : // replay.
1770 1 : mem.readerUnrefLocked(false)
1771 1 : }
1772 : // If there's an unused, recycled memtable, we need to release its memory.
1773 1 : if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
1774 1 : d.freeMemTable(obsoleteMemTable)
1775 1 : }
1776 1 : if reserved := d.memTableReserved.Load(); reserved != 0 {
1777 0 : err = firstError(err, errors.Errorf("leaked memtable reservation: %d", errors.Safe(reserved)))
1778 0 : }
1779 :
1780 : // Since we called d.readState.val.unrefLocked() above, we are expected to
1781 : // manually schedule deletion of obsolete files.
1782 1 : if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
1783 1 : d.deleteObsoleteFiles(d.newJobIDLocked())
1784 1 : }
1785 :
1786 1 : d.mu.Unlock()
1787 1 :
1788 1 : // Wait for all cleaning jobs to finish.
1789 1 : d.cleanupManager.Close()
1790 1 :
1791 1 : // Sanity check metrics.
1792 1 : if invariants.Enabled {
1793 1 : m := d.Metrics()
1794 1 : if m.Compact.NumInProgress > 0 || m.Compact.InProgressBytes > 0 {
1795 0 : d.mu.Lock()
1796 0 : panic(fmt.Sprintf("invalid metrics on close:\n%s", m))
1797 : }
1798 : }
1799 :
1800 1 : d.mu.Lock()
1801 1 :
1802 1 : // As a sanity check, ensure that there are no zombie tables or blob files.
1803 1 : // A non-zero count hints at a reference count leak.
1804 1 : if ztbls := d.mu.versions.zombieTables.Count(); ztbls > 0 {
1805 0 : err = firstError(err, errors.Errorf("non-zero zombie file count: %d", ztbls))
1806 0 : }
1807 1 : if zblobs := d.mu.versions.zombieBlobs.Count(); zblobs > 0 {
1808 0 : err = firstError(err, errors.Errorf("non-zero zombie blob count: %d", zblobs))
1809 0 : }
1810 :
1811 1 : err = firstError(err, d.fileCache.Close())
1812 1 :
1813 1 : err = firstError(err, d.objProvider.Close())
1814 1 :
1815 1 : // If the options include a closer to 'close' the filesystem, close it.
1816 1 : if d.opts.private.fsCloser != nil {
1817 1 : d.opts.private.fsCloser.Close()
1818 1 : }
1819 :
1820 : // Return an error if the user failed to close all open snapshots.
1821 1 : if v := d.mu.snapshots.count(); v > 0 {
1822 0 : err = firstError(err, errors.Errorf("leaked snapshots: %d open snapshots on DB %p", v, d))
1823 0 : }
1824 :
1825 1 : return err
1826 : }
1827 :
1828 : // Compact the specified range of keys in the database.
1829 1 : func (d *DB) Compact(ctx context.Context, start, end []byte, parallelize bool) error {
1830 1 : if err := d.closed.Load(); err != nil {
1831 0 : panic(err)
1832 : }
1833 1 : if d.opts.ReadOnly {
1834 0 : return ErrReadOnly
1835 0 : }
1836 1 : if d.cmp(start, end) >= 0 {
1837 1 : return errors.Errorf("Compact start %s is not less than end %s",
1838 1 : d.opts.Comparer.FormatKey(start), d.opts.Comparer.FormatKey(end))
1839 1 : }
1840 :
1841 1 : d.mu.Lock()
1842 1 : maxLevelWithFiles := 1
1843 1 : cur := d.mu.versions.currentVersion()
1844 1 : for level := 0; level < numLevels; level++ {
1845 1 : overlaps := cur.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
1846 1 : if !overlaps.Empty() {
1847 1 : maxLevelWithFiles = level + 1
1848 1 : }
1849 : }
1850 :
1851 : // Determine if any memtable overlaps with the compaction range. We wait for
1852 : // any overlapping memtable to flush (initiating a flush if necessary).
1853 1 : mem, err := func() (*flushableEntry, error) {
1854 1 : // Check to see if any files overlap with any of the memtables. The queue
1855 1 : // is ordered from oldest to newest with the mutable memtable being the
1856 1 : // last element in the slice. We want to wait for the newest table that
1857 1 : // overlaps.
1858 1 : for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1859 1 : mem := d.mu.mem.queue[i]
1860 1 : var anyOverlaps bool
1861 1 : mem.computePossibleOverlaps(func(b bounded) shouldContinue {
1862 1 : anyOverlaps = true
1863 1 : return stopIteration
1864 1 : }, KeyRange{Start: start, End: end})
1865 1 : if !anyOverlaps {
1866 1 : continue
1867 : }
1868 1 : var err error
1869 1 : if mem.flushable == d.mu.mem.mutable {
1870 1 : // We have to hold both commitPipeline.mu and DB.mu when calling
1871 1 : // makeRoomForWrite(). Lock order requirements elsewhere force us to
1872 1 : // unlock DB.mu in order to grab commitPipeline.mu first.
1873 1 : d.mu.Unlock()
1874 1 : d.commit.mu.Lock()
1875 1 : d.mu.Lock()
1876 1 : defer d.commit.mu.Unlock() //nolint:deferloop
1877 1 : if mem.flushable == d.mu.mem.mutable {
1878 1 : // Only flush if the active memtable is unchanged.
1879 1 : err = d.makeRoomForWrite(nil)
1880 1 : }
1881 : }
1882 1 : mem.flushForced = true
1883 1 : d.maybeScheduleFlush()
1884 1 : return mem, err
1885 : }
1886 1 : return nil, nil
1887 : }()
1888 :
1889 1 : d.mu.Unlock()
1890 1 :
1891 1 : if err != nil {
1892 0 : return err
1893 0 : }
1894 1 : if mem != nil {
1895 1 : select {
1896 1 : case <-mem.flushed:
1897 0 : case <-ctx.Done():
1898 0 : return ctx.Err()
1899 : }
1900 : }
1901 :
1902 1 : for level := 0; level < maxLevelWithFiles; {
1903 1 : for {
1904 1 : if err := d.manualCompact(
1905 1 : ctx, start, end, level, parallelize); err != nil {
1906 0 : if errors.Is(err, ErrCancelledCompaction) {
1907 0 : continue
1908 : }
1909 0 : return err
1910 : }
1911 1 : break
1912 : }
1913 1 : level++
1914 1 : if level == numLevels-1 {
1915 1 : // A manual compaction of the bottommost level occurred.
1916 1 : // There is no next level to try and compact.
1917 1 : break
1918 : }
1919 : }
1920 1 : return nil
1921 : }
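
// exampleManualCompaction is an illustrative sketch, not part of the original
// source: it forces a compaction of everything overlapping the inclusive key
// range [start, end], allowing Pebble to split the work into parallel
// compactions.
func exampleManualCompaction(ctx context.Context, d *DB, start, end []byte) error {
	// start must sort before end, or Compact returns an error.
	return d.Compact(ctx, start, end, true /* parallelize */)
}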
1922 :
1923 : func (d *DB) manualCompact(
1924 : ctx context.Context, start, end []byte, level int, parallelize bool,
1925 1 : ) error {
1926 1 : d.mu.Lock()
1927 1 : curr := d.mu.versions.currentVersion()
1928 1 : files := curr.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
1929 1 : if files.Empty() {
1930 1 : d.mu.Unlock()
1931 1 : return nil
1932 1 : }
1933 :
1934 1 : var compactions []*manualCompaction
1935 1 : if parallelize {
1936 1 : compactions = append(compactions, d.splitManualCompaction(start, end, level)...)
1937 1 : } else {
1938 1 : compactions = append(compactions, &manualCompaction{
1939 1 : level: level,
1940 1 : done: make(chan error, 1),
1941 1 : start: start,
1942 1 : end: end,
1943 1 : })
1944 1 : }
1945 1 : n := len(compactions)
1946 1 : if n == 0 {
1947 0 : d.mu.Unlock()
1948 0 : return nil
1949 0 : }
1950 1 : for i := range compactions {
1951 1 : d.mu.compact.manualID++
1952 1 : compactions[i].id = d.mu.compact.manualID
1953 1 : }
1954 : // [manualIDStart, manualIDEnd] are the compactions that have been added to
1955 : // d.mu.compact.manual.
1956 1 : manualIDStart := compactions[0].id
1957 1 : manualIDEnd := compactions[n-1].id
1958 1 : d.mu.compact.manual = append(d.mu.compact.manual, compactions...)
1959 1 : d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual)))
1960 1 : d.maybeScheduleCompaction()
1961 1 : d.mu.Unlock()
1962 1 :
1963 1 : // On context cancellation, we only cancel the compactions that have not yet
1964 1 : // started. The assumption is that it is relatively harmless to have the
1965 1 : // already started compactions run to completion. We don't wait for the
1966 1 : // ongoing compactions to finish, since the assumption is that the caller
1967 1 : // has already given up on the operation (and the cancellation error is
1968 1 : // going to be returned anyway).
1969 1 : //
1970 1 : // An alternative would be to store the context in each *manualCompaction,
1971 1 : // and have the goroutine that retrieves the *manualCompaction for running
1972 1 : // notice the cancellation and write the cancellation error to
1973 1 : // manualCompaction.done. That approach would require this method to wait
1974 1 : // for all the *manualCompactions it has enqueued to finish before returning
1975 1 : // (to not leak a context). Since there is no timeliness guarantee on when a
1976 1 : // *manualCompaction will be retrieved for running, the wait until a
1977 1 : // cancelled context causes this method to return is not bounded. Hence, we
1978 1 : // don't adopt that approach.
1979 1 : cancelPendingCompactions := func() {
1980 0 : d.mu.Lock()
1981 0 : for i := 0; i < len(d.mu.compact.manual); {
1982 0 : if d.mu.compact.manual[i].id >= manualIDStart && d.mu.compact.manual[i].id <= manualIDEnd {
1983 0 : d.mu.compact.manual = slices.Delete(d.mu.compact.manual, i, i+1)
1984 0 : d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual)))
1985 0 : } else {
1986 0 : i++
1987 0 : }
1988 : }
1989 0 : d.mu.Unlock()
1990 : }
1991 : // Each of the channels is guaranteed to be eventually sent to once. After a
1992 : // compaction is possibly picked in d.maybeScheduleCompaction(), either the
1993 : // compaction is dropped, executed after being scheduled, or retried later.
1994 : // Assuming eventual progress when a compaction is retried, all outcomes send
1995 : // a value to the done channel. Since the channels are buffered, it is not
1996 : // necessary to read from each channel, and so we can exit early in the event
1997 : // of an error.
1998 1 : for _, compaction := range compactions {
1999 1 : select {
2000 0 : case <-ctx.Done():
2001 0 : cancelPendingCompactions()
2002 0 : return ctx.Err()
2003 1 : case err := <-compaction.done:
2004 1 : if err != nil {
2005 0 : cancelPendingCompactions()
2006 0 : return err
2007 0 : }
2008 : }
2009 : }
2010 1 : return nil
2011 : }
2012 :
2013 : // splitManualCompaction splits a manual compaction over [start,end] on level
2014 : // such that the resulting compactions have no key overlap.
2015 : func (d *DB) splitManualCompaction(
2016 : start, end []byte, level int,
2017 1 : ) (splitCompactions []*manualCompaction) {
2018 1 : curr := d.mu.versions.currentVersion()
2019 1 : endLevel := level + 1
2020 1 : baseLevel := d.mu.versions.picker.getBaseLevel()
2021 1 : if level == 0 {
2022 1 : endLevel = baseLevel
2023 1 : }
2024 1 : keyRanges := curr.CalculateInuseKeyRanges(d.mu.versions.latest.l0Organizer, level, endLevel, start, end)
2025 1 : for _, keyRange := range keyRanges {
2026 1 : splitCompactions = append(splitCompactions, &manualCompaction{
2027 1 : level: level,
2028 1 : done: make(chan error, 1),
2029 1 : start: keyRange.Start,
2030 1 : end: keyRange.End.Key,
2031 1 : split: true,
2032 1 : })
2033 1 : }
2034 1 : return splitCompactions
2035 : }
2036 :
2037 : // Flush the memtable to stable storage.
2038 1 : func (d *DB) Flush() error {
2039 1 : flushDone, err := d.AsyncFlush()
2040 1 : if err != nil {
2041 0 : return err
2042 0 : }
2043 1 : <-flushDone
2044 1 : return nil
2045 : }
2046 :
2047 : // AsyncFlush asynchronously flushes the memtable to stable storage.
2048 : //
2049 : // If no error is returned, the caller can receive from the returned channel in
2050 : // order to wait for the flush to complete.
2051 1 : func (d *DB) AsyncFlush() (<-chan struct{}, error) {
2052 1 : if err := d.closed.Load(); err != nil {
2053 0 : panic(err)
2054 : }
2055 1 : if d.opts.ReadOnly {
2056 0 : return nil, ErrReadOnly
2057 0 : }
2058 :
2059 1 : d.commit.mu.Lock()
2060 1 : defer d.commit.mu.Unlock()
2061 1 : d.mu.Lock()
2062 1 : defer d.mu.Unlock()
2063 1 : flushed := d.mu.mem.queue[len(d.mu.mem.queue)-1].flushed
2064 1 : err := d.makeRoomForWrite(nil)
2065 1 : if err != nil {
2066 0 : return nil, err
2067 0 : }
2068 1 : return flushed, nil
2069 : }
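
// exampleAsyncFlush is an illustrative sketch, not part of the original
// source: it triggers a flush without blocking the caller and then waits for
// completion with a context-aware select.
func exampleAsyncFlush(ctx context.Context, d *DB) error {
	flushed, err := d.AsyncFlush()
	if err != nil {
		return err
	}
	select {
	case <-flushed:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}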
2070 :
2071 : // Metrics returns metrics about the database.
2072 1 : func (d *DB) Metrics() *Metrics {
2073 1 : metrics := &Metrics{}
2074 1 : walStats := d.mu.log.manager.Stats()
2075 1 : completedObsoleteFileStats := d.cleanupManager.CompletedStats()
2076 1 :
2077 1 : d.mu.Lock()
2078 1 : vers := d.mu.versions.currentVersion()
2079 1 : *metrics = d.mu.versions.metrics
2080 1 : metrics.Compact.EstimatedDebt = d.mu.versions.picker.estimatedCompactionDebt()
2081 1 : metrics.Compact.InProgressBytes = d.mu.versions.atomicInProgressBytes.Load()
2082 1 : // TODO(radu): split this to separate the download compactions.
2083 1 : metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount + d.mu.compact.downloadingCount)
2084 1 : metrics.Compact.MarkedFiles = vers.Stats.MarkedForCompaction
2085 1 : metrics.Compact.Duration = d.mu.compact.duration
2086 1 : for c := range d.mu.compact.inProgress {
2087 0 : if !c.IsFlush() {
2088 0 : metrics.Compact.Duration += d.timeNow().Sub(c.BeganAt())
2089 0 : }
2090 : }
2091 1 : metrics.Compact.NumProblemSpans = d.problemSpans.Len()
2092 1 :
2093 1 : for _, m := range d.mu.mem.queue {
2094 1 : metrics.MemTable.Size += m.totalBytes()
2095 1 : }
2096 1 : metrics.Snapshots.Count = d.mu.snapshots.count()
2097 1 : if metrics.Snapshots.Count > 0 {
2098 0 : metrics.Snapshots.EarliestSeqNum = d.mu.snapshots.earliest()
2099 0 : }
2100 1 : metrics.Snapshots.PinnedKeys = d.mu.snapshots.cumulativePinnedCount
2101 1 : metrics.Snapshots.PinnedSize = d.mu.snapshots.cumulativePinnedSize
2102 1 : metrics.MemTable.Count = int64(len(d.mu.mem.queue))
2103 1 : metrics.MemTable.ZombieCount = d.memTableCount.Load() - metrics.MemTable.Count
2104 1 : metrics.MemTable.ZombieSize = uint64(d.memTableReserved.Load()) - metrics.MemTable.Size
2105 1 : metrics.WAL.ObsoleteFiles = int64(walStats.ObsoleteFileCount)
2106 1 : metrics.WAL.ObsoletePhysicalSize = walStats.ObsoleteFileSize
2107 1 : metrics.WAL.Files = int64(walStats.LiveFileCount)
2108 1 : // The current WAL's size (d.logSize) is the logical size, which may be less
2109 1 : // than the WAL's physical size if it was recycled. walStats.LiveFileSize
2110 1 : // includes the physical size of all live WALs, but for the current WAL it
2111 1 : // reflects the physical size when it was opened. So it is possible that
2112 1 : // d.logSize has exceeded that physical size. We allow for this
2113 1 : // anomaly.
2114 1 : metrics.WAL.PhysicalSize = walStats.LiveFileSize
2115 1 : metrics.WAL.BytesIn = d.logBytesIn.Load()
2116 1 : metrics.WAL.Size = d.logSize.Load()
2117 1 : for i, n := 0, len(d.mu.mem.queue)-1; i < n; i++ {
2118 1 : metrics.WAL.Size += d.mu.mem.queue[i].logSize
2119 1 : }
2120 1 : metrics.WAL.BytesWritten = metrics.Levels[0].TableBytesIn + metrics.WAL.Size
2121 1 : metrics.WAL.Failover = walStats.Failover
2122 1 :
2123 1 : if p := d.mu.versions.picker; p != nil {
2124 1 : compactions := d.getInProgressCompactionInfoLocked(nil)
2125 1 : m := p.getMetrics(compactions)
2126 1 : for level, lm := range m.levels {
2127 1 : metrics.Levels[level].Score = lm.score
2128 1 : metrics.Levels[level].FillFactor = lm.fillFactor
2129 1 : metrics.Levels[level].CompensatedFillFactor = lm.compensatedFillFactor
2130 1 : }
2131 : }
2132 1 : metrics.Table.ZombieCount = int64(d.mu.versions.zombieTables.Count())
2133 1 : metrics.Table.ZombieSize = d.mu.versions.zombieTables.TotalSize()
2134 1 : metrics.Table.Local.ZombieCount, metrics.Table.Local.ZombieSize = d.mu.versions.zombieTables.LocalStats()
2135 1 :
2136 1 : // The obsolete blob/table metrics have a subtle calculation:
2137 1 : //
2138 1 : // (A) The vs.metrics.{Table,BlobFiles}.[Local.]{ObsoleteCount,ObsoleteSize}
2139 1 : // fields reflect the set of files currently sitting in
2140 1 : // vs.obsolete{Tables,Blobs} but not yet enqueued to the cleanup manager.
2141 1 : //
2142 1 : // (B) The d.mu.fileDeletions.queuedStats field holds the set of files that have
2143 1 : // been queued for deletion by the cleanup manager.
2144 1 : //
2145 1 : // (C) The cleanup manager also maintains cumulative stats for the set of
2146 1 : // files that have been deleted.
2147 1 : //
2148 1 : // The value of currently pending obsolete files is (A) + (B) - (C).
2149 1 : pendingObsoleteFileStats := d.mu.fileDeletions.queuedStats
2150 1 : pendingObsoleteFileStats.Sub(completedObsoleteFileStats)
2151 1 : metrics.Table.Local.ObsoleteCount += pendingObsoleteFileStats.tablesLocal.count
2152 1 : metrics.Table.Local.ObsoleteSize += pendingObsoleteFileStats.tablesLocal.size
2153 1 : metrics.Table.ObsoleteCount += int64(pendingObsoleteFileStats.tablesAll.count)
2154 1 : metrics.Table.ObsoleteSize += pendingObsoleteFileStats.tablesAll.size
2155 1 : metrics.BlobFiles.Local.ObsoleteCount += pendingObsoleteFileStats.blobFilesLocal.count
2156 1 : metrics.BlobFiles.Local.ObsoleteSize += pendingObsoleteFileStats.blobFilesLocal.size
2157 1 : metrics.BlobFiles.ObsoleteCount += pendingObsoleteFileStats.blobFilesAll.count
2158 1 : metrics.BlobFiles.ObsoleteSize += pendingObsoleteFileStats.blobFilesAll.size
2159 1 : metrics.private.optionsFileSize = d.optionsFileSize
2160 1 :
2161 1 : // TODO(jackson): Consider making these metrics optional.
2162 1 : metrics.Keys.RangeKeySetsCount = *rangeKeySetsAnnotator.MultiLevelAnnotation(vers.RangeKeyLevels[:])
2163 1 : metrics.Keys.TombstoneCount = *tombstonesAnnotator.MultiLevelAnnotation(vers.Levels[:])
2164 1 :
2165 1 : metrics.Table.Garbage.PointDeletionsBytesEstimate =
2166 1 : *pointDeletionsBytesEstimateAnnotator.MultiLevelAnnotation(vers.Levels[:])
2167 1 : metrics.Table.Garbage.RangeDeletionsBytesEstimate =
2168 1 : *rangeDeletionsBytesEstimateAnnotator.MultiLevelAnnotation(vers.Levels[:])
2169 1 :
2170 1 : d.mu.versions.logLock()
2171 1 : metrics.private.manifestFileSize = uint64(d.mu.versions.manifest.Size())
2172 1 : backingCount, backingTotalSize := d.mu.versions.latest.virtualBackings.Stats()
2173 1 : metrics.Table.BackingTableCount = uint64(backingCount)
2174 1 : metrics.Table.BackingTableSize = backingTotalSize
2175 1 : blobStats, _ := d.mu.versions.latest.blobFiles.Stats()
2176 1 : d.mu.versions.logUnlock()
2177 1 : metrics.BlobFiles.LiveCount = blobStats.Count
2178 1 : metrics.BlobFiles.LiveSize = blobStats.PhysicalSize
2179 1 : metrics.BlobFiles.ValueSize = blobStats.ValueSize
2180 1 : metrics.BlobFiles.ReferencedValueSize = blobStats.ReferencedValueSize
2181 1 :
2182 1 : metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
2183 1 : if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
2184 0 : d.opts.Logger.Errorf("metrics error: %s", err)
2185 0 : }
2186 1 : metrics.Flush.WriteThroughput = d.mu.compact.flushWriteThroughput
2187 1 : if d.mu.compact.flushing {
2188 0 : metrics.Flush.NumInProgress = 1
2189 0 : }
2190 1 : for i := 0; i < numLevels; i++ {
2191 1 : metrics.Levels[i].Additional.ValueBlocksSize = *valueBlockSizeAnnotator.LevelAnnotation(vers.Levels[i])
2192 1 : compressionMetrics := compressionStatsAnnotator.LevelAnnotation(vers.Levels[i])
2193 1 : metrics.Table.Compression.MergeWith(compressionMetrics)
2194 1 : }
2195 :
2196 1 : metrics.Table.PendingStatsCollectionCount = int64(len(d.mu.tableStats.pending))
2197 1 : metrics.Table.InitialStatsCollectionComplete = d.mu.tableStats.loadedInitial
2198 1 :
2199 1 : d.mu.Unlock()
2200 1 :
2201 1 : metrics.BlockCache = d.opts.Cache.Metrics()
2202 1 : metrics.FileCache, metrics.Filter = d.fileCache.Metrics()
2203 1 : metrics.TableIters = d.fileCache.IterCount()
2204 1 : metrics.CategoryStats = d.fileCache.SSTStatsCollector().GetStats()
2205 1 :
2206 1 : metrics.SecondaryCacheMetrics = d.objProvider.Metrics()
2207 1 :
2208 1 : metrics.Uptime = d.timeNow().Sub(d.openedAt)
2209 1 :
2210 1 : metrics.manualMemory = manual.GetMetrics()
2211 1 :
2212 1 : return metrics
2213 : }
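
// exampleLogMetrics is an illustrative sketch, not part of the original
// source: Metrics allocates and fills a fresh Metrics struct on every call, so
// callers typically sample it periodically rather than per-operation. It
// assumes the configured Logger is an acceptable sink.
func exampleLogMetrics(d *DB) {
	m := d.Metrics()
	// Metrics implements fmt.Stringer, so it can be logged wholesale.
	d.opts.Logger.Infof("pebble metrics:\n%s", m)
}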
2214 :
2215 : // sstablesOptions holds the optional parameters for retrieving TableInfo for all sstables.
2216 : type sstablesOptions struct {
2217 : // if true, the sstable properties are returned in each TableInfo
2218 : withProperties bool
2219 :
2220 : // if set, return sstables that overlap the key range (end-exclusive)
2221 : start []byte
2222 : end []byte
2223 :
2224 : withApproximateSpanBytes bool
2225 : }
2226 :
2227 : // SSTablesOption sets an optional parameter used by `DB.SSTables`.
2228 : type SSTablesOption func(*sstablesOptions)
2229 :
2230 : // WithProperties enables returning the sstable properties in each TableInfo.
2231 : //
2232 : // NOTE: if most of the sstable properties need to be read from disk,
2233 : // this option may make the `SSTables` method quite slow.
2234 0 : func WithProperties() SSTablesOption {
2235 0 : return func(opt *sstablesOptions) {
2236 0 : opt.withProperties = true
2237 0 : }
2238 : }
2239 :
2240 : // WithKeyRangeFilter ensures the returned sstables overlap [start, end)
2241 : // (end-exclusive). If start and end are both nil, the filter has no effect.
2242 0 : func WithKeyRangeFilter(start, end []byte) SSTablesOption {
2243 0 : return func(opt *sstablesOptions) {
2244 0 : opt.end = end
2245 0 : opt.start = start
2246 0 : }
2247 : }
2248 :
2249 : // WithApproximateSpanBytes enables capturing the approximate number of bytes that
2250 : // overlap the provided key span for each sstable.
2251 : // NOTE: This option requires WithKeyRangeFilter.
2252 0 : func WithApproximateSpanBytes() SSTablesOption {
2253 0 : return func(opt *sstablesOptions) {
2254 0 : opt.withApproximateSpanBytes = true
2255 0 : }
2256 : }
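
// exampleSSTablesWithOptions is an illustrative sketch, not part of the
// original source: WithApproximateSpanBytes requires WithKeyRangeFilter, and
// WithProperties may be slow when properties must be read from disk. The
// bounds are assumptions.
func exampleSSTablesWithOptions(d *DB, start, end []byte) error {
	levels, err := d.SSTables(
		WithKeyRangeFilter(start, end),
		WithApproximateSpanBytes(),
		WithProperties(),
	)
	if err != nil {
		return err
	}
	for level, tables := range levels {
		for i := range tables {
			_ = level
			_ = tables[i].ApproximateSpanBytes
		}
	}
	return nil
}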
2257 :
2258 : // BackingType denotes the type of storage backing a given sstable.
2259 : type BackingType int
2260 :
2261 : const (
2262 : // BackingTypeLocal denotes an sstable stored on local disk according to the
2263 : // objprovider. This file is completely owned by us.
2264 : BackingTypeLocal BackingType = iota
2265 : // BackingTypeShared denotes an sstable stored on shared storage, created
2266 : // by this Pebble instance and possibly shared by other Pebble instances.
2267 : // These types of files have lifecycle managed by Pebble.
2268 : BackingTypeShared
2269 : // BackingTypeSharedForeign denotes an sstable stored on shared storage,
2270 : // created by a Pebble instance other than this one. These types of files have
2271 : // lifecycle managed by Pebble.
2272 : BackingTypeSharedForeign
2273 : // BackingTypeExternal denotes an sstable stored on external storage,
2274 : // not owned by any Pebble instance and with no refcounting/cleanup methods
2275 : // or lifecycle management. An example of an external file is a file restored
2276 : // from a backup.
2277 : BackingTypeExternal
2278 : backingTypeCount
2279 : )
2280 :
2281 : var backingTypeToString = [backingTypeCount]string{
2282 : BackingTypeLocal: "local",
2283 : BackingTypeShared: "shared",
2284 : BackingTypeSharedForeign: "shared-foreign",
2285 : BackingTypeExternal: "external",
2286 : }
2287 :
2288 : // String implements fmt.Stringer.
2289 0 : func (b BackingType) String() string {
2290 0 : return backingTypeToString[b]
2291 0 : }
2292 :
2293 : // SSTableInfo exports manifest.TableInfo with sstable.Properties alongside
2294 : // other file backing info.
2295 : type SSTableInfo struct {
2296 : manifest.TableInfo
2297 : TableStats manifest.TableStats
2298 : // Virtual indicates whether the sstable is virtual.
2299 : Virtual bool
2300 : // BackingSSTNum is the disk file number associated with the backing sstable.
2301 : // If Virtual is false, BackingSSTNum == PhysicalTableDiskFileNum(TableNum).
2302 : BackingSSTNum base.DiskFileNum
2303 : // BackingType is the type of storage backing this sstable.
2304 : BackingType BackingType
2305 : // Locator is the remote.Locator backing this sstable, if the backing type is
2306 : // not BackingTypeLocal.
2307 : Locator remote.Locator
2308 : // ApproximateSpanBytes describes the approximate number of bytes within the
2309 : // sstable that fall within a particular span. It's populated only when the
2310 : // ApproximateSpanBytes option is passed into DB.SSTables.
2311 : ApproximateSpanBytes uint64 `json:"ApproximateSpanBytes,omitempty"`
2312 :
2313 : // Properties is the sstable properties of this table. If Virtual is true,
2314 : // then the Properties are associated with the backing sst.
2315 : Properties *sstable.Properties
2316 : }
2317 :
2318 : // SSTables retrieves the current sstables. The returned slice is indexed by
2319 : // level and each level is indexed by the position of the sstable within the
2320 : // level. Note that this information may be out of date due to concurrent
2321 : // flushes and compactions.
2322 0 : func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
2323 0 : opt := &sstablesOptions{}
2324 0 : for _, fn := range opts {
2325 0 : fn(opt)
2326 0 : }
2327 :
2328 0 : if opt.withApproximateSpanBytes && (opt.start == nil || opt.end == nil) {
2329 0 : return nil, errors.Errorf("cannot use WithApproximateSpanBytes without WithKeyRangeFilter option")
2330 0 : }
2331 :
2332 : // Grab and reference the current readState.
2333 0 : readState := d.loadReadState()
2334 0 : defer readState.unref()
2335 0 :
2336 0 : // TODO(peter): This is somewhat expensive, especially on a large
2337 0 : // database. It might be worthwhile to unify TableInfo and TableMetadata and
2338 0 : // then we could simply return current.Files. Note that RocksDB is doing
2339 0 : // something similar to the current code, so perhaps it isn't too bad.
2340 0 : srcLevels := readState.current.Levels
2341 0 : var totalTables int
2342 0 : for i := range srcLevels {
2343 0 : totalTables += srcLevels[i].Len()
2344 0 : }
2345 :
2346 0 : destTables := make([]SSTableInfo, totalTables)
2347 0 : destLevels := make([][]SSTableInfo, len(srcLevels))
2348 0 : for i := range destLevels {
2349 0 : j := 0
2350 0 : for m := range srcLevels[i].All() {
2351 0 : if opt.start != nil && opt.end != nil {
2352 0 : b := base.UserKeyBoundsEndExclusive(opt.start, opt.end)
2353 0 : if !m.Overlaps(d.opts.Comparer.Compare, &b) {
2354 0 : continue
2355 : }
2356 : }
2357 0 : destTables[j] = SSTableInfo{
2358 0 : TableInfo: m.TableInfo(),
2359 0 : }
2360 0 : if stats, ok := m.Stats(); ok {
2361 0 : destTables[j].TableStats = *stats
2362 0 : }
2363 0 : if opt.withProperties {
2364 0 : p, err := d.fileCache.getTableProperties(
2365 0 : m,
2366 0 : )
2367 0 : if err != nil {
2368 0 : return nil, err
2369 0 : }
2370 0 : if m.Virtual {
2371 0 : commonProps := p.GetScaledProperties(m.TableBacking.Size, m.Size)
2372 0 : p = &sstable.Properties{CommonProperties: commonProps}
2373 0 : }
2374 0 : destTables[j].Properties = p
2375 : }
2376 0 : destTables[j].Virtual = m.Virtual
2377 0 : destTables[j].BackingSSTNum = m.TableBacking.DiskFileNum
2378 0 : objMeta, err := d.objProvider.Lookup(base.FileTypeTable, m.TableBacking.DiskFileNum)
2379 0 : if err != nil {
2380 0 : return nil, err
2381 0 : }
2382 0 : if objMeta.IsRemote() {
2383 0 : if objMeta.IsShared() {
2384 0 : if d.objProvider.IsSharedForeign(objMeta) {
2385 0 : destTables[j].BackingType = BackingTypeSharedForeign
2386 0 : } else {
2387 0 : destTables[j].BackingType = BackingTypeShared
2388 0 : }
2389 0 : } else {
2390 0 : destTables[j].BackingType = BackingTypeExternal
2391 0 : }
2392 0 : destTables[j].Locator = objMeta.Remote.Locator
2393 0 : } else {
2394 0 : destTables[j].BackingType = BackingTypeLocal
2395 0 : }
2396 :
2397 0 : if opt.withApproximateSpanBytes {
2398 0 : if m.ContainedWithinSpan(d.opts.Comparer.Compare, opt.start, opt.end) {
2399 0 : destTables[j].ApproximateSpanBytes = m.Size
2400 0 : } else {
2401 0 : size, err := d.fileCache.estimateSize(m, opt.start, opt.end)
2402 0 : if err != nil {
2403 0 : return nil, err
2404 0 : }
2405 0 : destTables[j].ApproximateSpanBytes = size
2406 : }
2407 : }
2408 0 : j++
2409 : }
2410 0 : destLevels[i] = destTables[:j]
2411 0 : destTables = destTables[j:]
2412 : }
2413 :
2414 0 : return destLevels, nil
2415 : }
2416 :
2417 : // makeFileSizeAnnotator returns an annotator that computes the total
2418 : // storage size of files that meet some criteria defined by filter. When
2419 : // applicable, this includes both the sstable size and the size of any
2420 : // referenced blob files.
2421 : func (d *DB) makeFileSizeAnnotator(
2422 : filter func(f *manifest.TableMetadata) bool,
2423 1 : ) *manifest.Annotator[uint64] {
2424 1 : return &manifest.Annotator[uint64]{
2425 1 : Aggregator: manifest.SumAggregator{
2426 1 : AccumulateFunc: func(f *manifest.TableMetadata) (uint64, bool) {
2427 0 : if filter(f) {
2428 0 : return f.Size + f.EstimatedReferenceSize(), true
2429 0 : }
2430 0 : return 0, true
2431 : },
2432 0 : AccumulatePartialOverlapFunc: func(f *manifest.TableMetadata, bounds base.UserKeyBounds) uint64 {
2433 0 : if filter(f) {
2434 0 : overlappingFileSize, err := d.fileCache.estimateSize(f, bounds.Start, bounds.End.Key)
2435 0 : if err != nil {
2436 0 : return 0
2437 0 : }
2438 0 : overlapFraction := float64(overlappingFileSize) / float64(f.Size)
2439 0 : // Scale the blob reference size proportionally to the file
2440 0 : // overlap from the bounds to approximate only the blob
2441 0 : // references that overlap with the requested bounds.
2442 0 : return overlappingFileSize + uint64(float64(f.EstimatedReferenceSize())*overlapFraction)
2443 : }
2444 0 : return 0
2445 : },
2446 : },
2447 : }
2448 : }
2449 :
2450 : // EstimateDiskUsage returns the estimated filesystem space used in bytes for
2451 : // storing the range `[start, end]`. The estimation is computed as follows:
2452 : //
2453 : // - For sstables fully contained in the range the whole file size is included.
2454 : // - For sstables partially contained in the range the overlapping data block sizes
2455 : // are included. Even if a data block partially overlaps, or we cannot determine
2456 : // overlap due to abbreviated index keys, the full data block size is included in
2457 : // the estimation. Note that unlike fully contained sstables, none of the
2458 : // meta-block space is counted for partially overlapped files.
2459 : // - For virtual sstables, we use the overlap between start, end and the virtual
2460 : // sstable bounds to determine disk usage.
2461 : // - There may also exist WAL entries for unflushed keys in this range. This
2462 : // estimation currently excludes space used for the range in the WAL.
2463 0 : func (d *DB) EstimateDiskUsage(start, end []byte) (uint64, error) {
2464 0 : bytes, _, _, err := d.EstimateDiskUsageByBackingType(start, end)
2465 0 : return bytes, err
2466 0 : }
2467 :
2468 : // EstimateDiskUsageByBackingType is like EstimateDiskUsage but additionally
2469 : // returns the subsets of that size in remote and external files.
2470 : func (d *DB) EstimateDiskUsageByBackingType(
2471 : start, end []byte,
2472 0 : ) (totalSize, remoteSize, externalSize uint64, _ error) {
2473 0 : if err := d.closed.Load(); err != nil {
2474 0 : panic(err)
2475 : }
2476 :
2477 0 : bounds := base.UserKeyBoundsInclusive(start, end)
2478 0 : if !bounds.Valid(d.cmp) {
2479 0 : return 0, 0, 0, errors.New("invalid key-range specified (start > end)")
2480 0 : }
2481 :
2482 : // Grab and reference the current readState. This prevents the underlying
2483 : // files in the associated version from being deleted if there is a concurrent
2484 : // compaction.
2485 0 : readState := d.loadReadState()
2486 0 : defer readState.unref()
2487 0 :
2488 0 : totalSize = *d.mu.annotators.totalFileSize.VersionRangeAnnotation(readState.current, bounds)
2489 0 : remoteSize = *d.mu.annotators.remoteSize.VersionRangeAnnotation(readState.current, bounds)
2490 0 : externalSize = *d.mu.annotators.externalSize.VersionRangeAnnotation(readState.current, bounds)
2491 0 :
2492 0 : return
2493 : }
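
// exampleDiskUsage is an illustrative sketch, not part of the original source:
// it estimates on-disk usage for an inclusive key range and breaks it down by
// backing type. The bounds are assumptions.
func exampleDiskUsage(d *DB, start, end []byte) (uint64, error) {
	total, remoteSize, externalSize, err := d.EstimateDiskUsageByBackingType(start, end)
	if err != nil {
		return 0, err
	}
	_ = remoteSize   // bytes backed by remote storage
	_ = externalSize // bytes backed by external (not Pebble-owned) files
	return total, nil
}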
2494 :
2495 1 : func (d *DB) walPreallocateSize() int {
2496 1 : // Set the WAL preallocate size to 110% of the memtable size. Note that there
2497 1 : // is a bit of apples and oranges in units here, as the memtable size
2498 1 : // corresponds to the memory usage of the memtable while the WAL size is the
2499 1 : // size of the batches (plus overhead) stored in the WAL.
2500 1 : //
2501 1 : // TODO(peter): 110% of the memtable size is quite hefty for a block
2502 1 : // size. This logic is taken from GetWalPreallocateBlockSize in
2503 1 : // RocksDB. Could a smaller preallocation block size be used?
2504 1 : size := d.opts.MemTableSize
2505 1 : size = (size / 10) + size
2506 1 : return int(size)
2507 1 : }
2508 :
2509 : func (d *DB) newMemTable(
2510 : logNum base.DiskFileNum, logSeqNum base.SeqNum, minSize uint64,
2511 1 : ) (*memTable, *flushableEntry) {
2512 1 : targetSize := minSize + uint64(memTableEmptySize)
2513 1 : // The targetSize should be less than MemTableSize, because any batch >=
2514 1 : // MemTableSize/2 should be treated as a large flushable batch.
2515 1 : if targetSize > d.opts.MemTableSize {
2516 0 : panic(errors.AssertionFailedf("attempting to allocate memtable larger than MemTableSize"))
2517 : }
2518 : // Double until the next memtable size is at least large enough to fit
2519 : // minSize.
2520 1 : for d.mu.mem.nextSize < targetSize {
2521 0 : d.mu.mem.nextSize = min(2*d.mu.mem.nextSize, d.opts.MemTableSize)
2522 0 : }
2523 1 : size := d.mu.mem.nextSize
2524 1 : // The next memtable should be double the size, up to Options.MemTableSize.
2525 1 : if d.mu.mem.nextSize < d.opts.MemTableSize {
2526 1 : d.mu.mem.nextSize = min(2*d.mu.mem.nextSize, d.opts.MemTableSize)
2527 1 : }
2528 :
2529 1 : memtblOpts := memTableOptions{
2530 1 : Options: d.opts,
2531 1 : logSeqNum: logSeqNum,
2532 1 : }
2533 1 :
2534 1 : // Before attempting to allocate a new memtable, check if there's one
2535 1 : // available for recycling in memTableRecycle. Large contiguous allocations
2536 1 : // can be costly as fragmentation makes it more difficult to find a large
2537 1 : // contiguous free space. We've observed 64MB allocations taking 10ms+.
2538 1 : //
2539 1 : // To reduce these costly allocations, up to 1 obsolete memtable is stashed
2540 1 : // in `d.memTableRecycle` to allow a future memtable rotation to reuse
2541 1 : // existing memory.
2542 1 : var mem *memTable
2543 1 : mem = d.memTableRecycle.Swap(nil)
2544 1 : if mem != nil && uint64(mem.arenaBuf.Len()) != size {
2545 1 : d.freeMemTable(mem)
2546 1 : mem = nil
2547 1 : }
2548 1 : if mem != nil {
2549 1 : // Carry through the existing buffer and memory reservation.
2550 1 : memtblOpts.arenaBuf = mem.arenaBuf
2551 1 : memtblOpts.releaseAccountingReservation = mem.releaseAccountingReservation
2552 1 : } else {
2553 1 : mem = new(memTable)
2554 1 : memtblOpts.arenaBuf = manual.New(manual.MemTable, uintptr(size))
2555 1 : memtblOpts.releaseAccountingReservation = d.opts.Cache.Reserve(int(size))
2556 1 : d.memTableCount.Add(1)
2557 1 : d.memTableReserved.Add(int64(size))
2558 1 :
2559 1 : // Note: this is a no-op if invariants are disabled or race is enabled.
2560 1 : invariants.SetFinalizer(mem, checkMemTable)
2561 1 : }
2562 1 : mem.init(memtblOpts)
2563 1 :
2564 1 : entry := d.newFlushableEntry(mem, logNum, logSeqNum)
2565 1 : entry.releaseMemAccounting = func() {
2566 1 : // If the user leaks iterators, we may be releasing the memtable after
2567 1 : // the DB is already closed. In this case, we want to just release the
2568 1 : // memory because DB.Close won't come along to free it for us.
2569 1 : if err := d.closed.Load(); err != nil {
2570 1 : d.freeMemTable(mem)
2571 1 : return
2572 1 : }
2573 :
2574 : // The next memtable allocation might be able to reuse this memtable.
2575 : // Stash it on d.memTableRecycle.
2576 1 : if unusedMem := d.memTableRecycle.Swap(mem); unusedMem != nil {
2577 1 : // There was already a memtable waiting to be recycled. We're now
2578 1 : // responsible for freeing it.
2579 1 : d.freeMemTable(unusedMem)
2580 1 : }
2581 : }
2582 1 : return mem, entry
2583 : }
2584 :
2585 1 : func (d *DB) freeMemTable(m *memTable) {
2586 1 : d.memTableCount.Add(-1)
2587 1 : d.memTableReserved.Add(-int64(m.arenaBuf.Len()))
2588 1 : m.free()
2589 1 : }
2590 :
2591 : func (d *DB) newFlushableEntry(
2592 : f flushable, logNum base.DiskFileNum, logSeqNum base.SeqNum,
2593 1 : ) *flushableEntry {
2594 1 : fe := &flushableEntry{
2595 1 : flushable: f,
2596 1 : flushed: make(chan struct{}),
2597 1 : logNum: logNum,
2598 1 : logSeqNum: logSeqNum,
2599 1 : deleteFn: d.mu.versions.addObsolete,
2600 1 : deleteFnLocked: d.mu.versions.addObsoleteLocked,
2601 1 : }
2602 1 : fe.readerRefs.Store(1)
2603 1 : return fe
2604 1 : }
2605 :
2606 : // maybeInduceWriteStall is called before performing a memtable rotation in
2607 : // makeRoomForWrite. In some conditions, we prefer to stall the user's write
2608 : // workload rather than continuing to accept writes that may result in resource
2609 : // exhaustion or prohibitively slow reads.
2610 : //
2611 : // There are a couple of reasons we might wait to rotate the memtable and
2612 : // instead induce a write stall:
2613 : // 1. If too many memtables have queued, we wait for a flush to finish before
2614 : // creating another memtable.
2615 : // 2. If L0 read amplification has grown too high, we wait for compactions
2616 : // to reduce the read amplification before accepting more writes that will
2617 : // increase write pressure.
2618 : //
2619 : // maybeInduceWriteStall checks these stall conditions, and if present, waits
2620 : // for them to abate.
2621 1 : func (d *DB) maybeInduceWriteStall(b *Batch) {
2622 1 : stalled := false
2623 1 : // This function will call EventListener.WriteStallBegin at most once. If
2624 1 : // it does call it, it will call EventListener.WriteStallEnd once before
2625 1 : // returning.
2626 1 : for {
2627 1 : var size uint64
2628 1 : for i := range d.mu.mem.queue {
2629 1 : size += d.mu.mem.queue[i].totalBytes()
2630 1 : }
2631 1 : if size >= uint64(d.opts.MemTableStopWritesThreshold)*d.opts.MemTableSize &&
2632 1 : !d.mu.log.manager.ElevateWriteStallThresholdForFailover() {
2633 1 : // We have filled up the current memtable, but already queued memtables
2634 1 : // are still flushing, so we wait.
2635 1 : if !stalled {
2636 1 : stalled = true
2637 1 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2638 1 : Reason: "memtable count limit reached",
2639 1 : })
2640 1 : }
2641 1 : beforeWait := crtime.NowMono()
2642 1 : d.mu.compact.cond.Wait()
2643 1 : if b != nil {
2644 1 : b.commitStats.MemTableWriteStallDuration += beforeWait.Elapsed()
2645 1 : }
2646 1 : continue
2647 : }
2648 1 : l0ReadAmp := d.mu.versions.latest.l0Organizer.ReadAmplification()
2649 1 : if l0ReadAmp >= d.opts.L0StopWritesThreshold {
2650 1 : // There are too many level-0 files, so we wait.
2651 1 : if !stalled {
2652 1 : stalled = true
2653 1 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2654 1 : Reason: "L0 file count limit exceeded",
2655 1 : })
2656 1 : }
2657 1 : beforeWait := crtime.NowMono()
2658 1 : d.mu.compact.cond.Wait()
2659 1 : if b != nil {
2660 0 : b.commitStats.L0ReadAmpWriteStallDuration += beforeWait.Elapsed()
2661 0 : }
2662 1 : continue
2663 : }
2664 : // Not stalled.
2665 1 : if stalled {
2666 1 : d.opts.EventListener.WriteStallEnd()
2667 1 : }
2668 1 : return
2669 : }
2670 : }
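
The two stall reasons above map directly to tunable options. A hedged configuration sketch: the field names follow the options this function reads (MemTableSize, MemTableStopWritesThreshold, L0StopWritesThreshold, EventListener), but exact field types vary across Pebble versions (for example, EventListener is a pointer field in recent releases), and the directory and threshold values are illustrative.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func openWithStallTuning(dir string) (*pebble.DB, error) {
	opts := &pebble.Options{
		// Each memtable is 64 MiB; writes stall once this many memtables are
		// queued and still unflushed ("memtable count limit reached").
		MemTableSize:                64 << 20,
		MemTableStopWritesThreshold: 4,
		// Writes stall when L0 read amplification reaches this value
		// ("L0 file count limit exceeded").
		L0StopWritesThreshold: 12,
		EventListener: &pebble.EventListener{
			WriteStallBegin: func(info pebble.WriteStallBeginInfo) {
				log.Printf("write stall began: %s", info.Reason)
			},
			WriteStallEnd: func() {
				log.Printf("write stall ended")
			},
		},
	}
	return pebble.Open(dir, opts)
}

func main() {
	db, err := openWithStallTuning("/tmp/pebble-stall-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
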
2671 :
2672 : // makeRoomForWrite rotates the current mutable memtable, ensuring that the
2673 : // resulting mutable memtable has room to hold the contents of the provided
2674 : // Batch. The current memtable is rotated (marked as immutable) and a new
2675 : // mutable memtable is allocated. It reserves space in the new memtable and adds
2676 : // a reference to the memtable. The caller must later ensure that the memtable
2677 : // is unreferenced. This memtable rotation also causes a log rotation.
2678 : //
2679 : // If the current memtable is not full but the caller wishes to trigger a
2680 : // rotation regardless, the caller may pass a nil Batch, and no space in the
2681 : // resulting mutable memtable will be reserved.
2682 : //
2683 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2684 : // may be released and reacquired.
2685 1 : func (d *DB) makeRoomForWrite(b *Batch) error {
2686 1 : if b != nil && b.ingestedSSTBatch {
2687 0 : panic("pebble: invalid function call")
2688 : }
2689 1 : d.maybeInduceWriteStall(b)
2690 1 :
2691 1 : var newLogNum base.DiskFileNum
2692 1 : var prevLogSize uint64
2693 1 : if !d.opts.DisableWAL {
2694 1 : beforeRotate := crtime.NowMono()
2695 1 : newLogNum, prevLogSize = d.rotateWAL()
2696 1 : if b != nil {
2697 1 : b.commitStats.WALRotationDuration += beforeRotate.Elapsed()
2698 1 : }
2699 : }
2700 1 : immMem := d.mu.mem.mutable
2701 1 : imm := d.mu.mem.queue[len(d.mu.mem.queue)-1]
2702 1 : imm.logSize = prevLogSize
2703 1 :
2704 1 : var logSeqNum base.SeqNum
2705 1 : var minSize uint64
2706 1 : if b != nil {
2707 1 : logSeqNum = b.SeqNum()
2708 1 : if b.flushable != nil {
2709 1 : logSeqNum += base.SeqNum(b.Count())
2710 1 : // The batch is too large to fit in the memtable so add it directly to
2711 1 : // the immutable queue. The flushable batch is associated with the same
2712 1 : // log as the immutable memtable, but logically occurs after it in
2713 1 : // seqnum space. We ensure while flushing that the flushable batch
2714 1 : // is flushed along with the previous memtable in the flushable
2715 1 : // queue. See the top level comment in DB.flush1 to learn how this
2716 1 : // is ensured.
2717 1 : //
2718 1 : // See DB.commitWrite for the special handling of log writes for large
2719 1 : // batches. In particular, the large batch has already written to
2720 1 : // imm.logNum.
2721 1 : entry := d.newFlushableEntry(b.flushable, imm.logNum, b.SeqNum())
2722 1 : // A flushable batch is by definition large; reserve space from the cache
2723 1 : // for it until it is flushed.
2724 1 : entry.releaseMemAccounting = d.opts.Cache.Reserve(int(b.flushable.totalBytes()))
2725 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2726 1 : } else {
2727 1 : minSize = b.memTableSize
2728 1 : }
2729 1 : } else {
2730 1 : // b == nil
2731 1 : //
2732 1 : // This is a manual forced flush.
2733 1 : logSeqNum = base.SeqNum(d.mu.versions.logSeqNum.Load())
2734 1 : imm.flushForced = true
2735 1 : // If we are manually flushing and we used less than half of the bytes in
2736 1 : // the memtable, don't increase the size for the next memtable. This
2737 1 : // reduces memtable memory pressure when an application is frequently
2738 1 : // manually flushing.
2739 1 : if uint64(immMem.availBytes()) > immMem.totalBytes()/2 {
2740 1 : d.mu.mem.nextSize = immMem.totalBytes()
2741 1 : }
2742 : }
2743 1 : d.rotateMemtable(newLogNum, logSeqNum, immMem, minSize)
2744 1 : if b != nil && b.flushable == nil {
2745 1 : err := d.mu.mem.mutable.prepare(b)
2746 1 : // Reserving enough space for the batch after rotation must never fail.
2747 1 : // We pass in a minSize equal to b.memTableSize to ensure that the
2748 1 : // memtable rotation allocates a sufficiently large memtable. We have also
2749 1 : // held d.commit.mu for the entirety of this function, ensuring that no
2750 1 : // other committers may have reserved memory in the new memtable yet.
2751 1 : if err == arenaskl.ErrArenaFull {
2752 0 : panic(errors.AssertionFailedf("memtable still full after rotation"))
2753 : }
2754 1 : return err
2755 : }
2756 1 : return nil
2757 : }
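
From the caller's perspective, the rotation paths above are reached either by committing a batch (with very large batches taking the flushable-batch branch) or by a manual flush, which is the b == nil case. A minimal usage sketch; the keys, values, and directory are illustrative:

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func writeThenFlush(db *pebble.DB) error {
	// Committing an ordinary batch reserves space in the mutable memtable;
	// makeRoomForWrite rotates the memtable when it cannot hold the batch.
	b := db.NewBatch()
	if err := b.Set([]byte("k"), []byte("v"), nil); err != nil {
		return err
	}
	if err := db.Apply(b, pebble.Sync); err != nil {
		return err
	}
	// A manual flush forces a rotation even if the memtable is not full
	// (the b == nil path), marking the current memtable immutable.
	return db.Flush()
}

func main() {
	db, err := pebble.Open("/tmp/pebble-flush-demo", &pebble.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := writeThenFlush(db); err != nil {
		log.Fatal(err)
	}
}
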
2758 :
2759 : // Both DB.mu and commitPipeline.mu must be held by the caller.
2760 : func (d *DB) rotateMemtable(
2761 : newLogNum base.DiskFileNum, logSeqNum base.SeqNum, prev *memTable, minSize uint64,
2762 1 : ) {
2763 1 : // Create a new memtable, scheduling the previous one for flushing. We do
2764 1 : // this even if the previous memtable was empty because the DB.Flush
2765 1 : // mechanism is dependent on being able to wait for the empty memtable to
2766 1 : // flush. We can't just mark the empty memtable as flushed here because we
2767 1 : // also have to wait for all previous immutable tables to
2768 1 : // flush. Additionally, the memtable is tied to a particular WAL file and we
2769 1 : // want to go through the flush path in order to recycle that WAL file.
2770 1 : //
2771 1 : // NB: newLogNum corresponds to the WAL that contains mutations that are
2772 1 : // present in the new memtable. When immutable memtables are flushed to
2773 1 : // disk, a VersionEdit will be created telling the manifest the minimum
2774 1 : // unflushed log number (which will be the next one in d.mu.mem.mutable
2775 1 : // that was not flushed).
2776 1 : //
2777 1 : // NB: prev should be the current mutable memtable.
2778 1 : var entry *flushableEntry
2779 1 : d.mu.mem.mutable, entry = d.newMemTable(newLogNum, logSeqNum, minSize)
2780 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2781 1 : // d.logSize tracks the log size of the WAL file corresponding to the most
2782 1 : // recent flushable. The log size of the previous mutable memtable no longer
2783 1 : // applies to the current mutable memtable.
2784 1 : //
2785 1 : // It's tempting to perform this update in rotateWAL, but that would not be
2786 1 : // atomic with the enqueue of the new flushable. A call to DB.Metrics()
2787 1 : // could acquire DB.mu after the WAL has been rotated but before the new
2788 1 : // memtable has been appended; this would result in omitting the log size of
2789 1 : // the most recent flushable.
2790 1 : d.logSize.Store(0)
2791 1 : d.updateReadStateLocked(nil)
2792 1 : if prev.writerUnref() {
2793 1 : d.maybeScheduleFlush()
2794 1 : }
2795 : }
2796 :
2797 : // rotateWAL creates a new write-ahead log, possibly recycling a previous WAL's
2798 : // files. It returns the file number assigned to the new WAL, and the size of
2799 : // the previous WAL file.
2800 : //
2801 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2802 : // may be released and reacquired.
2803 1 : func (d *DB) rotateWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
2804 1 : if d.opts.DisableWAL {
2805 0 : panic("pebble: invalid function call")
2806 : }
2807 1 : jobID := d.newJobIDLocked()
2808 1 : newLogNum = d.mu.versions.getNextDiskFileNum()
2809 1 :
2810 1 : d.mu.Unlock()
2811 1 : // Close the previous log first. This writes an EOF trailer
2812 1 : // signifying the end of the file and syncs it to disk. We must
2813 1 : // close the previous log before linking the new log file,
2814 1 : // otherwise a crash could leave both logs with unclean tails, and
2815 1 : // Open will treat the previous log as corrupt.
2816 1 : offset, err := d.mu.log.writer.Close()
2817 1 : if err != nil {
2818 0 : // What to do here? Stumbling on doesn't seem worthwhile. If we failed to
2819 0 : // close the previous log it is possible we lost a write.
2820 0 : panic(err)
2821 : }
2822 1 : prevLogSize = uint64(offset)
2823 1 : metrics := d.mu.log.writer.Metrics()
2824 1 :
2825 1 : d.mu.Lock()
2826 1 : if err := d.mu.log.metrics.LogWriterMetrics.Merge(&metrics); err != nil {
2827 0 : d.opts.Logger.Errorf("metrics error: %s", err)
2828 0 : }
2829 :
2830 1 : d.mu.Unlock()
2831 1 : writer, err := d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
2832 1 : if err != nil {
2833 0 : panic(err)
2834 : }
2835 :
2836 1 : d.mu.Lock()
2837 1 : d.mu.log.writer = writer
2838 1 : return newLogNum, prevLogSize
2839 : }
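
Note how the function above drops DB.mu around the two slow steps (closing the old WAL and creating the new one) and reacquires it in between and before returning, which is why its doc comment warns that DB.mu may be released and reacquired. A generic sketch of that locking pattern (types and names are illustrative, not Pebble's):

package main

import "sync"

type logFile struct{}

func (l *logFile) close() error { return nil } // stand-in for the slow close+sync

func newLogFile() (*logFile, error) { return &logFile{}, nil } // stand-in for create/link

type store struct {
	mu      sync.Mutex
	current *logFile // guarded by mu
}

// rotateLocked must be called with s.mu held. It releases the mutex around the
// slow filesystem work and reacquires it before returning, mirroring how
// rotateWAL handles DB.mu; any state read before unlocking may be stale after
// relocking.
func (s *store) rotateLocked() error {
	old := s.current
	s.mu.Unlock()

	err := old.close() // slow: write an EOF trailer and sync
	var fresh *logFile
	if err == nil {
		fresh, err = newLogFile() // slow: create or recycle the next file
	}

	s.mu.Lock()
	if err == nil {
		s.current = fresh
	}
	return err
}

func main() {
	s := &store{current: &logFile{}}
	s.mu.Lock()
	_ = s.rotateLocked()
	s.mu.Unlock()
}
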
2840 :
2841 1 : func (d *DB) getEarliestUnflushedSeqNumLocked() base.SeqNum {
2842 1 : seqNum := base.SeqNumMax
2843 1 : for i := range d.mu.mem.queue {
2844 1 : logSeqNum := d.mu.mem.queue[i].logSeqNum
2845 1 : if seqNum > logSeqNum {
2846 1 : seqNum = logSeqNum
2847 1 : }
2848 : }
2849 1 : return seqNum
2850 : }
2851 :
2852 1 : func (d *DB) getInProgressCompactionInfoLocked(finishing compaction) (rv []compactionInfo) {
2853 1 : for c := range d.mu.compact.inProgress {
2854 1 : if !c.IsFlush() && (finishing == nil || c != finishing) {
2855 1 : rv = append(rv, c.Info())
2856 1 : }
2857 : }
2858 1 : return
2859 : }
2860 :
2861 1 : func inProgressL0Compactions(inProgress []compactionInfo) []manifest.L0Compaction {
2862 1 : var compactions []manifest.L0Compaction
2863 1 : for _, info := range inProgress {
2864 1 : // Skip in-progress compactions that have already committed; the L0
2865 1 : // sublevels initialization code requires the set of in-progress
2866 1 : // compactions to be consistent with the current version. Compactions
2867 1 : // with versionEditApplied=true are already applied to the current
2868 1 : // version and but are performing cleanup without the database mutex.
2869 1 : if info.versionEditApplied {
2870 1 : continue
2871 : }
2872 1 : l0 := false
2873 1 : for _, cl := range info.inputs {
2874 1 : l0 = l0 || cl.level == 0
2875 1 : }
2876 1 : if !l0 {
2877 1 : continue
2878 : }
2879 1 : compactions = append(compactions, manifest.L0Compaction{
2880 1 : Bounds: *info.bounds,
2881 1 : IsIntraL0: info.outputLevel == 0,
2882 1 : })
2883 : }
2884 1 : return compactions
2885 : }
2886 :
2887 : // firstError returns the first non-nil error of err0 and err1, or nil if both
2888 : // are nil.
2889 1 : func firstError(err0, err1 error) error {
2890 1 : if err0 != nil {
2891 1 : return err0
2892 1 : }
2893 1 : return err1
2894 : }
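
A typical use of this helper is a multi-step close path where only the earliest failure should be reported. A small self-contained sketch (the closers and their error strings are illustrative, and firstError is re-declared locally so the example compiles on its own):

package main

import (
	"errors"
	"fmt"
	"io"
)

// firstError mirrors the helper above: return the first non-nil error.
func firstError(err0, err1 error) error {
	if err0 != nil {
		return err0
	}
	return err1
}

type failingCloser struct{ name string }

func (c failingCloser) Close() error { return errors.New(c.name + ": close failed") }

// closeBoth closes both resources but reports only the earliest failure,
// the usual way the helper is chained on close paths.
func closeBoth(a, b io.Closer) error {
	err := a.Close()
	return firstError(err, b.Close())
}

func main() {
	fmt.Println(closeBoth(failingCloser{"wal"}, failingCloser{"manifest"}))
	// Prints: wal: close failed
}
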
2895 :
2896 : // SetCreatorID sets the CreatorID, which is needed in order to use shared objects.
2897 : // Remote object usage is disabled until this method is called the first time.
2898 : // Once set, the Creator ID is persisted and cannot change.
2899 : //
2900 : // Does nothing if SharedStorage was not set in the options when the DB was
2901 : // opened or if the DB is in read-only mode.
2902 0 : func (d *DB) SetCreatorID(creatorID uint64) error {
2903 0 : if d.opts.Experimental.RemoteStorage == nil || d.opts.ReadOnly {
2904 0 : return nil
2905 0 : }
2906 0 : return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
2907 : }
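
A hedged usage sketch: the creator ID value (here a node ID of 1) and the database directory are illustrative, and per the comment above the call is a no-op when no remote storage is configured or the DB is read-only.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

// enableSharedObjects persists the creator ID so the DB can start using
// shared (remote) objects; the ID is application-chosen and, once persisted,
// cannot change.
func enableSharedObjects(db *pebble.DB, nodeID uint64) error {
	return db.SetCreatorID(nodeID)
}

func main() {
	db, err := pebble.Open("/tmp/pebble-shared-demo", &pebble.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := enableSharedObjects(db, 1); err != nil {
		log.Fatal(err)
	}
}
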
2908 :
2909 : // KeyStatistics keeps track of the number of keys that have been pinned by a
2910 : // snapshot as well as counts of the different key kinds in the LSM.
2911 : //
2912 : // One way of using the accumulated stats, when we only have sets and dels
2913 : // and the counts are represented as del_count, set_count, del_latest_count,
2914 : // set_latest_count, and snapshot_pinned_count, is as follows:
2915 : //
2916 : // - del_latest_count + set_latest_count is the set of unique user keys
2917 : // (unique).
2918 : //
2919 : // - set_latest_count is the set of live unique user keys (live_unique).
2920 : //
2921 : // - Garbage is del_count + set_count - live_unique.
2922 : //
2923 : // - If everything were in the LSM, del_count+set_count-snapshot_pinned_count
2924 : // would also be the set of unique user keys (note that
2925 : // snapshot_pinned_count is counting something different -- see comment below).
2926 : // But snapshot_pinned_count only counts keys in the LSM so the excess here
2927 : // must be keys in memtables.
2928 : type KeyStatistics struct {
2929 : // TODO(sumeer): the SnapshotPinned* are incorrect in that these older
2930 : // versions can be in a different level. Either fix the accounting or
2931 : // rename these fields.
2932 :
2933 : // SnapshotPinnedKeys represents obsolete keys that cannot be elided during
2934 : // a compaction, because they are required by an open snapshot.
2935 : SnapshotPinnedKeys int
2936 : // SnapshotPinnedKeysBytes is the total number of bytes of all snapshot
2937 : // pinned keys.
2938 : SnapshotPinnedKeysBytes uint64
2939 : // KindsCount is the count for each kind of key. It includes point keys,
2940 : // range deletes and range keys.
2941 : KindsCount [InternalKeyKindMax + 1]int
2942 : // LatestKindsCount is the count for each kind of key when it is the latest
2943 : // kind for a user key. It is only populated for point keys.
2944 : LatestKindsCount [InternalKeyKindMax + 1]int
2945 : }
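
A worked example of the accounting described in the comment above, assuming only sets and dels are present; all counts are hypothetical:

package main

import "fmt"

func main() {
	// Hypothetical counts, using the names from the KeyStatistics comment.
	const (
		delCount       = 7  // all DEL versions
		setCount       = 12 // all SET versions
		delLatestCount = 3  // user keys whose latest version is a DEL
		setLatestCount = 6  // user keys whose latest version is a SET
	)
	unique := delLatestCount + setLatestCount // unique user keys = 9
	liveUnique := setLatestCount              // live unique user keys = 6
	garbage := delCount + setCount - liveUnique
	fmt.Println(unique, liveUnique, garbage) // 9 6 13
}
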
2946 :
2947 : // LSMKeyStatistics is used by DB.ScanStatistics.
2948 : type LSMKeyStatistics struct {
2949 : Accumulated KeyStatistics
2950 : // Levels contains statistics only for point keys. Range deletions and range keys will
2951 : // appear in Accumulated but not Levels.
2952 : Levels [numLevels]KeyStatistics
2953 : // BytesRead represents the logical, pre-compression size of keys and values read.
2954 : BytesRead uint64
2955 : }
2956 :
2957 : // ScanStatisticsOptions is used by DB.ScanStatistics.
2958 : type ScanStatisticsOptions struct {
2959 : // LimitBytesPerSecond indicates the maximum number of bytes that may be read
2960 : // per second using ScanInternal.
2961 : // A value of 0 indicates that no limit is set.
2962 : LimitBytesPerSecond int64
2963 : }
2964 :
2965 : // ScanStatistics returns the count of different key kinds within the LSM for a
2966 : // key span [lower, upper) as well as the number of snapshot keys.
2967 : func (d *DB) ScanStatistics(
2968 : ctx context.Context, lower, upper []byte, opts ScanStatisticsOptions,
2969 0 : ) (LSMKeyStatistics, error) {
2970 0 : stats := LSMKeyStatistics{}
2971 0 : var prevKey InternalKey
2972 0 : var rateLimitFunc func(key *InternalKey, val LazyValue) error
2973 0 : tb := tokenbucket.TokenBucket{}
2974 0 :
2975 0 : if opts.LimitBytesPerSecond != 0 {
2976 0 : const minBytesPerSec = 100 * 1024
2977 0 : if opts.LimitBytesPerSecond < minBytesPerSec {
2978 0 : return stats, errors.Newf("pebble: ScanStatistics read bandwidth limit %d is below minimum %d", opts.LimitBytesPerSecond, minBytesPerSec)
2979 0 : }
2980 : // Each "token" roughly corresponds to a byte that was read.
2981 0 : tb.Init(tokenbucket.TokensPerSecond(opts.LimitBytesPerSecond), tokenbucket.Tokens(1024))
2982 0 : rateLimitFunc = func(key *InternalKey, val LazyValue) error {
2983 0 : return tb.WaitCtx(ctx, tokenbucket.Tokens(key.Size()+val.Len()))
2984 0 : }
2985 : }
2986 :
2987 0 : scanInternalOpts := &scanInternalOptions{
2988 0 : visitPointKey: func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error {
2989 0 : // If the previous key is equal to the current point key, the current key was
2990 0 : // pinned by a snapshot.
2991 0 : size := uint64(key.Size())
2992 0 : kind := key.Kind()
2993 0 : sameKey := d.equal(prevKey.UserKey, key.UserKey)
2994 0 : if iterInfo.Kind == IteratorLevelLSM && sameKey {
2995 0 : stats.Levels[iterInfo.Level].SnapshotPinnedKeys++
2996 0 : stats.Levels[iterInfo.Level].SnapshotPinnedKeysBytes += size
2997 0 : stats.Accumulated.SnapshotPinnedKeys++
2998 0 : stats.Accumulated.SnapshotPinnedKeysBytes += size
2999 0 : }
3000 0 : if iterInfo.Kind == IteratorLevelLSM {
3001 0 : stats.Levels[iterInfo.Level].KindsCount[kind]++
3002 0 : }
3003 0 : if !sameKey {
3004 0 : if iterInfo.Kind == IteratorLevelLSM {
3005 0 : stats.Levels[iterInfo.Level].LatestKindsCount[kind]++
3006 0 : }
3007 0 : stats.Accumulated.LatestKindsCount[kind]++
3008 : }
3009 :
3010 0 : stats.Accumulated.KindsCount[kind]++
3011 0 : prevKey.CopyFrom(*key)
3012 0 : stats.BytesRead += uint64(key.Size() + value.Len())
3013 0 : return nil
3014 : },
3015 0 : visitRangeDel: func(start, end []byte, seqNum base.SeqNum) error {
3016 0 : stats.Accumulated.KindsCount[InternalKeyKindRangeDelete]++
3017 0 : stats.BytesRead += uint64(len(start) + len(end))
3018 0 : return nil
3019 0 : },
3020 0 : visitRangeKey: func(start, end []byte, keys []rangekey.Key) error {
3021 0 : stats.BytesRead += uint64(len(start) + len(end))
3022 0 : for _, key := range keys {
3023 0 : stats.Accumulated.KindsCount[key.Kind()]++
3024 0 : stats.BytesRead += uint64(len(key.Value) + len(key.Suffix))
3025 0 : }
3026 0 : return nil
3027 : },
3028 : includeObsoleteKeys: true,
3029 : IterOptions: IterOptions{
3030 : KeyTypes: IterKeyTypePointsAndRanges,
3031 : LowerBound: lower,
3032 : UpperBound: upper,
3033 : },
3034 : rateLimitFunc: rateLimitFunc,
3035 : }
3036 0 : iter, err := d.newInternalIter(ctx, snapshotIterOpts{}, scanInternalOpts)
3037 0 : if err != nil {
3038 0 : return LSMKeyStatistics{}, err
3039 0 : }
3040 0 : defer iter.close()
3041 0 :
3042 0 : err = scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
3043 0 :
3044 0 : if err != nil {
3045 0 : return LSMKeyStatistics{}, err
3046 0 : }
3047 :
3048 0 : return stats, nil
3049 : }
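
A hedged usage sketch of ScanStatistics; the key bounds, directory, and rate limit are illustrative. Note that non-zero limits below roughly 100 KiB/s are rejected by the check above.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
)

func countLiveSets(ctx context.Context, db *pebble.DB) {
	stats, err := db.ScanStatistics(ctx, []byte("a"), []byte("z"), pebble.ScanStatisticsOptions{
		// Throttle the scan to ~1 MiB/s of logical bytes read; 0 means no limit.
		LimitBytesPerSecond: 1 << 20,
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("latest SET keys: %d, snapshot-pinned keys: %d, bytes read: %d\n",
		stats.Accumulated.LatestKindsCount[pebble.InternalKeyKindSet],
		stats.Accumulated.SnapshotPinnedKeys,
		stats.BytesRead)
}

func main() {
	db, err := pebble.Open("/tmp/pebble-scanstats-demo", &pebble.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	countLiveSets(context.Background(), db)
}
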
3050 :
3051 : // ObjProvider returns the objstorage.Provider for this database. Meant to be
3052 : // used for internal purposes only.
3053 1 : func (d *DB) ObjProvider() objstorage.Provider {
3054 1 : return d.objProvider
3055 1 : }
3056 :
3057 0 : func (d *DB) checkVirtualBounds(m *manifest.TableMetadata) {
3058 0 : if !invariants.Enabled {
3059 0 : return
3060 0 : }
3061 :
3062 0 : objMeta, err := d.objProvider.Lookup(base.FileTypeTable, m.TableBacking.DiskFileNum)
3063 0 : if err != nil {
3064 0 : panic(err)
3065 : }
3066 0 : if objMeta.IsExternal() {
3067 0 : // Nothing to do; bounds are expected to be loose.
3068 0 : return
3069 0 : }
3070 :
3071 0 : iters, err := d.newIters(context.TODO(), m, nil, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
3072 0 : if err != nil {
3073 0 : panic(errors.Wrap(err, "pebble: error creating iterators"))
3074 : }
3075 0 : defer func() { _ = iters.CloseAll() }()
3076 :
3077 0 : if m.HasPointKeys {
3078 0 : pointIter := iters.Point()
3079 0 : rangeDelIter := iters.RangeDeletion()
3080 0 :
3081 0 : // Check that the lower bound is tight.
3082 0 : pointKV := pointIter.First()
3083 0 : rangeDel, err := rangeDelIter.First()
3084 0 : if err != nil {
3085 0 : panic(err)
3086 : }
3087 0 : if (rangeDel == nil || d.cmp(rangeDel.SmallestKey().UserKey, m.PointKeyBounds.Smallest().UserKey) != 0) &&
3088 0 : (pointKV == nil || d.cmp(pointKV.K.UserKey, m.PointKeyBounds.Smallest().UserKey) != 0) {
3089 0 : panic(errors.Newf("pebble: virtual sstable %s lower point key bound is not tight", m.TableNum))
3090 : }
3091 :
3092 : // Check that the upper bound is tight.
3093 0 : pointKV = pointIter.Last()
3094 0 : rangeDel, err = rangeDelIter.Last()
3095 0 : if err != nil {
3096 0 : panic(err)
3097 : }
3098 0 : if (rangeDel == nil || d.cmp(rangeDel.LargestKey().UserKey, m.PointKeyBounds.LargestUserKey()) != 0) &&
3099 0 : (pointKV == nil || d.cmp(pointKV.K.UserKey, m.PointKeyBounds.Largest().UserKey) != 0) {
3100 0 : panic(errors.Newf("pebble: virtual sstable %s upper point key bound is not tight", m.TableNum))
3101 : }
3102 :
3103 : // Check that iterator keys are within bounds.
3104 0 : for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
3105 0 : if d.cmp(kv.K.UserKey, m.PointKeyBounds.Smallest().UserKey) < 0 || d.cmp(kv.K.UserKey, m.PointKeyBounds.LargestUserKey()) > 0 {
3106 0 : panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.TableNum, kv.K.UserKey))
3107 : }
3108 : }
3109 0 : s, err := rangeDelIter.First()
3110 0 : for ; s != nil; s, err = rangeDelIter.Next() {
3111 0 : if d.cmp(s.SmallestKey().UserKey, m.PointKeyBounds.Smallest().UserKey) < 0 {
3112 0 : panic(errors.Newf("pebble: virtual sstable %s range deletion %s is not within bounds", m.TableNum, s.SmallestKey().UserKey))
3113 : }
3114 0 : if d.cmp(s.LargestKey().UserKey, m.PointKeyBounds.Largest().UserKey) > 0 {
3115 0 : panic(errors.Newf("pebble: virtual sstable %s range deletion %s is not within bounds", m.TableNum, s.LargestKey().UserKey))
3116 : }
3117 : }
3118 0 : if err != nil {
3119 0 : panic(err)
3120 : }
3121 : }
3122 :
3123 0 : if !m.HasRangeKeys {
3124 0 : return
3125 0 : }
3126 0 : rangeKeyIter := iters.RangeKey()
3127 0 :
3128 0 : // Check that the lower bound is tight.
3129 0 : if s, err := rangeKeyIter.First(); err != nil {
3130 0 : panic(err)
3131 0 : } else if m.HasRangeKeys && d.cmp(s.SmallestKey().UserKey, m.RangeKeyBounds.SmallestUserKey()) != 0 {
3132 0 : panic(errors.Newf("pebble: virtual sstable %s lower range key bound is not tight", m.TableNum))
3133 : }
3134 :
3135 : // Check that upper bound is tight.
3136 0 : if s, err := rangeKeyIter.Last(); err != nil {
3137 0 : panic(err)
3138 0 : } else if d.cmp(s.LargestKey().UserKey, m.RangeKeyBounds.LargestUserKey()) != 0 {
3139 0 : panic(errors.Newf("pebble: virtual sstable %s upper range key bound is not tight", m.TableNum))
3140 : }
3141 :
3142 0 : s, err := rangeKeyIter.First()
3143 0 : for ; s != nil; s, err = rangeKeyIter.Next() {
3144 0 : if d.cmp(s.SmallestKey().UserKey, m.RangeKeyBounds.SmallestUserKey()) < 0 {
3145 0 : panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.TableNum, s.SmallestKey().UserKey))
3146 : }
3147 0 : if d.cmp(s.LargestKey().UserKey, m.RangeKeyBounds.LargestUserKey()) > 0 {
3148 0 : panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.TableNum, s.LargestKey().UserKey))
3149 : }
3150 : }
3151 0 : if err != nil {
3152 0 : panic(err)
3153 : }
3154 : }
3155 :
3156 : // DebugString returns a debugging string describing the LSM.
3157 0 : func (d *DB) DebugString() string {
3158 0 : return d.DebugCurrentVersion().DebugString()
3159 0 : }
3160 :
3161 : // DebugCurrentVersion returns the current LSM tree metadata. Should only be
3162 : // used for testing/debugging.
3163 0 : func (d *DB) DebugCurrentVersion() *manifest.Version {
3164 0 : d.mu.Lock()
3165 0 : defer d.mu.Unlock()
3166 0 : return d.mu.versions.currentVersion()
3167 0 : }
|