Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package pebble provides an ordered key/value store.
6 : package pebble // import "github.com/cockroachdb/pebble"
7 :
8 : import (
9 : "context"
10 : "fmt"
11 : "io"
12 : "sync"
13 : "sync/atomic"
14 : "time"
15 : "unsafe"
16 :
17 : "github.com/cockroachdb/crlib/crtime"
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/arenaskl"
20 : "github.com/cockroachdb/pebble/internal/base"
21 : "github.com/cockroachdb/pebble/internal/cache"
22 : "github.com/cockroachdb/pebble/internal/invalidating"
23 : "github.com/cockroachdb/pebble/internal/invariants"
24 : "github.com/cockroachdb/pebble/internal/keyspan"
25 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
26 : "github.com/cockroachdb/pebble/internal/manifest"
27 : "github.com/cockroachdb/pebble/internal/manual"
28 : "github.com/cockroachdb/pebble/objstorage"
29 : "github.com/cockroachdb/pebble/objstorage/remote"
30 : "github.com/cockroachdb/pebble/rangekey"
31 : "github.com/cockroachdb/pebble/record"
32 : "github.com/cockroachdb/pebble/sstable"
33 : "github.com/cockroachdb/pebble/vfs"
34 : "github.com/cockroachdb/pebble/vfs/atomicfs"
35 : "github.com/cockroachdb/pebble/wal"
36 : "github.com/cockroachdb/tokenbucket"
37 : "github.com/prometheus/client_golang/prometheus"
38 : )
39 :
40 : const (
41 : // minTableCacheSize is the minimum size of the table cache, for a single db.
42 : minTableCacheSize = 64
43 :
44 : // numNonTableCacheFiles is an approximation for the number of files
45 : // that we don't use for table caches, for a given db.
46 : numNonTableCacheFiles = 10
47 : )
48 :
49 : var (
50 : // ErrNotFound is returned when a get operation does not find the requested
51 : // key.
52 : ErrNotFound = base.ErrNotFound
53 : // ErrClosed is panicked when an operation is performed on a closed snapshot or
54 : // DB. Use errors.Is(err, ErrClosed) to check for this error.
55 : ErrClosed = errors.New("pebble: closed")
56 : // ErrReadOnly is returned when a write operation is performed on a read-only
57 : // database.
58 : ErrReadOnly = errors.New("pebble: read-only")
59 : // errNoSplit indicates that the user is trying to perform a range key
60 : // operation but the configured Comparer does not provide a Split
61 : // implementation.
62 : errNoSplit = errors.New("pebble: Comparer.Split required for range key operations")
63 : )
64 :
65 : // Reader is a readable key/value store.
66 : //
67 : // It is safe to call Get and NewIter from concurrent goroutines.
68 : type Reader interface {
69 : // Get gets the value for the given key. It returns ErrNotFound if the DB
70 : // does not contain the key.
71 : //
72 : // The caller should not modify the contents of the returned slice, but it is
73 : // safe to modify the contents of the argument after Get returns. The
74 : // returned slice will remain valid until the returned Closer is closed. On
75 : // success, the caller MUST call closer.Close() or a memory leak will occur.
76 : Get(key []byte) (value []byte, closer io.Closer, err error)
77 :
78 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
79 : // return false). The iterator can be positioned via a call to SeekGE,
80 : // SeekLT, First or Last.
81 : NewIter(o *IterOptions) (*Iterator, error)
82 :
83 : // NewIterWithContext is like NewIter, and additionally accepts a context
84 : // for tracing.
85 : NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error)
86 :
87 : // Close closes the Reader. It may or may not close any underlying io.Reader
88 : // or io.Writer, depending on how the DB was created.
89 : //
90 : // It is not safe to close a DB until all outstanding iterators are closed.
91 : // It is valid to call Close multiple times. Other methods should not be
92 : // called after the DB has been closed.
93 : Close() error
94 : }
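 :
 : // The sketch below illustrates the Get contract described above, assuming an
 : // illustrative Reader r and key k: on success the caller owns the returned
 : // Closer and must close it once the value is no longer needed.
 : //
 : //	value, closer, err := r.Get(k)
 : //	if err != nil {
 : //		return err // err is ErrNotFound if the key is absent
 : //	}
 : //	defer closer.Close() // required; value is only valid until Close
 : //	_ = value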
95 :
96 : // Writer is a writable key/value store.
97 : //
98 : // Goroutine safety is dependent on the specific implementation.
99 : type Writer interface {
100 : // Apply the operations contained in the batch to the DB.
101 : //
102 : // It is safe to modify the contents of the arguments after Apply returns.
103 : Apply(batch *Batch, o *WriteOptions) error
104 :
105 : // Delete deletes the value for the given key. Deletes are blind and will
106 : // succeed even if the given key does not exist.
107 : //
108 : // It is safe to modify the contents of the arguments after Delete returns.
109 : Delete(key []byte, o *WriteOptions) error
110 :
111 : // DeleteSized behaves identically to Delete, but takes an additional
112 : // argument indicating the size of the value being deleted. DeleteSized
113 : // should be preferred when the caller has the expectation that there exists
114 : // a single internal KV pair for the key (eg, the key has not been
115 : // overwritten recently), and the caller knows the size of its value.
116 : //
117 : // DeleteSized will record the value size within the tombstone and use it to
118 : // inform compaction-picking heuristics which strive to reduce space
119 : // amplification in the LSM. This "calling your shot" mechanic allows the
120 : // storage engine to more accurately estimate and reduce space
121 : // amplification.
122 : //
123 : // It is safe to modify the contents of the arguments after DeleteSized
124 : // returns.
125 : DeleteSized(key []byte, valueSize uint32, _ *WriteOptions) error
126 :
127 : // SingleDelete is similar to Delete in that it deletes the value for the given key. Like Delete,
128 : // it is a blind operation that will succeed even if the given key does not exist.
129 : //
130 : // WARNING: Undefined (non-deterministic) behavior will result if a key is overwritten and
131 : // then deleted using SingleDelete. The record may appear deleted immediately, but be
132 : // resurrected at a later time after compactions have been performed. Or the record may
133 : // be deleted permanently. A Delete operation lays down a "tombstone" which shadows all
134 : // previous versions of a key. The SingleDelete operation is akin to "anti-matter" and will
135 : // only delete the most recently written version for a key. These different semantics allow
136 : // the DB to avoid propagating a SingleDelete operation during a compaction as soon as the
137 : // corresponding Set operation is encountered. These semantics require extreme care to handle
138 : // properly. Only use if you have a workload where the performance gain is critical and you
139 : // can guarantee that a record is written once and then deleted once.
140 : //
141 : // SingleDelete is internally transformed into a Delete if the most recent record for a key is either
142 : // a Merge or Delete record.
143 : //
144 : // It is safe to modify the contents of the arguments after SingleDelete returns.
145 : SingleDelete(key []byte, o *WriteOptions) error
146 :
147 : // DeleteRange deletes all of the point keys (and values) in the range
148 : // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
149 : // delete overlapping range keys (eg, keys set via RangeKeySet).
150 : //
151 : // It is safe to modify the contents of the arguments after DeleteRange
152 : // returns.
153 : DeleteRange(start, end []byte, o *WriteOptions) error
154 :
155 : // LogData adds the specified data to the batch. The data will be written to
156 : // the WAL, but not added to memtables or sstables. Log data is never indexed,
157 : // which makes it useful for testing WAL performance.
158 : //
159 : // It is safe to modify the contents of the argument after LogData returns.
160 : LogData(data []byte, opts *WriteOptions) error
161 :
162 : // Merge merges the value for the given key. The details of the merge are
163 : // dependent upon the configured merge operation.
164 : //
165 : // It is safe to modify the contents of the arguments after Merge returns.
166 : Merge(key, value []byte, o *WriteOptions) error
167 :
168 : // Set sets the value for the given key. It overwrites any previous value
169 : // for that key; a DB is not a multi-map.
170 : //
171 : // It is safe to modify the contents of the arguments after Set returns.
172 : Set(key, value []byte, o *WriteOptions) error
173 :
174 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
175 : // timestamp suffix to value. The suffix is optional. If any portion of the key
176 : // range [start, end) is already set by a range key with the same suffix value,
177 : // RangeKeySet overrides it.
178 : //
179 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
180 : RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error
181 :
182 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
183 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
184 : // range key. RangeKeyUnset only removes portions of range keys that fall within
185 : // the [start, end) key span, and only range keys with suffixes that exactly
186 : // match the unset suffix.
187 : //
188 : // It is safe to modify the contents of the arguments after RangeKeyUnset
189 : // returns.
190 : RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error
191 :
192 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
193 : // (inclusive on start, exclusive on end). It does not delete point keys (for
194 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
195 : // bounds, including those with or without suffixes.
196 : //
197 : // It is safe to modify the contents of the arguments after RangeKeyDelete
198 : // returns.
199 : RangeKeyDelete(start, end []byte, opts *WriteOptions) error
200 : }
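 :
 : // A minimal sketch of the SingleDelete contract described above, assuming a
 : // Writer w and the package-level Sync write options; the key is illustrative
 : // and is written exactly once and then single-deleted once:
 : //
 : //	_ = w.Set([]byte("intent/42"), []byte("v"), Sync)
 : //	// ... the record may be read, but is never overwritten ...
 : //	_ = w.SingleDelete([]byte("intent/42"), Sync)
 : //
 : // Overwriting the key between the Set and the SingleDelete would make the
 : // outcome non-deterministic, as warned above.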
201 :
202 : // CPUWorkHandle represents a handle used by the CPUWorkPermissionGranter API.
203 : type CPUWorkHandle interface {
204 : // Permitted indicates whether Pebble can use additional CPU resources.
205 : Permitted() bool
206 : }
207 :
208 : // CPUWorkPermissionGranter is used to request permission to opportunistically
209 : // use additional CPUs to speed up internal background work.
210 : type CPUWorkPermissionGranter interface {
211 : // GetPermission returns a handle regardless of whether permission is granted
212 : // or not. In the latter case, the handle is only useful for recording
213 : // the CPU time actually spent on this calling goroutine.
214 : GetPermission(time.Duration) CPUWorkHandle
215 : // CPUWorkDone must be called regardless of whether CPUWorkHandle.Permitted
216 : // returns true or false.
217 : CPUWorkDone(CPUWorkHandle)
218 : }
219 :
220 : // Use a default implementation for the CPU work granter to avoid excessive nil
221 : // checks in the code.
222 : type defaultCPUWorkHandle struct{}
223 :
224 2 : func (d defaultCPUWorkHandle) Permitted() bool {
225 2 : return false
226 2 : }
227 :
228 : type defaultCPUWorkGranter struct{}
229 :
230 2 : func (d defaultCPUWorkGranter) GetPermission(_ time.Duration) CPUWorkHandle {
231 2 : return defaultCPUWorkHandle{}
232 2 : }
233 :
234 2 : func (d defaultCPUWorkGranter) CPUWorkDone(_ CPUWorkHandle) {}
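 :
 : // A sketch of the calling protocol for CPUWorkPermissionGranter, assuming an
 : // illustrative granter g: CPUWorkDone is always paired with GetPermission,
 : // whether or not permission was granted.
 : //
 : //	handle := g.GetPermission(10 * time.Millisecond)
 : //	if handle.Permitted() {
 : //		// Opportunistically use additional CPU (e.g. an extra goroutine).
 : //	}
 : //	// ... perform the background work ...
 : //	g.CPUWorkDone(handle)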
235 :
236 : // DB provides a concurrent, persistent ordered key/value store.
237 : //
238 : // A DB's basic operations (Get, Set, Delete) should be self-explanatory. Get
239 : // and Delete will return ErrNotFound if the requested key is not in the store.
240 : // Callers are free to ignore this error.
241 : //
242 : // A DB also allows for iterating over the key/value pairs in key order. If d
243 : // is a DB, the code below prints all key/value pairs whose keys are 'greater
244 : // than or equal to' k:
245 : //
246 : // iter := d.NewIter(readOptions)
247 : // for iter.SeekGE(k); iter.Valid(); iter.Next() {
248 : // fmt.Printf("key=%q value=%q\n", iter.Key(), iter.Value())
249 : // }
250 : // return iter.Close()
251 : //
252 : // The Options struct holds the optional parameters for the DB, including a
253 : // Comparer to define a 'less than' relationship over keys. It is always valid
254 : // to pass a nil *Options, which means to use the default parameter values. Any
255 : // zero field of a non-nil *Options also means to use the default value for
256 : // that parameter. Thus, the code below uses a custom Comparer, but the default
257 : // values for every other parameter:
258 : //
259 : // db := pebble.Open(&Options{
260 : // Comparer: myComparer,
261 : // })
262 : type DB struct {
263 : // The count and size of referenced memtables. This includes memtables
264 : // present in DB.mu.mem.queue, as well as memtables that have been flushed
265 : // but are still referenced by an in-use readState, as well as up to one
266 : // memTable waiting to be reused and stored in d.memTableRecycle.
267 : memTableCount atomic.Int64
268 : memTableReserved atomic.Int64 // number of bytes reserved in the cache for memtables
269 : // memTableRecycle holds a pointer to an obsolete memtable. The next
270 : // memtable allocation will reuse this memtable if it has not already been
271 : // recycled.
272 : memTableRecycle atomic.Pointer[memTable]
273 :
274 : // The logical size of the current WAL.
275 : logSize atomic.Uint64
276 : // The number of input bytes to the log. This is the raw size of the
277 : // batches written to the WAL, without the overhead of the record
278 : // envelopes.
279 : logBytesIn atomic.Uint64
280 :
281 : // The number of bytes available on disk.
282 : diskAvailBytes atomic.Uint64
283 :
284 : cacheID cache.ID
285 : dirname string
286 : opts *Options
287 : cmp Compare
288 : equal Equal
289 : merge Merge
290 : split Split
291 : abbreviatedKey AbbreviatedKey
292 : // The threshold for determining when a batch is "large" and will skip being
293 : // inserted into a memtable.
294 : largeBatchThreshold uint64
295 : // The current OPTIONS file number.
296 : optionsFileNum base.DiskFileNum
297 : // The on-disk size of the current OPTIONS file.
298 : optionsFileSize uint64
299 :
300 : // objProvider is used to access and manage SSTs.
301 : objProvider objstorage.Provider
302 :
303 : fileLock *Lock
304 : dataDir vfs.File
305 :
306 : tableCache *tableCacheContainer
307 : newIters tableNewIters
308 : tableNewRangeKeyIter keyspanimpl.TableNewSpanIter
309 :
310 : commit *commitPipeline
311 :
312 : // readState provides access to the state needed for reading without needing
313 : // to acquire DB.mu.
314 : readState struct {
315 : sync.RWMutex
316 : val *readState
317 : }
318 :
319 : closed *atomic.Value
320 : closedCh chan struct{}
321 :
322 : cleanupManager *cleanupManager
323 :
324 : // During an iterator close, we may asynchronously schedule read compactions.
325 : // We want to wait for those goroutines to finish before closing the DB.
326 : // compactionSchedulers.Wait() should not be called while DB.mu is held.
327 : compactionSchedulers sync.WaitGroup
328 :
329 : // The main mutex protecting internal DB state. This mutex encompasses many
330 : // fields because those fields need to be accessed and updated atomically. In
331 : // particular, the current version, log.*, mem.*, and snapshot list need to
332 : // be accessed and updated atomically during compaction.
333 : //
334 : // Care is taken to avoid holding DB.mu during IO operations. Accomplishing
335 : // this sometimes requires releasing DB.mu in a method that was called with
336 : // it held. See versionSet.logAndApply() and DB.makeRoomForWrite() for
337 : // examples. This is a common pattern, so be careful about expectations that
338 : // DB.mu will be held continuously across a set of calls.
339 : mu struct {
340 : sync.Mutex
341 :
342 : formatVers struct {
343 : // vers is the database's current format major version.
344 : // Backwards-incompatible features are gated behind new
345 : // format major versions and not enabled until a database's
346 : // version is ratcheted upwards.
347 : //
348 : // Although this is under the `mu` prefix, readers may read vers
349 : // atomically without holding d.mu. Writers must only write to this
350 : // value through finalizeFormatVersUpgrade which requires d.mu is
351 : // held.
352 : vers atomic.Uint64
353 : // marker is the atomic marker for the format major version.
354 : // When a database's version is ratcheted upwards, the
355 : // marker is moved in order to atomically record the new
356 : // version.
357 : marker *atomicfs.Marker
358 : // ratcheting when set to true indicates that the database is
359 : // currently in the process of ratcheting the format major version
360 : // to vers + 1. As a part of ratcheting the format major version,
361 : // migrations may drop and re-acquire the mutex.
362 : ratcheting bool
363 : }
364 :
365 : // The ID of the next job. Job IDs are passed to event listener
366 : // notifications and act as a mechanism for tying together the events and
367 : // log messages for a single job such as a flush, compaction, or file
368 : // ingestion. Job IDs are not serialized to disk or used for correctness.
369 : nextJobID JobID
370 :
371 : // The collection of immutable versions and state about the log and visible
372 : // sequence numbers. Use the pointer here to ensure the atomic fields in
373 : // version set are aligned properly.
374 : versions *versionSet
375 :
376 : log struct {
377 : // manager is not protected by mu, but calls to Create must be
378 : // serialized, and happen after the previous writer is closed.
379 : manager wal.Manager
380 : // The Writer is protected by commitPipeline.mu. This allows log writes
381 : // to be performed without holding DB.mu, but requires both
382 : // commitPipeline.mu and DB.mu to be held when rotating the WAL/memtable
383 : // (i.e. makeRoomForWrite). Can be nil.
384 : writer wal.Writer
385 : metrics struct {
386 : // fsyncLatency has its own internal synchronization, and is not
387 : // protected by mu.
388 : fsyncLatency prometheus.Histogram
389 : // Updated whenever a wal.Writer is closed.
390 : record.LogWriterMetrics
391 : }
392 : }
393 :
394 : mem struct {
395 : // The current mutable memTable. Readers of the pointer may hold
396 : // either DB.mu or commitPipeline.mu.
397 : //
398 : // Its internal fields are protected by commitPipeline.mu. This
399 : // allows batch commits to be performed without DB.mu as long as no
400 : // memtable rotation is required.
401 : //
402 : // Both commitPipeline.mu and DB.mu must be held when rotating the
403 : // memtable.
404 : mutable *memTable
405 : // Queue of flushables (the mutable memtable is at end). Elements are
406 : // added to the end of the slice and removed from the beginning. Once an
407 : // index is set it is never modified, making a fixed slice immutable and
408 : // safe for concurrent reads.
409 : queue flushableList
410 : // nextSize is the size of the next memtable. The memtable size starts at
411 : // min(256KB,Options.MemTableSize) and doubles each time a new memtable
412 : // is allocated up to Options.MemTableSize. This reduces the memory
413 : // footprint of memtables when lots of DB instances are used concurrently
414 : // in test environments.
415 : nextSize uint64
416 : }
417 :
418 : compact struct {
419 : // Condition variable used to signal when a flush or compaction has
420 : // completed. Used by the write-stall mechanism to wait for the stall
421 : // condition to clear. See DB.makeRoomForWrite().
422 : cond sync.Cond
423 : // True when a flush is in progress.
424 : flushing bool
425 : // The number of ongoing non-download compactions.
426 : compactingCount int
427 : // The number of download compactions.
428 : downloadingCount int
429 : // The list of deletion hints, suggesting ranges for delete-only
430 : // compactions.
431 : deletionHints []deleteCompactionHint
432 : // The list of manual compactions. The next manual compaction to perform
433 : // is at the start of the list. New entries are added to the end.
434 : manual []*manualCompaction
435 : // downloads is the list of pending download tasks. The next download to
436 : // perform is at the start of the list. New entries are added to the end.
437 : downloads []*downloadSpanTask
438 : // inProgress is the set of in-progress flushes and compactions.
439 : // It's used in the calculation of some metrics and to initialize L0
440 : // sublevels' state. Some of the compactions contained within this
441 : // map may have already committed an edit to the version but are
442 : // lingering performing cleanup, like deleting obsolete files.
443 : inProgress map[*compaction]struct{}
444 :
445 : // rescheduleReadCompaction indicates to an iterator that a read compaction
446 : // should be scheduled.
447 : rescheduleReadCompaction bool
448 :
449 : // readCompactions is a readCompactionQueue which keeps track of the
450 : // compactions which we might have to perform.
451 : readCompactions readCompactionQueue
452 :
453 : // The cumulative duration of all completed compactions since Open.
454 : // Does not include flushes.
455 : duration time.Duration
456 : // Flush throughput metric.
457 : flushWriteThroughput ThroughputMetric
458 : // The idle start time for the flush "loop", i.e., when the flushing
459 : // bool above transitions to false.
460 : noOngoingFlushStartTime crtime.Mono
461 : }
462 :
463 : // Non-zero when file cleaning is disabled. The disabled count acts as a
464 : // reference count to prohibit file cleaning. See
465 : // DB.{disable,Enable}FileDeletions().
466 : disableFileDeletions int
467 :
468 : snapshots struct {
469 : // The list of active snapshots.
470 : snapshotList
471 :
472 : // The cumulative count and size of snapshot-pinned keys written to
473 : // sstables.
474 : cumulativePinnedCount uint64
475 : cumulativePinnedSize uint64
476 : }
477 :
478 : tableStats struct {
479 : // Condition variable used to signal the completion of a
480 : // job to collect table stats.
481 : cond sync.Cond
482 : // True when a stat collection operation is in progress.
483 : loading bool
484 : // True if stat collection has loaded statistics for all tables
485 : // other than those listed explicitly in pending. This flag starts
486 : // as false when a database is opened and flips to true once stat
487 : // collection has caught up.
488 : loadedInitial bool
489 : // A slice of files for which stats have not been computed.
490 : // Compactions, ingests, flushes append files to be processed. An
491 : // active stat collection goroutine clears the list and processes
492 : // them.
493 : pending []manifest.NewFileEntry
494 : }
495 :
496 : tableValidation struct {
497 : // cond is a condition variable used to signal the completion of a
498 : // job to validate one or more sstables.
499 : cond sync.Cond
500 : // pending is a slice of metadata for sstables waiting to be
501 : // validated. Only physical sstables should be added to the pending
502 : // queue.
503 : pending []newFileEntry
504 : // validating is set to true when validation is running.
505 : validating bool
506 : }
507 :
508 : // annotators contains various instances of manifest.Annotator which
509 : // should be protected from concurrent access.
510 : annotators struct {
511 : totalSize *manifest.Annotator[uint64]
512 : remoteSize *manifest.Annotator[uint64]
513 : externalSize *manifest.Annotator[uint64]
514 : }
515 : }
516 :
517 : // Normally equal to time.Now() but may be overridden in tests.
518 : timeNow func() time.Time
519 : // the time at database Open; may be used to compute metrics like effective
520 : // compaction concurrency
521 : openedAt time.Time
522 : }
523 :
524 : var _ Reader = (*DB)(nil)
525 : var _ Writer = (*DB)(nil)
526 :
527 : // TestOnlyWaitForCleaning MUST only be used in tests.
528 1 : func (d *DB) TestOnlyWaitForCleaning() {
529 1 : d.cleanupManager.Wait()
530 1 : }
531 :
532 : // Get gets the value for the given key. It returns ErrNotFound if the DB does
533 : // not contain the key.
534 : //
535 : // The caller should not modify the contents of the returned slice, but it is
536 : // safe to modify the contents of the argument after Get returns. The returned
537 : // slice will remain valid until the returned Closer is closed. On success, the
538 : // caller MUST call closer.Close() or a memory leak will occur.
539 2 : func (d *DB) Get(key []byte) ([]byte, io.Closer, error) {
540 2 : return d.getInternal(key, nil /* batch */, nil /* snapshot */)
541 2 : }
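 :
 : // Callers commonly distinguish a missing key from a real error, as sketched
 : // below with an illustrative key:
 : //
 : //	value, closer, err := d.Get([]byte("k"))
 : //	if errors.Is(err, ErrNotFound) {
 : //		// Key absent; often not a failure for the caller.
 : //	} else if err == nil {
 : //		_ = value // valid only until closer.Close()
 : //		_ = closer.Close()
 : //	}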
542 :
543 : type getIterAlloc struct {
544 : dbi Iterator
545 : keyBuf []byte
546 : get getIter
547 : }
548 :
549 : var getIterAllocPool = sync.Pool{
550 2 : New: func() interface{} {
551 2 : return &getIterAlloc{}
552 2 : },
553 : }
554 :
555 2 : func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer, error) {
556 2 : if err := d.closed.Load(); err != nil {
557 1 : panic(err)
558 : }
559 :
560 : // Grab and reference the current readState. This prevents the underlying
561 : // files in the associated version from being deleted if there is a current
562 : // compaction. The readState is unref'd by Iterator.Close().
563 2 : readState := d.loadReadState()
564 2 :
565 2 : // Determine the seqnum to read at after grabbing the read state (current and
566 2 : // memtables) above.
567 2 : var seqNum base.SeqNum
568 2 : if s != nil {
569 2 : seqNum = s.seqNum
570 2 : } else {
571 2 : seqNum = d.mu.versions.visibleSeqNum.Load()
572 2 : }
573 :
574 2 : buf := getIterAllocPool.Get().(*getIterAlloc)
575 2 :
576 2 : get := &buf.get
577 2 : *get = getIter{
578 2 : comparer: d.opts.Comparer,
579 2 : newIters: d.newIters,
580 2 : snapshot: seqNum,
581 2 : iterOpts: IterOptions{
582 2 : // TODO(sumeer): replace with a parameter provided by the caller.
583 2 : Category: categoryGet,
584 2 : logger: d.opts.Logger,
585 2 : snapshotForHideObsoletePoints: seqNum,
586 2 : },
587 2 : key: key,
588 2 : // Compute the key prefix for bloom filtering.
589 2 : prefix: key[:d.opts.Comparer.Split(key)],
590 2 : batch: b,
591 2 : mem: readState.memtables,
592 2 : l0: readState.current.L0SublevelFiles,
593 2 : version: readState.current,
594 2 : }
595 2 :
596 2 : // Strip off memtables which cannot possibly contain the seqNum being read
597 2 : // at.
598 2 : for len(get.mem) > 0 {
599 2 : n := len(get.mem)
600 2 : if logSeqNum := get.mem[n-1].logSeqNum; logSeqNum < seqNum {
601 2 : break
602 : }
603 2 : get.mem = get.mem[:n-1]
604 : }
605 :
606 2 : i := &buf.dbi
607 2 : pointIter := get
608 2 : *i = Iterator{
609 2 : ctx: context.Background(),
610 2 : getIterAlloc: buf,
611 2 : iter: pointIter,
612 2 : pointIter: pointIter,
613 2 : merge: d.merge,
614 2 : comparer: *d.opts.Comparer,
615 2 : readState: readState,
616 2 : keyBuf: buf.keyBuf,
617 2 : }
618 2 :
619 2 : if !i.First() {
620 2 : err := i.Close()
621 2 : if err != nil {
622 1 : return nil, nil, err
623 1 : }
624 2 : return nil, nil, ErrNotFound
625 : }
626 2 : return i.Value(), i, nil
627 : }
628 :
629 : // Set sets the value for the given key. It overwrites any previous value
630 : // for that key; a DB is not a multi-map.
631 : //
632 : // It is safe to modify the contents of the arguments after Set returns.
633 2 : func (d *DB) Set(key, value []byte, opts *WriteOptions) error {
634 2 : b := newBatch(d)
635 2 : _ = b.Set(key, value, opts)
636 2 : if err := d.Apply(b, opts); err != nil {
637 1 : return err
638 1 : }
639 : // Only release the batch on success.
640 2 : return b.Close()
641 : }
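 :
 : // A write sketch showing the WriteOptions choice, with illustrative keys;
 : // Sync and NoSync are the package-level options:
 : //
 : //	// Durable once Set returns (the WAL is fsynced).
 : //	_ = d.Set([]byte("k"), []byte("v"), Sync)
 : //	// Faster, but the write may be lost if the process crashes.
 : //	_ = d.Set([]byte("k2"), []byte("v2"), NoSync)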
642 :
643 : // Delete deletes the value for the given key. Deletes are blind and will
644 : // succeed even if the given key does not exist.
645 : //
646 : // It is safe to modify the contents of the arguments after Delete returns.
647 2 : func (d *DB) Delete(key []byte, opts *WriteOptions) error {
648 2 : b := newBatch(d)
649 2 : _ = b.Delete(key, opts)
650 2 : if err := d.Apply(b, opts); err != nil {
651 1 : return err
652 1 : }
653 : // Only release the batch on success.
654 2 : return b.Close()
655 : }
656 :
657 : // DeleteSized behaves identically to Delete, but takes an additional
658 : // argument indicating the size of the value being deleted. DeleteSized
659 : // should be preferred when the caller has the expectation that there exists
660 : // a single internal KV pair for the key (eg, the key has not been
661 : // overwritten recently), and the caller knows the size of its value.
662 : //
663 : // DeleteSized will record the value size within the tombstone and use it to
664 : // inform compaction-picking heuristics which strive to reduce space
665 : // amplification in the LSM. This "calling your shot" mechanic allows the
666 : // storage engine to more accurately estimate and reduce space amplification.
667 : //
668 : // It is safe to modify the contents of the arguments after DeleteSized
669 : // returns.
670 1 : func (d *DB) DeleteSized(key []byte, valueSize uint32, opts *WriteOptions) error {
671 1 : b := newBatch(d)
672 1 : _ = b.DeleteSized(key, valueSize, opts)
673 1 : if err := d.Apply(b, opts); err != nil {
674 0 : return err
675 0 : }
676 : // Only release the batch on success.
677 1 : return b.Close()
678 : }
679 :
680 : // SingleDelete adds an action to the batch that single deletes the entry for key.
681 : // See Writer.SingleDelete for more details on the semantics of SingleDelete.
682 : //
683 : // It is safe to modify the contents of the arguments after SingleDelete returns.
684 2 : func (d *DB) SingleDelete(key []byte, opts *WriteOptions) error {
685 2 : b := newBatch(d)
686 2 : _ = b.SingleDelete(key, opts)
687 2 : if err := d.Apply(b, opts); err != nil {
688 0 : return err
689 0 : }
690 : // Only release the batch on success.
691 2 : return b.Close()
692 : }
693 :
694 : // DeleteRange deletes all of the keys (and values) in the range [start,end)
695 : // (inclusive on start, exclusive on end).
696 : //
697 : // It is safe to modify the contents of the arguments after DeleteRange
698 : // returns.
699 2 : func (d *DB) DeleteRange(start, end []byte, opts *WriteOptions) error {
700 2 : b := newBatch(d)
701 2 : _ = b.DeleteRange(start, end, opts)
702 2 : if err := d.Apply(b, opts); err != nil {
703 1 : return err
704 1 : }
705 : // Only release the batch on success.
706 2 : return b.Close()
707 : }
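 :
 : // DeleteRange operates on a half-open interval and leaves range keys intact,
 : // as sketched with illustrative keys:
 : //
 : //	// Removes point keys "b" and "c" but not "d"; RangeKeySet keys remain.
 : //	_ = d.DeleteRange([]byte("b"), []byte("d"), Sync)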
708 :
709 : // Merge adds an action to the DB that merges the value at key with the new
710 : // value. The details of the merge are dependent upon the configured merge
711 : // operator.
712 : //
713 : // It is safe to modify the contents of the arguments after Merge returns.
714 2 : func (d *DB) Merge(key, value []byte, opts *WriteOptions) error {
715 2 : b := newBatch(d)
716 2 : _ = b.Merge(key, value, opts)
717 2 : if err := d.Apply(b, opts); err != nil {
718 1 : return err
719 1 : }
720 : // Only release the batch on success.
721 2 : return b.Close()
722 : }
723 :
724 : // LogData adds the specified data to the batch. The data will be written to
725 : // the WAL, but not added to memtables or sstables. Log data is never indexed,
726 : // which makes it useful for testing WAL performance.
727 : //
728 : // It is safe to modify the contents of the argument after LogData returns.
729 2 : func (d *DB) LogData(data []byte, opts *WriteOptions) error {
730 2 : b := newBatch(d)
731 2 : _ = b.LogData(data, opts)
732 2 : if err := d.Apply(b, opts); err != nil {
733 1 : return err
734 1 : }
735 : // Only release the batch on success.
736 2 : return b.Close()
737 : }
738 :
739 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
740 : // timestamp suffix to value. The suffix is optional. If any portion of the key
741 : // range [start, end) is already set by a range key with the same suffix value,
742 : // RangeKeySet overrides it.
743 : //
744 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
745 2 : func (d *DB) RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error {
746 2 : b := newBatch(d)
747 2 : _ = b.RangeKeySet(start, end, suffix, value, opts)
748 2 : if err := d.Apply(b, opts); err != nil {
749 0 : return err
750 0 : }
751 : // Only release the batch on success.
752 2 : return b.Close()
753 : }
754 :
755 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
756 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
757 : // range key. RangeKeyUnset only removes portions of range keys that fall within
758 : // the [start, end) key span, and only range keys with suffixes that exactly
759 : // match the unset suffix.
760 : //
761 : // It is safe to modify the contents of the arguments after RangeKeyUnset
762 : // returns.
763 2 : func (d *DB) RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error {
764 2 : b := newBatch(d)
765 2 : _ = b.RangeKeyUnset(start, end, suffix, opts)
766 2 : if err := d.Apply(b, opts); err != nil {
767 0 : return err
768 0 : }
769 : // Only release the batch on success.
770 2 : return b.Close()
771 : }
772 :
773 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
774 : // (inclusive on start, exclusive on end). It does not delete point keys (for
775 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
776 : // bounds, including those with or without suffixes.
777 : //
778 : // It is safe to modify the contents of the arguments after RangeKeyDelete
779 : // returns.
780 2 : func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
781 2 : b := newBatch(d)
782 2 : _ = b.RangeKeyDelete(start, end, opts)
783 2 : if err := d.Apply(b, opts); err != nil {
784 0 : return err
785 0 : }
786 : // Only release the batch on success.
787 2 : return b.Close()
788 : }
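 :
 : // A sketch of the suffix behavior documented above, with illustrative keys
 : // and an illustrative suffix encoding (the real encoding depends on the
 : // configured Comparer):
 : //
 : //	// Set [a, z) at suffix @t1 to "v".
 : //	_ = d.RangeKeySet([]byte("a"), []byte("z"), []byte("@t1"), []byte("v"), Sync)
 : //	// Unset trims only range keys whose suffix is exactly @t1, within [c, m).
 : //	_ = d.RangeKeyUnset([]byte("c"), []byte("m"), []byte("@t1"), Sync)
 : //	// RangeKeyDelete removes all range keys in [c, m), regardless of suffix.
 : //	_ = d.RangeKeyDelete([]byte("c"), []byte("m"), Sync)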
789 :
790 : // Apply the operations contained in the batch to the DB. If the batch is large
791 : // the contents of the batch may be retained by the database. If that occurs
792 : // the batch contents will be cleared preventing the caller from attempting to
793 : // reuse them.
794 : //
795 : // It is safe to modify the contents of the arguments after Apply returns.
796 : //
797 : // Apply returns ErrInvalidBatch if the provided batch is invalid in any way.
798 2 : func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
799 2 : return d.applyInternal(batch, opts, false)
800 2 : }
801 :
802 : // ApplyNoSyncWait must only be used when opts.Sync is true and the caller
803 : // does not want to wait for the WAL fsync to happen. The method will return
804 : // once the mutation is applied to the memtable and is visible (note that a
805 : // mutation is visible before the WAL sync even in the wait case, so we have
806 : // not weakened the durability semantics). The caller must call Batch.SyncWait
807 : // to wait for the WAL fsync. The caller must not Close the batch without
808 : // first calling Batch.SyncWait.
809 : //
810 : // RECOMMENDATION: Prefer using Apply unless you really understand why you
811 : // need ApplyNoSyncWait.
812 : // EXPERIMENTAL: API/feature subject to change. Do not yet use outside
813 : // CockroachDB.
814 2 : func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
815 2 : if !opts.Sync {
816 0 : return errors.Errorf("cannot request asynchronous apply when WriteOptions.Sync is false")
817 0 : }
818 2 : return d.applyInternal(batch, opts, true)
819 : }
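 :
 : // A sketch of the protocol described above; the batch contents are
 : // illustrative:
 : //
 : //	b := d.NewBatch()
 : //	_ = b.Set([]byte("k"), []byte("v"), nil)
 : //	if err := d.ApplyNoSyncWait(b, Sync); err != nil {
 : //		return err
 : //	}
 : //	// The mutation is already visible to readers here. Wait for WAL
 : //	// durability before closing the batch.
 : //	if err := b.SyncWait(); err != nil {
 : //		return err
 : //	}
 : //	_ = b.Close()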
820 :
821 : // REQUIRES: noSyncWait => opts.Sync
822 2 : func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
823 2 : if err := d.closed.Load(); err != nil {
824 1 : panic(err)
825 : }
826 2 : if batch.committing {
827 0 : panic("pebble: batch already committing")
828 : }
829 2 : if batch.applied.Load() {
830 0 : panic("pebble: batch already applied")
831 : }
832 2 : if d.opts.ReadOnly {
833 1 : return ErrReadOnly
834 1 : }
835 2 : if batch.db != nil && batch.db != d {
836 1 : panic(fmt.Sprintf("pebble: batch db mismatch: %p != %p", batch.db, d))
837 : }
838 :
839 2 : sync := opts.GetSync()
840 2 : if sync && d.opts.DisableWAL {
841 0 : return errors.New("pebble: WAL disabled")
842 0 : }
843 :
844 2 : if fmv := d.FormatMajorVersion(); fmv < batch.minimumFormatMajorVersion {
845 0 : panic(fmt.Sprintf(
846 0 : "pebble: batch requires at least format major version %d (current: %d)",
847 0 : batch.minimumFormatMajorVersion, fmv,
848 0 : ))
849 : }
850 :
851 2 : if batch.countRangeKeys > 0 {
852 2 : if d.split == nil {
853 0 : return errNoSplit
854 0 : }
855 : }
856 2 : batch.committing = true
857 2 :
858 2 : if batch.db == nil {
859 1 : if err := batch.refreshMemTableSize(); err != nil {
860 0 : return err
861 0 : }
862 : }
863 2 : if batch.memTableSize >= d.largeBatchThreshold {
864 2 : var err error
865 2 : batch.flushable, err = newFlushableBatch(batch, d.opts.Comparer)
866 2 : if err != nil {
867 0 : return err
868 0 : }
869 : }
870 2 : if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
871 0 : // There isn't much we can do on an error here. The commit pipeline will be
872 0 : // horked at this point.
873 0 : d.opts.Logger.Fatalf("pebble: fatal commit error: %v", err)
874 0 : }
875 : // If this is a large batch, we need to clear the batch contents as the
876 : // flushable batch may still be present in the flushables queue.
877 : //
878 : // TODO(peter): Currently large batches are written to the WAL. We could
879 : // skip the WAL write and instead wait for the large batch to be flushed to
880 : // an sstable. For a 100 MB batch, this might actually be faster. For a 1
881 : // GB batch this is almost certainly faster.
882 2 : if batch.flushable != nil {
883 2 : batch.data = nil
884 2 : }
885 2 : return nil
886 : }
887 :
888 2 : func (d *DB) commitApply(b *Batch, mem *memTable) error {
889 2 : if b.flushable != nil {
890 2 : // This is a large batch which was already added to the immutable queue.
891 2 : return nil
892 2 : }
893 2 : err := mem.apply(b, b.SeqNum())
894 2 : if err != nil {
895 0 : return err
896 0 : }
897 :
898 : // If the batch contains range tombstones and the database is configured
899 : // to flush range deletions, schedule a delayed flush so that disk space
900 : // may be reclaimed without additional writes or an explicit flush.
901 2 : if b.countRangeDels > 0 && d.opts.FlushDelayDeleteRange > 0 {
902 2 : d.mu.Lock()
903 2 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayDeleteRange)
904 2 : d.mu.Unlock()
905 2 : }
906 :
907 : // If the batch contains range keys and the database is configured to flush
908 : // range keys, schedule a delayed flush so that the range keys are cleared
909 : // from the memtable.
910 2 : if b.countRangeKeys > 0 && d.opts.FlushDelayRangeKey > 0 {
911 2 : d.mu.Lock()
912 2 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayRangeKey)
913 2 : d.mu.Unlock()
914 2 : }
915 :
916 2 : if mem.writerUnref() {
917 2 : d.mu.Lock()
918 2 : d.maybeScheduleFlush()
919 2 : d.mu.Unlock()
920 2 : }
921 2 : return nil
922 : }
923 :
924 2 : func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
925 2 : var size int64
926 2 : repr := b.Repr()
927 2 :
928 2 : if b.flushable != nil {
929 2 : // We have a large batch. Such batches are special in that they don't get
930 2 : // added to the memtable, and are instead inserted into the queue of
931 2 : // memtables. The call to makeRoomForWrite with this batch will force the
932 2 : // current memtable to be flushed. We want the large batch to be part of
933 2 : // the same log, so we add it to the WAL here, rather than after the call
934 2 : // to makeRoomForWrite().
935 2 : //
936 2 : // Set the sequence number since it was not set to the correct value earlier
937 2 : // (see comment in newFlushableBatch()).
938 2 : b.flushable.setSeqNum(b.SeqNum())
939 2 : if !d.opts.DisableWAL {
940 2 : var err error
941 2 : size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b)
942 2 : if err != nil {
943 0 : panic(err)
944 : }
945 : }
946 : }
947 :
948 2 : var err error
949 2 : // Grab a reference to the memtable. We don't hold DB.mu, but we do hold
950 2 : // d.commit.mu. It's okay for readers of d.mu.mem.mutable to only hold one of
951 2 : // d.commit.mu or d.mu, because memtable rotations require holding both.
952 2 : mem := d.mu.mem.mutable
953 2 : // Batches which contain keys of kind InternalKeyKindIngestSST will
954 2 : // never be applied to the memtable, so we don't need to make room for
955 2 : // write.
956 2 : if !b.ingestedSSTBatch {
957 2 : // Flushable batches will require a rotation of the memtable regardless,
958 2 : // so only attempt an optimistic reservation of space in the current
959 2 : // memtable if this batch is not a large flushable batch.
960 2 : if b.flushable == nil {
961 2 : err = d.mu.mem.mutable.prepare(b)
962 2 : }
963 2 : if b.flushable != nil || err == arenaskl.ErrArenaFull {
964 2 : // Slow path.
965 2 : // We need to acquire DB.mu and rotate the memtable.
966 2 : func() {
967 2 : d.mu.Lock()
968 2 : defer d.mu.Unlock()
969 2 : err = d.makeRoomForWrite(b)
970 2 : mem = d.mu.mem.mutable
971 2 : }()
972 : }
973 : }
974 2 : if err != nil {
975 0 : return nil, err
976 0 : }
977 2 : if d.opts.DisableWAL {
978 2 : return mem, nil
979 2 : }
980 2 : d.logBytesIn.Add(uint64(len(repr)))
981 2 :
982 2 : if b.flushable == nil {
983 2 : size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b)
984 2 : if err != nil {
985 0 : panic(err)
986 : }
987 : }
988 :
989 2 : d.logSize.Store(uint64(size))
990 2 : return mem, err
991 : }
992 :
993 : type iterAlloc struct {
994 : dbi Iterator
995 : keyBuf []byte
996 : boundsBuf [2][]byte
997 : prefixOrFullSeekKey []byte
998 : merging mergingIter
999 : mlevels [3 + numLevels]mergingIterLevel
1000 : levels [3 + numLevels]levelIter
1001 : levelsPositioned [3 + numLevels]bool
1002 : }
1003 :
1004 : var iterAllocPool = sync.Pool{
1005 2 : New: func() interface{} {
1006 2 : return &iterAlloc{}
1007 2 : },
1008 : }
1009 :
1010 : // snapshotIterOpts denotes snapshot-related iterator options when calling
1011 : // newIter. These are the possible cases for a snapshotIterOpts:
1012 : // - No snapshot: All fields are zero values.
1013 : // - Classic snapshot: Only `seqNum` is set. The latest readState will be used
1014 : // and the specified seqNum will be used as the snapshot seqNum.
1015 : // - EventuallyFileOnlySnapshot (EFOS) behaving as a classic snapshot. Only
1016 : // the `seqNum` is set. The latest readState will be used
1017 : // and the specified seqNum will be used as the snapshot seqNum.
1018 : // - EFOS in file-only state: Only `seqNum` and `vers` are set. All the
1019 : // relevant SSTs are referenced by the *version.
1020 : // - EFOS that has been excised but is in alwaysCreateIters mode (tests only).
1021 : // Only `seqNum` and `readState` are set.
1022 : type snapshotIterOpts struct {
1023 : seqNum base.SeqNum
1024 : vers *version
1025 : readState *readState
1026 : }
1027 :
1028 : type batchIterOpts struct {
1029 : batchOnly bool
1030 : }
1031 : type newIterOpts struct {
1032 : snapshot snapshotIterOpts
1033 : batch batchIterOpts
1034 : }
1035 :
1036 : // newIter constructs a new iterator, merging in batch iterators as an extra
1037 : // level.
1038 : func (d *DB) newIter(
1039 : ctx context.Context, batch *Batch, newIterOpts newIterOpts, o *IterOptions,
1040 2 : ) *Iterator {
1041 2 : if newIterOpts.batch.batchOnly {
1042 1 : if batch == nil {
1043 0 : panic("batchOnly is true, but batch is nil")
1044 : }
1045 1 : if newIterOpts.snapshot.vers != nil {
1046 0 : panic("batchOnly is true, but snapshotIterOpts is initialized")
1047 : }
1048 : }
1049 2 : if err := d.closed.Load(); err != nil {
1050 1 : panic(err)
1051 : }
1052 2 : seqNum := newIterOpts.snapshot.seqNum
1053 2 : if o != nil && o.RangeKeyMasking.Suffix != nil && o.KeyTypes != IterKeyTypePointsAndRanges {
1054 0 : panic("pebble: range key masking requires IterKeyTypePointsAndRanges")
1055 : }
1056 2 : if (batch != nil || seqNum != 0) && (o != nil && o.OnlyReadGuaranteedDurable) {
1057 1 : // We could add support for OnlyReadGuaranteedDurable on snapshots if
1058 1 : // there was a need: this would require checking that the sequence number
1059 1 : // of the snapshot has been flushed, by comparing with
1060 1 : // DB.mem.queue[0].logSeqNum.
1061 1 : panic("OnlyReadGuaranteedDurable is not supported for batches or snapshots")
1062 : }
1063 2 : var readState *readState
1064 2 : var newIters tableNewIters
1065 2 : var newIterRangeKey keyspanimpl.TableNewSpanIter
1066 2 : if !newIterOpts.batch.batchOnly {
1067 2 : // Grab and reference the current readState. This prevents the underlying
1068 2 : // files in the associated version from being deleted if there is a current
1069 2 : // compaction. The readState is unref'd by Iterator.Close().
1070 2 : if newIterOpts.snapshot.vers == nil {
1071 2 : if newIterOpts.snapshot.readState != nil {
1072 0 : readState = newIterOpts.snapshot.readState
1073 0 : readState.ref()
1074 2 : } else {
1075 2 : // NB: loadReadState() calls readState.ref().
1076 2 : readState = d.loadReadState()
1077 2 : }
1078 2 : } else {
1079 2 : // vers != nil
1080 2 : newIterOpts.snapshot.vers.Ref()
1081 2 : }
1082 :
1083 : // Determine the seqnum to read at after grabbing the read state (current and
1084 : // memtables) above.
1085 2 : if seqNum == 0 {
1086 2 : seqNum = d.mu.versions.visibleSeqNum.Load()
1087 2 : }
1088 2 : newIters = d.newIters
1089 2 : newIterRangeKey = d.tableNewRangeKeyIter
1090 : }
1091 :
1092 : // Bundle various structures under a single umbrella in order to allocate
1093 : // them together.
1094 2 : buf := iterAllocPool.Get().(*iterAlloc)
1095 2 : dbi := &buf.dbi
1096 2 : *dbi = Iterator{
1097 2 : ctx: ctx,
1098 2 : alloc: buf,
1099 2 : merge: d.merge,
1100 2 : comparer: *d.opts.Comparer,
1101 2 : readState: readState,
1102 2 : version: newIterOpts.snapshot.vers,
1103 2 : keyBuf: buf.keyBuf,
1104 2 : prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
1105 2 : boundsBuf: buf.boundsBuf,
1106 2 : batch: batch,
1107 2 : tc: d.tableCache,
1108 2 : newIters: newIters,
1109 2 : newIterRangeKey: newIterRangeKey,
1110 2 : seqNum: seqNum,
1111 2 : batchOnlyIter: newIterOpts.batch.batchOnly,
1112 2 : }
1113 2 : if o != nil {
1114 2 : dbi.opts = *o
1115 2 : dbi.processBounds(o.LowerBound, o.UpperBound)
1116 2 : }
1117 2 : dbi.opts.logger = d.opts.Logger
1118 2 : if d.opts.private.disableLazyCombinedIteration {
1119 2 : dbi.opts.disableLazyCombinedIteration = true
1120 2 : }
1121 2 : if batch != nil {
1122 2 : dbi.batchSeqNum = dbi.batch.nextSeqNum()
1123 2 : }
1124 2 : return finishInitializingIter(ctx, buf)
1125 : }
1126 :
1127 : // finishInitializingIter is a helper for doing the non-trivial initialization
1128 : // of an Iterator. It's invoked to perform the initial initialization of an
1129 : // Iterator during NewIter or Clone, and to perform reinitialization due to a
1130 : // change in IterOptions by a call to Iterator.SetOptions.
1131 2 : func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
1132 2 : // Short-hand.
1133 2 : dbi := &buf.dbi
1134 2 : var memtables flushableList
1135 2 : if dbi.readState != nil {
1136 2 : memtables = dbi.readState.memtables
1137 2 : }
1138 2 : if dbi.opts.OnlyReadGuaranteedDurable {
1139 1 : memtables = nil
1140 2 : } else {
1141 2 : // We only need to read from memtables which contain sequence numbers older
1142 2 : // than seqNum. Trim off newer memtables.
1143 2 : for i := len(memtables) - 1; i >= 0; i-- {
1144 2 : if logSeqNum := memtables[i].logSeqNum; logSeqNum < dbi.seqNum {
1145 2 : break
1146 : }
1147 2 : memtables = memtables[:i]
1148 : }
1149 : }
1150 :
1151 2 : if dbi.opts.pointKeys() {
1152 2 : // Construct the point iterator, initializing dbi.pointIter to point to
1153 2 : // dbi.merging. If this is called during a SetOptions call and this
1154 2 : // Iterator has already initialized dbi.merging, constructPointIter is a
1155 2 : // noop and an initialized pointIter already exists in dbi.pointIter.
1156 2 : dbi.constructPointIter(ctx, memtables, buf)
1157 2 : dbi.iter = dbi.pointIter
1158 2 : } else {
1159 2 : dbi.iter = emptyIter
1160 2 : }
1161 :
1162 2 : if dbi.opts.rangeKeys() {
1163 2 : dbi.rangeKeyMasking.init(dbi, &dbi.comparer)
1164 2 :
1165 2 : // When iterating over both point and range keys, don't create the
1166 2 : // range-key iterator stack immediately if we can avoid it. This
1167 2 : // optimization takes advantage of the expected sparseness of range
1168 2 : // keys, and configures the point-key iterator to dynamically switch to
1169 2 : // combined iteration when it observes a file containing range keys.
1170 2 : //
1171 2 : // Lazy combined iteration is not possible if a batch or a memtable
1172 2 : // contains any range keys.
1173 2 : useLazyCombinedIteration := dbi.rangeKey == nil &&
1174 2 : dbi.opts.KeyTypes == IterKeyTypePointsAndRanges &&
1175 2 : (dbi.batch == nil || dbi.batch.countRangeKeys == 0) &&
1176 2 : !dbi.opts.disableLazyCombinedIteration
1177 2 : if useLazyCombinedIteration {
1178 2 : // The user requested combined iteration, and there's no indexed
1179 2 : // batch currently containing range keys that would prevent lazy
1180 2 : // combined iteration. Check the memtables to see if they contain
1181 2 : // any range keys.
1182 2 : for i := range memtables {
1183 2 : if memtables[i].containsRangeKeys() {
1184 2 : useLazyCombinedIteration = false
1185 2 : break
1186 : }
1187 : }
1188 : }
1189 :
1190 2 : if useLazyCombinedIteration {
1191 2 : dbi.lazyCombinedIter = lazyCombinedIter{
1192 2 : parent: dbi,
1193 2 : pointIter: dbi.pointIter,
1194 2 : combinedIterState: combinedIterState{
1195 2 : initialized: false,
1196 2 : },
1197 2 : }
1198 2 : dbi.iter = &dbi.lazyCombinedIter
1199 2 : dbi.iter = invalidating.MaybeWrapIfInvariants(dbi.iter)
1200 2 : } else {
1201 2 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{
1202 2 : initialized: true,
1203 2 : }
1204 2 : if dbi.rangeKey == nil {
1205 2 : dbi.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1206 2 : dbi.rangeKey.init(dbi.comparer.Compare, dbi.comparer.Split, &dbi.opts)
1207 2 : dbi.constructRangeKeyIter()
1208 2 : } else {
1209 2 : dbi.rangeKey.iterConfig.SetBounds(dbi.opts.LowerBound, dbi.opts.UpperBound)
1210 2 : }
1211 :
1212 : // Wrap the point iterator (currently dbi.iter) with an interleaving
1213 : // iterator that interleaves range keys pulled from
1214 : // dbi.rangeKey.rangeKeyIter.
1215 : //
1216 : // NB: The interleaving iterator is always reinitialized, even if
1217 : // dbi already had an initialized range key iterator, in case the point
1218 : // iterator changed or the range key masking suffix changed.
1219 2 : dbi.rangeKey.iiter.Init(&dbi.comparer, dbi.iter, dbi.rangeKey.rangeKeyIter,
1220 2 : keyspan.InterleavingIterOpts{
1221 2 : Mask: &dbi.rangeKeyMasking,
1222 2 : LowerBound: dbi.opts.LowerBound,
1223 2 : UpperBound: dbi.opts.UpperBound,
1224 2 : })
1225 2 : dbi.iter = &dbi.rangeKey.iiter
1226 : }
1227 2 : } else {
1228 2 : // !dbi.opts.rangeKeys()
1229 2 : //
1230 2 : // Reset the combined iterator state. The initialized=true ensures the
1231 2 : // iterator doesn't unnecessarily try to switch to combined iteration.
1232 2 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{initialized: true}
1233 2 : }
1234 2 : return dbi
1235 : }
1236 :
1237 : // ScanInternal scans all internal keys within the specified bounds, truncating
1238 : // any rangedels and rangekeys to those bounds if they span past them. For use
1239 : // when an external user needs to be aware of all internal keys that make up a
1240 : // key range.
1241 : //
1242 : // Keys deleted by range deletions must not be returned or exposed by this
1243 : // method, while the range deletion deleting that key must be exposed using
1244 : // visitRangeDel. Keys that would be masked by range key masking (if an
1245 : // appropriate prefix were set) should be exposed, alongside the range key
1246 : // that would have masked it. This method also collapses all point keys into
1247 : // one InternalKey; so only one internal key at most per user key is returned
1248 : // to visitPointKey.
1249 : //
1250 : // If visitSharedFile is not nil, ScanInternal iterates in skip-shared iteration
1251 : // mode. In this iteration mode, sstables in levels L5 and L6 are skipped, and
1252 : // their metadatas truncated to [lower, upper) and passed into visitSharedFile.
1253 : // ErrInvalidSkipSharedIteration is returned if visitSharedFile is not nil and an
1254 : // sstable in L5 or L6 is found that is not in shared storage according to
1255 : // provider.IsShared, or an sstable in those levels contains a newer key than the
1256 : // snapshot sequence number (only applicable for snapshot.ScanInternal). Examples
1257 : // of when this could happen could be if Pebble started writing sstables before a
1258 : // creator ID was set (as creator IDs are necessary to enable shared storage)
1259 : // resulting in some lower level SSTs being on non-shared storage. Skip-shared
1260 : // iteration is invalid in those cases.
1261 : func (d *DB) ScanInternal(
1262 : ctx context.Context,
1263 : category sstable.Category,
1264 : lower, upper []byte,
1265 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
1266 : visitRangeDel func(start, end []byte, seqNum SeqNum) error,
1267 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
1268 : visitSharedFile func(sst *SharedSSTMeta) error,
1269 : visitExternalFile func(sst *ExternalFile) error,
1270 2 : ) error {
1271 2 : scanInternalOpts := &scanInternalOptions{
1272 2 : category: category,
1273 2 : visitPointKey: visitPointKey,
1274 2 : visitRangeDel: visitRangeDel,
1275 2 : visitRangeKey: visitRangeKey,
1276 2 : visitSharedFile: visitSharedFile,
1277 2 : visitExternalFile: visitExternalFile,
1278 2 : IterOptions: IterOptions{
1279 2 : KeyTypes: IterKeyTypePointsAndRanges,
1280 2 : LowerBound: lower,
1281 2 : UpperBound: upper,
1282 2 : },
1283 2 : }
1284 2 : iter, err := d.newInternalIter(ctx, snapshotIterOpts{} /* snapshot */, scanInternalOpts)
1285 2 : if err != nil {
1286 0 : return err
1287 0 : }
1288 2 : defer iter.close()
1289 2 : return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
1290 : }
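 :
 : // A heavily elided sketch of a ScanInternal call; ctx, the bounds, and the
 : // zero-valued category are illustrative, and the shared/external-file
 : // visitors are nil since skip-shared iteration is not requested:
 : //
 : //	var cat sstable.Category
 : //	err := d.ScanInternal(ctx, cat, lower, upper,
 : //		func(key *InternalKey, value LazyValue, info IteratorLevel) error {
 : //			return nil // at most one collapsed internal key per user key
 : //		},
 : //		func(start, end []byte, seqNum SeqNum) error {
 : //			return nil // a range deletion overlapping the bounds
 : //		},
 : //		func(start, end []byte, keys []rangekey.Key) error {
 : //			return nil // a range key overlapping the bounds
 : //		},
 : //		nil /* visitSharedFile */, nil /* visitExternalFile */)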
1291 :
1292 : // newInternalIter constructs and returns a new scanInternalIterator on this db.
1293 : // If o.skipSharedLevels is true, levels below sharedLevelsStart are *not* added
1294 : // to the internal iterator.
1295 : //
1296 : // TODO(bilal): This method has a lot of similarities with db.newIter as well as
1297 : // finishInitializingIter. Both pairs of methods should be refactored to reduce
1298 : // this duplication.
1299 : func (d *DB) newInternalIter(
1300 : ctx context.Context, sOpts snapshotIterOpts, o *scanInternalOptions,
1301 2 : ) (*scanInternalIterator, error) {
1302 2 : if err := d.closed.Load(); err != nil {
1303 0 : panic(err)
1304 : }
1305 : // Grab and reference the current readState. This prevents the underlying
1306 : // files in the associated version from being deleted if there is a current
1307 : // compaction. The readState is unref'd by Iterator.Close().
1308 2 : var readState *readState
1309 2 : if sOpts.vers == nil {
1310 2 : if sOpts.readState != nil {
1311 0 : readState = sOpts.readState
1312 0 : readState.ref()
1313 2 : } else {
1314 2 : readState = d.loadReadState()
1315 2 : }
1316 : }
1317 2 : if sOpts.vers != nil {
1318 1 : sOpts.vers.Ref()
1319 1 : }
1320 :
1321 : // Determine the seqnum to read at after grabbing the read state (current and
1322 : // memtables) above.
1323 2 : seqNum := sOpts.seqNum
1324 2 : if seqNum == 0 {
1325 2 : seqNum = d.mu.versions.visibleSeqNum.Load()
1326 2 : }
1327 :
1328 : // Bundle various structures under a single umbrella in order to allocate
1329 : // them together.
1330 2 : buf := iterAllocPool.Get().(*iterAlloc)
1331 2 : dbi := &scanInternalIterator{
1332 2 : ctx: ctx,
1333 2 : db: d,
1334 2 : comparer: d.opts.Comparer,
1335 2 : merge: d.opts.Merger.Merge,
1336 2 : readState: readState,
1337 2 : version: sOpts.vers,
1338 2 : alloc: buf,
1339 2 : newIters: d.newIters,
1340 2 : newIterRangeKey: d.tableNewRangeKeyIter,
1341 2 : seqNum: seqNum,
1342 2 : mergingIter: &buf.merging,
1343 2 : }
1344 2 : dbi.opts = *o
1345 2 : dbi.opts.logger = d.opts.Logger
1346 2 : if d.opts.private.disableLazyCombinedIteration {
1347 1 : dbi.opts.disableLazyCombinedIteration = true
1348 1 : }
1349 2 : return finishInitializingInternalIter(buf, dbi)
1350 : }
1351 :
1352 : func finishInitializingInternalIter(
1353 : buf *iterAlloc, i *scanInternalIterator,
1354 2 : ) (*scanInternalIterator, error) {
1355 2 : // Short-hand.
1356 2 : var memtables flushableList
1357 2 : if i.readState != nil {
1358 2 : memtables = i.readState.memtables
1359 2 : }
1360 : // We only need to read from memtables which contain sequence numbers older
1361 : // than seqNum. Trim off newer memtables.
1362 2 : for j := len(memtables) - 1; j >= 0; j-- {
1363 2 : if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
1364 2 : break
1365 : }
1366 2 : memtables = memtables[:j]
1367 : }
1368 2 : i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)
1369 2 :
1370 2 : if err := i.constructPointIter(i.opts.category, memtables, buf); err != nil {
1371 0 : return nil, err
1372 0 : }
1373 :
1374 : // For internal iterators, we skip the lazy combined iteration optimization
1375 : // entirely, and create the range key iterator stack directly.
1376 2 : i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1377 2 : i.rangeKey.init(i.comparer.Compare, i.comparer.Split, &i.opts.IterOptions)
1378 2 : if err := i.constructRangeKeyIter(); err != nil {
1379 0 : return nil, err
1380 0 : }
1381 :
1382 : // Wrap the point iterator (currently i.iter) with an interleaving
1383 : // iterator that interleaves range keys pulled from
1384 : // i.rangeKey.rangeKeyIter.
1385 2 : i.rangeKey.iiter.Init(i.comparer, i.iter, i.rangeKey.rangeKeyIter,
1386 2 : keyspan.InterleavingIterOpts{
1387 2 : LowerBound: i.opts.LowerBound,
1388 2 : UpperBound: i.opts.UpperBound,
1389 2 : })
1390 2 : i.iter = &i.rangeKey.iiter
1391 2 :
1392 2 : return i, nil
1393 : }
1394 :
1395 : func (i *Iterator) constructPointIter(
1396 : ctx context.Context, memtables flushableList, buf *iterAlloc,
1397 2 : ) {
1398 2 : if i.pointIter != nil {
1399 2 : // Already have one.
1400 2 : return
1401 2 : }
1402 2 : internalOpts := internalIterOpts{
1403 2 : stats: &i.stats.InternalStats,
1404 2 : }
1405 2 : // If the table cache has a sstable stats collector, ask it for an
1406 2 : // accumulator for this iterator's configured category and QoS. All SSTable
1407 2 : // iterators created by this Iterator will accumulate their stats to it as
1408 2 : // they Close during iteration.
1409 2 : if collector := i.tc.dbOpts.sstStatsCollector; collector != nil {
1410 2 : internalOpts.iterStatsAccumulator = collector.Accumulator(
1411 2 : uint64(uintptr(unsafe.Pointer(i))),
1412 2 : i.opts.Category,
1413 2 : )
1414 2 : }
1415 2 : if i.opts.RangeKeyMasking.Filter != nil {
1416 2 : internalOpts.boundLimitedFilter = &i.rangeKeyMasking
1417 2 : }
1418 :
1419 : // Merging levels and levels from iterAlloc.
1420 2 : mlevels := buf.mlevels[:0]
1421 2 : levels := buf.levels[:0]
1422 2 :
1423 2 : // We compute the number of levels needed ahead of time and reallocate a slice if
1424 2 : // the array from the iterAlloc isn't large enough. Doing this allocation once
1425 2 : // should improve the performance.
1426 2 : numMergingLevels := 0
1427 2 : numLevelIters := 0
1428 2 : if i.batch != nil {
1429 2 : numMergingLevels++
1430 2 : }
1431 :
1432 2 : var current *version
1433 2 : if !i.batchOnlyIter {
1434 2 : numMergingLevels += len(memtables)
1435 2 :
1436 2 : current = i.version
1437 2 : if current == nil {
1438 2 : current = i.readState.current
1439 2 : }
1440 2 : numMergingLevels += len(current.L0SublevelFiles)
1441 2 : numLevelIters += len(current.L0SublevelFiles)
1442 2 : for level := 1; level < len(current.Levels); level++ {
1443 2 : if current.Levels[level].Empty() {
1444 2 : continue
1445 : }
1446 2 : numMergingLevels++
1447 2 : numLevelIters++
1448 : }
1449 : }
1450 :
1451 2 : if numMergingLevels > cap(mlevels) {
1452 2 : mlevels = make([]mergingIterLevel, 0, numMergingLevels)
1453 2 : }
1454 2 : if numLevelIters > cap(levels) {
1455 2 : levels = make([]levelIter, 0, numLevelIters)
1456 2 : }
1457 :
1458 : // Top-level is the batch, if any.
1459 2 : if i.batch != nil {
1460 2 : if i.batch.index == nil {
1461 0 : // This isn't an indexed batch. We shouldn't have gotten this far.
1462 0 : panic(errors.AssertionFailedf("creating an iterator over an unindexed batch"))
1463 2 : } else {
1464 2 : i.batch.initInternalIter(&i.opts, &i.batchPointIter)
1465 2 : i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, i.batchSeqNum)
1466 2 : // Only include the batch's rangedel iterator if it's non-empty.
1467 2 : // This requires some subtle logic in the case a rangedel is later
1468 2 : // written to the batch and the view of the batch is refreshed
1469 2 : // during a call to SetOptions—in this case, we need to reconstruct
1470 2 : // the point iterator to add the batch rangedel iterator.
1471 2 : var rangeDelIter keyspan.FragmentIterator
1472 2 : if i.batchRangeDelIter.Count() > 0 {
1473 2 : rangeDelIter = &i.batchRangeDelIter
1474 2 : }
1475 2 : mlevels = append(mlevels, mergingIterLevel{
1476 2 : iter: &i.batchPointIter,
1477 2 : rangeDelIter: rangeDelIter,
1478 2 : })
1479 : }
1480 : }
1481 :
1482 2 : if !i.batchOnlyIter {
1483 2 : // Next are the memtables.
1484 2 : for j := len(memtables) - 1; j >= 0; j-- {
1485 2 : mem := memtables[j]
1486 2 : mlevels = append(mlevels, mergingIterLevel{
1487 2 : iter: mem.newIter(&i.opts),
1488 2 : rangeDelIter: mem.newRangeDelIter(&i.opts),
1489 2 : })
1490 2 : }
1491 :
1492 : // Next are the file levels: L0 sub-levels followed by lower levels.
1493 2 : mlevelsIndex := len(mlevels)
1494 2 : levelsIndex := len(levels)
1495 2 : mlevels = mlevels[:numMergingLevels]
1496 2 : levels = levels[:numLevelIters]
1497 2 : i.opts.snapshotForHideObsoletePoints = buf.dbi.seqNum
1498 2 : addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Layer) {
1499 2 : li := &levels[levelsIndex]
1500 2 :
1501 2 : li.init(ctx, i.opts, &i.comparer, i.newIters, files, level, internalOpts)
1502 2 : li.initRangeDel(&mlevels[mlevelsIndex])
1503 2 : li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
1504 2 : mlevels[mlevelsIndex].levelIter = li
1505 2 : mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)
1506 2 :
1507 2 : levelsIndex++
1508 2 : mlevelsIndex++
1509 2 : }
1510 :
1511 : // Add level iterators for the L0 sublevels, iterating from newest to
1512 : // oldest.
1513 2 : for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
1514 2 : addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
1515 2 : }
1516 :
1517 : // Add level iterators for the non-empty non-L0 levels.
1518 2 : for level := 1; level < len(current.Levels); level++ {
1519 2 : if current.Levels[level].Empty() {
1520 2 : continue
1521 : }
1522 2 : addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
1523 : }
1524 : }
1525 2 : buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...)
1526 2 : if len(mlevels) <= cap(buf.levelsPositioned) {
1527 2 : buf.merging.levelsPositioned = buf.levelsPositioned[:len(mlevels)]
1528 2 : }
1529 2 : buf.merging.snapshot = i.seqNum
1530 2 : buf.merging.batchSnapshot = i.batchSeqNum
1531 2 : buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
1532 2 : i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging).(topLevelIterator)
1533 2 : i.merging = &buf.merging
1534 : }
1535 :
1536 : // NewBatch returns a new empty write-only batch. Any reads on the batch will
1537 : // return an error. If the batch is committed it will be applied to the DB.
1538 2 : func (d *DB) NewBatch(opts ...BatchOption) *Batch {
1539 2 : return newBatch(d, opts...)
1540 2 : }
1541 :
1542 : // NewBatchWithSize is mostly identical to NewBatch, but it will allocate the
1543 : // the specified memory space for the internal slice in advance.
1544 0 : func (d *DB) NewBatchWithSize(size int, opts ...BatchOption) *Batch {
1545 0 : return newBatchWithSize(d, size, opts...)
1546 0 : }
1547 :
1548 : // NewIndexedBatch returns a new empty read-write batch. Any reads on the batch
1549 : // will read from both the batch and the DB. If the batch is committed it will
1550 : // be applied to the DB. An indexed batch is slower that a non-indexed batch
1551 : // for insert operations. If you do not need to perform reads on the batch, use
1552 : // NewBatch instead.
1553 2 : func (d *DB) NewIndexedBatch() *Batch {
1554 2 : return newIndexedBatch(d, d.opts.Comparer)
1555 2 : }
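 :
 : // Illustrative sketch (not part of the original source): a hypothetical
 : // helper showing typical NewBatch and NewIndexedBatch usage. Batch.Set,
 : // Batch.Commit, Batch.NewIter and the Sync write option are assumed from
 : // the wider Batch/WriteOptions API rather than defined in this file.
 : func exampleBatchUsage(d *DB) error {
 : 	// Write-only batch: buffer writes and commit them atomically.
 : 	b := d.NewBatch()
 : 	defer b.Close()
 : 	if err := b.Set([]byte("a"), []byte("1"), nil); err != nil {
 : 		return err
 : 	}
 : 	if err := b.Commit(Sync); err != nil {
 : 		return err
 : 	}
 :
 : 	// Indexed batch: reads observe both the batch and the DB.
 : 	ib := d.NewIndexedBatch()
 : 	defer ib.Close()
 : 	if err := ib.Set([]byte("b"), []byte("2"), nil); err != nil {
 : 		return err
 : 	}
 : 	iter, err := ib.NewIter(nil)
 : 	if err != nil {
 : 		return err
 : 	}
 : 	defer iter.Close()
 : 	for valid := iter.First(); valid; valid = iter.Next() {
 : 		_ = iter.Key() // sees "a" (committed to the DB) and "b" (in the batch)
 : 	}
 : 	return nil
 : }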
1556 :
1557 : // NewIndexedBatchWithSize is mostly identical to NewIndexedBatch, but it will
1558 : // allocate the specified memory space for the internal slice in advance.
1559 0 : func (d *DB) NewIndexedBatchWithSize(size int) *Batch {
1560 0 : return newIndexedBatchWithSize(d, d.opts.Comparer, size)
1561 0 : }
1562 :
1563 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
1564 : // return false). The iterator can be positioned via a call to SeekGE, SeekLT,
1565 : // First or Last. The iterator provides a point-in-time view of the current DB
1566 : // state. This view is maintained by preventing file deletions and preventing
1567 : // memtables referenced by the iterator from being deleted. Using an iterator
1568 : // to maintain a long-lived point-in-time view of the DB state can lead to an
1569 : // apparent memory and disk usage leak. Use snapshots (see NewSnapshot) for
1570 : // point-in-time snapshots, which avoid these problems.
1571 2 : func (d *DB) NewIter(o *IterOptions) (*Iterator, error) {
1572 2 : return d.NewIterWithContext(context.Background(), o)
1573 2 : }
1574 :
1575 : // NewIterWithContext is like NewIter, and additionally accepts a context for
1576 : // tracing.
1577 2 : func (d *DB) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
1578 2 : return d.newIter(ctx, nil /* batch */, newIterOpts{}, o), nil
1579 2 : }
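 :
 : // Illustrative sketch (not part of the original source): a hypothetical
 : // helper iterating over a bounded key range with NewIter. The positioning
 : // and accessor methods (First, Next, Key, Value, Close) are assumed from
 : // the wider Iterator API.
 : func exampleIterateRange(d *DB, lower, upper []byte) error {
 : 	iter, err := d.NewIter(&IterOptions{
 : 		LowerBound: lower,
 : 		UpperBound: upper,
 : 	})
 : 	if err != nil {
 : 		return err
 : 	}
 : 	// The iterator starts unpositioned; position it with First (or SeekGE,
 : 	// SeekLT, Last) before reading keys.
 : 	for valid := iter.First(); valid; valid = iter.Next() {
 : 		_, _ = iter.Key(), iter.Value()
 : 	}
 : 	// Close releases the readState (memtables and sstables) pinned by the
 : 	// iterator.
 : 	return iter.Close()
 : }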
1580 :
1581 : // NewSnapshot returns a point-in-time view of the current DB state. Iterators
1582 : // created with this handle will all observe a stable snapshot of the current
1583 : // DB state. The caller must call Snapshot.Close() when the snapshot is no
1584 : // longer needed. Snapshots are not persisted across DB restarts (close ->
1585 : // open). Unlike the implicit snapshot maintained by an iterator, a snapshot
1586 : // will not prevent memtables from being released or sstables from being
1587 : // deleted. Instead, a snapshot prevents deletion of sequence numbers
1588 : // referenced by the snapshot.
1589 2 : func (d *DB) NewSnapshot() *Snapshot {
1590 2 : if err := d.closed.Load(); err != nil {
1591 1 : panic(err)
1592 : }
1593 :
1594 2 : d.mu.Lock()
1595 2 : s := &Snapshot{
1596 2 : db: d,
1597 2 : seqNum: d.mu.versions.visibleSeqNum.Load(),
1598 2 : }
1599 2 : d.mu.snapshots.pushBack(s)
1600 2 : d.mu.Unlock()
1601 2 : return s
1602 : }
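 :
 : // Illustrative sketch (not part of the original source): a hypothetical
 : // helper reading through a snapshot. Snapshot.NewIter and Snapshot.Close
 : // are assumed from the wider Snapshot API; closing the snapshot releases
 : // the sequence numbers it pins.
 : func exampleSnapshotRead(d *DB) error {
 : 	s := d.NewSnapshot()
 : 	defer s.Close()
 :
 : 	iter, err := s.NewIter(nil)
 : 	if err != nil {
 : 		return err
 : 	}
 : 	defer iter.Close()
 : 	for valid := iter.First(); valid; valid = iter.Next() {
 : 		_ = iter.Key() // observes the DB state as of the NewSnapshot call
 : 	}
 : 	return nil
 : }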
1603 :
1604 : // NewEventuallyFileOnlySnapshot returns a point-in-time view of the current DB
1605 : // state, similar to NewSnapshot, but with consistency constrained to the
1606 : // provided set of key ranges. See the comment at EventuallyFileOnlySnapshot for
1607 : // its semantics.
1608 2 : func (d *DB) NewEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
1609 2 : if err := d.closed.Load(); err != nil {
1610 0 : panic(err)
1611 : }
1612 2 : for i := range keyRanges {
1613 2 : if i > 0 && d.cmp(keyRanges[i-1].End, keyRanges[i].Start) > 0 {
1614 0 : panic("pebble: key ranges for eventually-file-only-snapshot not in order")
1615 : }
1616 : }
1617 2 : return d.makeEventuallyFileOnlySnapshot(keyRanges)
1618 : }
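 :
 : // Illustrative sketch (not part of the original source): a hypothetical
 : // helper constructing an eventually-file-only snapshot over an ordered,
 : // non-overlapping set of key ranges. The NewIter and Close methods on
 : // EventuallyFileOnlySnapshot are assumed here; see the comment at
 : // EventuallyFileOnlySnapshot for the full semantics.
 : func exampleEFOS(d *DB) error {
 : 	efos := d.NewEventuallyFileOnlySnapshot([]KeyRange{
 : 		{Start: []byte("a"), End: []byte("m")},
 : 	})
 : 	defer efos.Close()
 :
 : 	iter, err := efos.NewIter(&IterOptions{LowerBound: []byte("a"), UpperBound: []byte("m")})
 : 	if err != nil {
 : 		return err
 : 	}
 : 	defer iter.Close()
 : 	for valid := iter.First(); valid; valid = iter.Next() {
 : 		_ = iter.Key()
 : 	}
 : 	return nil
 : }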
1619 :
1620 : // Close closes the DB.
1621 : //
1622 : // It is not safe to close a DB until all outstanding iterators are closed
1623 : // or to call Close concurrently with any other DB method. It is not valid
1624 : // to call any of a DB's methods after the DB has been closed.
1625 2 : func (d *DB) Close() error {
1626 2 : // Lock the commit pipeline for the duration of Close. This prevents a race
1627 2 : // with makeRoomForWrite. Rotating the WAL in makeRoomForWrite requires
1628 2 : // dropping d.mu several times for I/O. If Close only holds d.mu, an
1629 2 : // in-progress WAL rotation may re-acquire d.mu only once the database is
1630 2 : // closed.
1631 2 : //
1632 2 : // Additionally, locking the commit pipeline makes it more likely that
1633 2 : // (illegal) concurrent writes will observe d.closed.Load() != nil, creating
1634 2 : // more understandable panics if the database is improperly used concurrently
1635 2 : // during Close.
1636 2 : d.commit.mu.Lock()
1637 2 : defer d.commit.mu.Unlock()
1638 2 : d.mu.Lock()
1639 2 : defer d.mu.Unlock()
1640 2 : if err := d.closed.Load(); err != nil {
1641 1 : panic(err)
1642 : }
1643 :
1644 : // Clear the finalizer that is used to check that an unreferenced DB has been
1645 : // closed. We're closing the DB here, so the check performed by that
1646 : // finalizer isn't necessary.
1647 : //
1648 : // Note: this is a no-op if invariants are disabled or race is enabled.
1649 2 : invariants.SetFinalizer(d.closed, nil)
1650 2 :
1651 2 : d.closed.Store(errors.WithStack(ErrClosed))
1652 2 : close(d.closedCh)
1653 2 :
1654 2 : defer d.opts.Cache.Unref()
1655 2 :
1656 2 : for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
1657 2 : d.mu.compact.cond.Wait()
1658 2 : }
1659 2 : for d.mu.tableStats.loading {
1660 2 : d.mu.tableStats.cond.Wait()
1661 2 : }
1662 2 : for d.mu.tableValidation.validating {
1663 1 : d.mu.tableValidation.cond.Wait()
1664 1 : }
1665 :
1666 2 : var err error
1667 2 : if n := len(d.mu.compact.inProgress); n > 0 {
1668 1 : err = errors.Errorf("pebble: %d unexpected in-progress compactions", errors.Safe(n))
1669 1 : }
1670 2 : err = firstError(err, d.mu.formatVers.marker.Close())
1671 2 : err = firstError(err, d.tableCache.close())
1672 2 : if !d.opts.ReadOnly {
1673 2 : if d.mu.log.writer != nil {
1674 2 : _, err2 := d.mu.log.writer.Close()
1675 2 : err = firstError(err, err2)
1676 2 : }
1677 1 : } else if d.mu.log.writer != nil {
1678 0 : panic("pebble: log-writer should be nil in read-only mode")
1679 : }
1680 2 : err = firstError(err, d.mu.log.manager.Close())
1681 2 : err = firstError(err, d.fileLock.Close())
1682 2 :
1683 2 : // Note that versionSet.close() only closes the MANIFEST. The versions list
1684 2 : // is still valid for the checks below.
1685 2 : err = firstError(err, d.mu.versions.close())
1686 2 :
1687 2 : err = firstError(err, d.dataDir.Close())
1688 2 :
1689 2 : d.readState.val.unrefLocked()
1690 2 :
1691 2 : current := d.mu.versions.currentVersion()
1692 2 : for v := d.mu.versions.versions.Front(); true; v = v.Next() {
1693 2 : refs := v.Refs()
1694 2 : if v == current {
1695 2 : if refs != 1 {
1696 1 : err = firstError(err, errors.Errorf("leaked iterators: current\n%s", v))
1697 1 : }
1698 2 : break
1699 : }
1700 0 : if refs != 0 {
1701 0 : err = firstError(err, errors.Errorf("leaked iterators:\n%s", v))
1702 0 : }
1703 : }
1704 :
1705 2 : for _, mem := range d.mu.mem.queue {
1706 2 : // Usually, we'd want to delete the files returned by readerUnref. But
1707 2 : // in this case, even if we're unreferencing the flushables, the
1708 2 : // flushables aren't obsolete. They will be reconstructed during WAL
1709 2 : // replay.
1710 2 : mem.readerUnrefLocked(false)
1711 2 : }
1712 : // If there's an unused, recycled memtable, we need to release its memory.
1713 2 : if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
1714 2 : d.freeMemTable(obsoleteMemTable)
1715 2 : }
1716 2 : if reserved := d.memTableReserved.Load(); reserved != 0 {
1717 1 : err = firstError(err, errors.Errorf("leaked memtable reservation: %d", errors.Safe(reserved)))
1718 1 : }
1719 :
1720 : // Since we called d.readState.val.unrefLocked() above, we are expected to
1721 : // manually schedule deletion of obsolete files.
1722 2 : if len(d.mu.versions.obsoleteTables) > 0 {
1723 2 : d.deleteObsoleteFiles(d.newJobIDLocked())
1724 2 : }
1725 :
1726 2 : d.mu.Unlock()
1727 2 : d.compactionSchedulers.Wait()
1728 2 :
1729 2 : // Wait for all cleaning jobs to finish.
1730 2 : d.cleanupManager.Close()
1731 2 :
1732 2 : // Sanity check metrics.
1733 2 : if invariants.Enabled {
1734 2 : m := d.Metrics()
1735 2 : if m.Compact.NumInProgress > 0 || m.Compact.InProgressBytes > 0 {
1736 0 : d.mu.Lock()
1737 0 : panic(fmt.Sprintf("invalid metrics on close:\n%s", m))
1738 : }
1739 : }
1740 :
1741 2 : d.mu.Lock()
1742 2 :
1743 2 : // As a sanity check, ensure that there are no zombie tables. A non-zero count
1744 2 : // hints at a reference count leak.
1745 2 : if ztbls := len(d.mu.versions.zombieTables); ztbls > 0 {
1746 0 : err = firstError(err, errors.Errorf("non-zero zombie file count: %d", ztbls))
1747 0 : }
1748 :
1749 2 : err = firstError(err, d.objProvider.Close())
1750 2 :
1751 2 : // If the options include a closer to 'close' the filesystem, close it.
1752 2 : if d.opts.private.fsCloser != nil {
1753 2 : d.opts.private.fsCloser.Close()
1754 2 : }
1755 :
1756 : // Return an error if the user failed to close all open snapshots.
1757 2 : if v := d.mu.snapshots.count(); v > 0 {
1758 0 : err = firstError(err, errors.Errorf("leaked snapshots: %d open snapshots on DB %p", v, d))
1759 0 : }
1760 :
1761 2 : return err
1762 : }
1763 :
1764 : // Compact the specified range of keys in the database.
1765 2 : func (d *DB) Compact(start, end []byte, parallelize bool) error {
1766 2 : if err := d.closed.Load(); err != nil {
1767 1 : panic(err)
1768 : }
1769 2 : if d.opts.ReadOnly {
1770 1 : return ErrReadOnly
1771 1 : }
1772 2 : if d.cmp(start, end) >= 0 {
1773 2 : return errors.Errorf("Compact start %s is not less than end %s",
1774 2 : d.opts.Comparer.FormatKey(start), d.opts.Comparer.FormatKey(end))
1775 2 : }
1776 :
1777 2 : d.mu.Lock()
1778 2 : maxLevelWithFiles := 1
1779 2 : cur := d.mu.versions.currentVersion()
1780 2 : for level := 0; level < numLevels; level++ {
1781 2 : overlaps := cur.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
1782 2 : if !overlaps.Empty() {
1783 2 : maxLevelWithFiles = level + 1
1784 2 : }
1785 : }
1786 :
1787 : // Determine if any memtable overlaps with the compaction range. We wait for
1788 : // any such overlap to flush (initiating a flush if necessary).
1789 2 : mem, err := func() (*flushableEntry, error) {
1790 2 : // Check to see if any files overlap with any of the memtables. The queue
1791 2 : // is ordered from oldest to newest with the mutable memtable being the
1792 2 : // last element in the slice. We want to wait for the newest table that
1793 2 : // overlaps.
1794 2 : for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1795 2 : mem := d.mu.mem.queue[i]
1796 2 : var anyOverlaps bool
1797 2 : mem.computePossibleOverlaps(func(b bounded) shouldContinue {
1798 2 : anyOverlaps = true
1799 2 : return stopIteration
1800 2 : }, KeyRange{Start: start, End: end})
1801 2 : if !anyOverlaps {
1802 2 : continue
1803 : }
1804 2 : var err error
1805 2 : if mem.flushable == d.mu.mem.mutable {
1806 2 : // We have to hold both commitPipeline.mu and DB.mu when calling
1807 2 : // makeRoomForWrite(). Lock order requirements elsewhere force us to
1808 2 : // unlock DB.mu in order to grab commitPipeline.mu first.
1809 2 : d.mu.Unlock()
1810 2 : d.commit.mu.Lock()
1811 2 : d.mu.Lock()
1812 2 : defer d.commit.mu.Unlock()
1813 2 : if mem.flushable == d.mu.mem.mutable {
1814 2 : // Only flush if the active memtable is unchanged.
1815 2 : err = d.makeRoomForWrite(nil)
1816 2 : }
1817 : }
1818 2 : mem.flushForced = true
1819 2 : d.maybeScheduleFlush()
1820 2 : return mem, err
1821 : }
1822 2 : return nil, nil
1823 : }()
1824 :
1825 2 : d.mu.Unlock()
1826 2 :
1827 2 : if err != nil {
1828 0 : return err
1829 0 : }
1830 2 : if mem != nil {
1831 2 : <-mem.flushed
1832 2 : }
1833 :
1834 2 : for level := 0; level < maxLevelWithFiles; {
1835 2 : for {
1836 2 : if err := d.manualCompact(
1837 2 : start, end, level, parallelize); err != nil {
1838 1 : if errors.Is(err, ErrCancelledCompaction) {
1839 1 : continue
1840 : }
1841 1 : return err
1842 : }
1843 2 : break
1844 : }
1845 2 : level++
1846 2 : if level == numLevels-1 {
1847 2 : // A manual compaction of the bottommost level occurred.
1848 2 : // There is no next level to try and compact.
1849 2 : break
1850 : }
1851 : }
1852 2 : return nil
1853 : }
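 :
 : // Illustrative sketch (not part of the original source): a hypothetical
 : // helper issuing a manual compaction. Compact blocks until every level with
 : // files overlapping the range has been compacted toward the bottom of the
 : // LSM, flushing any overlapping memtables first.
 : func exampleManualCompact(d *DB) error {
 : 	start, end := []byte("a"), []byte("z")
 : 	// start must sort strictly before end, otherwise Compact returns an
 : 	// error. parallelize=true splits the work into compactions over
 : 	// non-overlapping key ranges.
 : 	return d.Compact(start, end, true /* parallelize */)
 : }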
1854 :
1855 2 : func (d *DB) manualCompact(start, end []byte, level int, parallelize bool) error {
1856 2 : d.mu.Lock()
1857 2 : curr := d.mu.versions.currentVersion()
1858 2 : files := curr.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
1859 2 : if files.Empty() {
1860 2 : d.mu.Unlock()
1861 2 : return nil
1862 2 : }
1863 :
1864 2 : var compactions []*manualCompaction
1865 2 : if parallelize {
1866 2 : compactions = append(compactions, d.splitManualCompaction(start, end, level)...)
1867 2 : } else {
1868 2 : compactions = append(compactions, &manualCompaction{
1869 2 : level: level,
1870 2 : done: make(chan error, 1),
1871 2 : start: start,
1872 2 : end: end,
1873 2 : })
1874 2 : }
1875 2 : d.mu.compact.manual = append(d.mu.compact.manual, compactions...)
1876 2 : d.maybeScheduleCompaction()
1877 2 : d.mu.Unlock()
1878 2 :
1879 2 : // Each of the channels is guaranteed to be eventually sent to once. After a
1880 2 : // compaction is possibly picked in d.maybeScheduleCompaction(), either the
1881 2 : // compaction is dropped, executed after being scheduled, or retried later.
1882 2 : // Assuming eventual progress when a compaction is retried, all outcomes send
1883 2 : // a value to the done channel. Since the channels are buffered, it is not
1884 2 : // necessary to read from each channel, and so we can exit early in the event
1885 2 : // of an error.
1886 2 : for _, compaction := range compactions {
1887 2 : if err := <-compaction.done; err != nil {
1888 1 : return err
1889 1 : }
1890 : }
1891 2 : return nil
1892 : }
1893 :
1894 : // splitManualCompaction splits a manual compaction over [start,end] on level
1895 : // such that the resulting compactions have no key overlap.
1896 : func (d *DB) splitManualCompaction(
1897 : start, end []byte, level int,
1898 2 : ) (splitCompactions []*manualCompaction) {
1899 2 : curr := d.mu.versions.currentVersion()
1900 2 : endLevel := level + 1
1901 2 : baseLevel := d.mu.versions.picker.getBaseLevel()
1902 2 : if level == 0 {
1903 2 : endLevel = baseLevel
1904 2 : }
1905 2 : keyRanges := curr.CalculateInuseKeyRanges(level, endLevel, start, end)
1906 2 : for _, keyRange := range keyRanges {
1907 2 : splitCompactions = append(splitCompactions, &manualCompaction{
1908 2 : level: level,
1909 2 : done: make(chan error, 1),
1910 2 : start: keyRange.Start,
1911 2 : end: keyRange.End.Key,
1912 2 : split: true,
1913 2 : })
1914 2 : }
1915 2 : return splitCompactions
1916 : }
1917 :
1918 : // Flush the memtable to stable storage.
1919 2 : func (d *DB) Flush() error {
1920 2 : flushDone, err := d.AsyncFlush()
1921 2 : if err != nil {
1922 1 : return err
1923 1 : }
1924 2 : <-flushDone
1925 2 : return nil
1926 : }
1927 :
1928 : // AsyncFlush asynchronously flushes the memtable to stable storage.
1929 : //
1930 : // If no error is returned, the caller can receive from the returned channel in
1931 : // order to wait for the flush to complete.
1932 2 : func (d *DB) AsyncFlush() (<-chan struct{}, error) {
1933 2 : if err := d.closed.Load(); err != nil {
1934 1 : panic(err)
1935 : }
1936 2 : if d.opts.ReadOnly {
1937 1 : return nil, ErrReadOnly
1938 1 : }
1939 :
1940 2 : d.commit.mu.Lock()
1941 2 : defer d.commit.mu.Unlock()
1942 2 : d.mu.Lock()
1943 2 : defer d.mu.Unlock()
1944 2 : flushed := d.mu.mem.queue[len(d.mu.mem.queue)-1].flushed
1945 2 : err := d.makeRoomForWrite(nil)
1946 2 : if err != nil {
1947 0 : return nil, err
1948 0 : }
1949 2 : return flushed, nil
1950 : }
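 :
 : // Illustrative sketch (not part of the original source): Flush above is
 : // simply AsyncFlush followed by receiving from the returned channel; this
 : // hypothetical helper spells that out while leaving room for other work.
 : func exampleAsyncFlush(d *DB) error {
 : 	flushed, err := d.AsyncFlush()
 : 	if err != nil {
 : 		return err // e.g. ErrReadOnly on a read-only DB
 : 	}
 : 	// Do other work while the memtable flush proceeds in the background.
 : 	<-flushed // blocks until the flush completes
 : 	return nil
 : }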
1951 :
1952 : // Metrics returns metrics about the database.
1953 2 : func (d *DB) Metrics() *Metrics {
1954 2 : metrics := &Metrics{}
1955 2 : walStats := d.mu.log.manager.Stats()
1956 2 :
1957 2 : d.mu.Lock()
1958 2 : vers := d.mu.versions.currentVersion()
1959 2 : *metrics = d.mu.versions.metrics
1960 2 : metrics.Compact.EstimatedDebt = d.mu.versions.picker.estimatedCompactionDebt(0)
1961 2 : metrics.Compact.InProgressBytes = d.mu.versions.atomicInProgressBytes.Load()
1962 2 : // TODO(radu): split this to separate the download compactions.
1963 2 : metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount + d.mu.compact.downloadingCount)
1964 2 : metrics.Compact.MarkedFiles = vers.Stats.MarkedForCompaction
1965 2 : metrics.Compact.Duration = d.mu.compact.duration
1966 2 : for c := range d.mu.compact.inProgress {
1967 1 : if c.kind != compactionKindFlush && c.kind != compactionKindIngestedFlushable {
1968 1 : metrics.Compact.Duration += d.timeNow().Sub(c.beganAt)
1969 1 : }
1970 : }
1971 :
1972 2 : for _, m := range d.mu.mem.queue {
1973 2 : metrics.MemTable.Size += m.totalBytes()
1974 2 : }
1975 2 : metrics.Snapshots.Count = d.mu.snapshots.count()
1976 2 : if metrics.Snapshots.Count > 0 {
1977 0 : metrics.Snapshots.EarliestSeqNum = d.mu.snapshots.earliest()
1978 0 : }
1979 2 : metrics.Snapshots.PinnedKeys = d.mu.snapshots.cumulativePinnedCount
1980 2 : metrics.Snapshots.PinnedSize = d.mu.snapshots.cumulativePinnedSize
1981 2 : metrics.MemTable.Count = int64(len(d.mu.mem.queue))
1982 2 : metrics.MemTable.ZombieCount = d.memTableCount.Load() - metrics.MemTable.Count
1983 2 : metrics.MemTable.ZombieSize = uint64(d.memTableReserved.Load()) - metrics.MemTable.Size
1984 2 : metrics.WAL.ObsoleteFiles = int64(walStats.ObsoleteFileCount)
1985 2 : metrics.WAL.ObsoletePhysicalSize = walStats.ObsoleteFileSize
1986 2 : metrics.WAL.Files = int64(walStats.LiveFileCount)
1987 2 : // The current WAL's size (d.logSize) is the logical size, which may be less
1988 2 : // than the WAL's physical size if it was recycled. walStats.LiveFileSize
1989 2 : // includes the physical size of all live WALs, but for the current WAL it
1990 2 : // reflects the physical size when it was opened. So it is possible that
1991 2 : // d.atomic.logSize has exceeded that physical size. We allow for this
1992 2 : // anomaly.
1993 2 : metrics.WAL.PhysicalSize = walStats.LiveFileSize
1994 2 : metrics.WAL.BytesIn = d.logBytesIn.Load()
1995 2 : metrics.WAL.Size = d.logSize.Load()
1996 2 : for i, n := 0, len(d.mu.mem.queue)-1; i < n; i++ {
1997 2 : metrics.WAL.Size += d.mu.mem.queue[i].logSize
1998 2 : }
1999 2 : metrics.WAL.BytesWritten = metrics.Levels[0].BytesIn + metrics.WAL.Size
2000 2 : metrics.WAL.Failover = walStats.Failover
2001 2 :
2002 2 : if p := d.mu.versions.picker; p != nil {
2003 2 : compactions := d.getInProgressCompactionInfoLocked(nil)
2004 2 : for level, score := range p.getScores(compactions) {
2005 2 : metrics.Levels[level].Score = score
2006 2 : }
2007 : }
2008 2 : metrics.Table.ZombieCount = int64(len(d.mu.versions.zombieTables))
2009 2 : for _, info := range d.mu.versions.zombieTables {
2010 1 : metrics.Table.ZombieSize += info.FileSize
2011 1 : if info.isLocal {
2012 1 : metrics.Table.Local.ZombieSize += info.FileSize
2013 1 : }
2014 : }
2015 2 : metrics.private.optionsFileSize = d.optionsFileSize
2016 2 :
2017 2 : // TODO(jackson): Consider making these metrics optional.
2018 2 : metrics.Keys.RangeKeySetsCount = *rangeKeySetsAnnotator.MultiLevelAnnotation(vers.RangeKeyLevels[:])
2019 2 : metrics.Keys.TombstoneCount = *tombstonesAnnotator.MultiLevelAnnotation(vers.Levels[:])
2020 2 :
2021 2 : d.mu.versions.logLock()
2022 2 : metrics.private.manifestFileSize = uint64(d.mu.versions.manifest.Size())
2023 2 : backingCount, backingTotalSize := d.mu.versions.virtualBackings.Stats()
2024 2 : metrics.Table.BackingTableCount = uint64(backingCount)
2025 2 : metrics.Table.BackingTableSize = backingTotalSize
2026 2 : d.mu.versions.logUnlock()
2027 2 :
2028 2 : metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
2029 2 : if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
2030 0 : d.opts.Logger.Errorf("metrics error: %s", err)
2031 0 : }
2032 2 : metrics.Flush.WriteThroughput = d.mu.compact.flushWriteThroughput
2033 2 : if d.mu.compact.flushing {
2034 1 : metrics.Flush.NumInProgress = 1
2035 1 : }
2036 2 : for i := 0; i < numLevels; i++ {
2037 2 : metrics.Levels[i].Additional.ValueBlocksSize = *valueBlockSizeAnnotator.LevelAnnotation(vers.Levels[i])
2038 2 : compressionTypes := compressionTypeAnnotator.LevelAnnotation(vers.Levels[i])
2039 2 : metrics.Table.CompressedCountUnknown += int64(compressionTypes.unknown)
2040 2 : metrics.Table.CompressedCountSnappy += int64(compressionTypes.snappy)
2041 2 : metrics.Table.CompressedCountZstd += int64(compressionTypes.zstd)
2042 2 : metrics.Table.CompressedCountNone += int64(compressionTypes.none)
2043 2 : }
2044 :
2045 2 : d.mu.Unlock()
2046 2 :
2047 2 : metrics.BlockCache = d.opts.Cache.Metrics()
2048 2 : metrics.TableCache, metrics.Filter = d.tableCache.metrics()
2049 2 : metrics.TableIters = int64(d.tableCache.iterCount())
2050 2 : metrics.CategoryStats = d.tableCache.dbOpts.sstStatsCollector.GetStats()
2051 2 :
2052 2 : metrics.SecondaryCacheMetrics = d.objProvider.Metrics()
2053 2 :
2054 2 : metrics.Uptime = d.timeNow().Sub(d.openedAt)
2055 2 :
2056 2 : metrics.manualMemory = manual.GetMetrics()
2057 2 :
2058 2 : return metrics
2059 : }
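 :
 : // Illustrative sketch (not part of the original source): a hypothetical
 : // helper sampling a few of the fields populated by Metrics above.
 : func examplePrintMetrics(d *DB) {
 : 	m := d.Metrics()
 : 	fmt.Printf("compactions in progress: %d (%d bytes)\n",
 : 		m.Compact.NumInProgress, m.Compact.InProgressBytes)
 : 	fmt.Printf("memtables: %d (%d bytes); current WAL size: %d bytes\n",
 : 		m.MemTable.Count, m.MemTable.Size, m.WAL.Size)
 : }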
2060 :
2061 : // sstablesOptions hold the optional parameters to retrieve TableInfo for all sstables.
2062 : type sstablesOptions struct {
2063 : // set to true will return the sstable properties in TableInfo
2064 : withProperties bool
2065 :
2066 : // if set, return sstables that overlap the key range (end-exclusive)
2067 : start []byte
2068 : end []byte
2069 :
2070 : withApproximateSpanBytes bool
2071 : }
2072 :
2073 : // SSTablesOption set optional parameter used by `DB.SSTables`.
2074 : type SSTablesOption func(*sstablesOptions)
2075 :
2076 : // WithProperties enable return sstable properties in each TableInfo.
2077 : //
2078 : // NOTE: if most of the sstable properties need to be read from disk,
2079 : // this options may make method `SSTables` quite slow.
2080 1 : func WithProperties() SSTablesOption {
2081 1 : return func(opt *sstablesOptions) {
2082 1 : opt.withProperties = true
2083 1 : }
2084 : }
2085 :
2086 : // WithKeyRangeFilter ensures returned sstables overlap start and end (end-exclusive)
2087 : // if start and end are both nil these properties have no effect.
2088 1 : func WithKeyRangeFilter(start, end []byte) SSTablesOption {
2089 1 : return func(opt *sstablesOptions) {
2090 1 : opt.end = end
2091 1 : opt.start = start
2092 1 : }
2093 : }
2094 :
2095 : // WithApproximateSpanBytes enables capturing the approximate number of bytes that
2096 : // overlap the provided key span for each sstable.
2097 : // NOTE: This option requires WithKeyRangeFilter.
2098 1 : func WithApproximateSpanBytes() SSTablesOption {
2099 1 : return func(opt *sstablesOptions) {
2100 1 : opt.withApproximateSpanBytes = true
2101 1 : }
2102 : }
2103 :
2104 : // BackingType denotes the type of storage backing a given sstable.
2105 : type BackingType int
2106 :
2107 : const (
2108 : // BackingTypeLocal denotes an sstable stored on local disk according to the
2109 : // objprovider. This file is completely owned by us.
2110 : BackingTypeLocal BackingType = iota
2111 : // BackingTypeShared denotes an sstable stored on shared storage, created
2112 : // by this Pebble instance and possibly shared by other Pebble instances.
2113 : // These types of files have lifecycle managed by Pebble.
2114 : BackingTypeShared
2115 : // BackingTypeSharedForeign denotes an sstable stored on shared storage,
2116 : // created by a Pebble instance other than this one. These types of files have
2117 : // lifecycle managed by Pebble.
2118 : BackingTypeSharedForeign
2119 : // BackingTypeExternal denotes an sstable stored on external storage,
2120 : // not owned by any Pebble instance and with no refcounting/cleanup methods
2121 : // or lifecycle management. An example of an external file is a file restored
2122 : // from a backup.
2123 : BackingTypeExternal
2124 : backingTypeCount
2125 : )
2126 :
2127 : var backingTypeToString = [backingTypeCount]string{
2128 : BackingTypeLocal: "local",
2129 : BackingTypeShared: "shared",
2130 : BackingTypeSharedForeign: "shared-foreign",
2131 : BackingTypeExternal: "external",
2132 : }
2133 :
2134 : // String implements fmt.Stringer.
2135 0 : func (b BackingType) String() string {
2136 0 : return backingTypeToString[b]
2137 0 : }
2138 :
2139 : // SSTableInfo export manifest.TableInfo with sstable.Properties alongside
2140 : // other file backing info.
2141 : type SSTableInfo struct {
2142 : manifest.TableInfo
2143 : // Virtual indicates whether the sstable is virtual.
2144 : Virtual bool
2145 : // BackingSSTNum is the disk file number associated with the backing sstable.
2146 : // If Virtual is false, BackingSSTNum == PhysicalTableDiskFileNum(FileNum).
2147 : BackingSSTNum base.DiskFileNum
2148 : // BackingType is the type of storage backing this sstable.
2149 : BackingType BackingType
2150 : // Locator is the remote.Locator backing this sstable, if the backing type is
2151 : // not BackingTypeLocal.
2152 : Locator remote.Locator
2153 : // ApproximateSpanBytes describes the approximate number of bytes within the
2154 : // sstable that fall within a particular span. It's populated only when the
2155 : // ApproximateSpanBytes option is passed into DB.SSTables.
2156 : ApproximateSpanBytes uint64 `json:"ApproximateSpanBytes,omitempty"`
2157 :
2158 : // Properties is the sstable properties of this table. If Virtual is true,
2159 : // then the Properties are associated with the backing sst.
2160 : Properties *sstable.Properties
2161 : }
2162 :
2163 : // SSTables retrieves the current sstables. The returned slice is indexed by
2164 : // level and each level is indexed by the position of the sstable within the
2165 : // level. Note that this information may be out of date due to concurrent
2166 : // flushes and compactions.
2167 1 : func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
2168 1 : opt := &sstablesOptions{}
2169 1 : for _, fn := range opts {
2170 1 : fn(opt)
2171 1 : }
2172 :
2173 1 : if opt.withApproximateSpanBytes && (opt.start == nil || opt.end == nil) {
2174 1 : return nil, errors.Errorf("cannot use WithApproximateSpanBytes without WithKeyRangeFilter option")
2175 1 : }
2176 :
2177 : // Grab and reference the current readState.
2178 1 : readState := d.loadReadState()
2179 1 : defer readState.unref()
2180 1 :
2181 1 : // TODO(peter): This is somewhat expensive, especially on a large
2182 1 : // database. It might be worthwhile to unify TableInfo and FileMetadata and
2183 1 : // then we could simply return current.Files. Note that RocksDB is doing
2184 1 : // something similar to the current code, so perhaps it isn't too bad.
2185 1 : srcLevels := readState.current.Levels
2186 1 : var totalTables int
2187 1 : for i := range srcLevels {
2188 1 : totalTables += srcLevels[i].Len()
2189 1 : }
2190 :
2191 1 : destTables := make([]SSTableInfo, totalTables)
2192 1 : destLevels := make([][]SSTableInfo, len(srcLevels))
2193 1 : for i := range destLevels {
2194 1 : iter := srcLevels[i].Iter()
2195 1 : j := 0
2196 1 : for m := iter.First(); m != nil; m = iter.Next() {
2197 1 : if opt.start != nil && opt.end != nil {
2198 1 : b := base.UserKeyBoundsEndExclusive(opt.start, opt.end)
2199 1 : if !m.Overlaps(d.opts.Comparer.Compare, &b) {
2200 1 : continue
2201 : }
2202 : }
2203 1 : destTables[j] = SSTableInfo{TableInfo: m.TableInfo()}
2204 1 : if opt.withProperties {
2205 1 : p, err := d.tableCache.getTableProperties(
2206 1 : m,
2207 1 : )
2208 1 : if err != nil {
2209 0 : return nil, err
2210 0 : }
2211 1 : destTables[j].Properties = p
2212 : }
2213 1 : destTables[j].Virtual = m.Virtual
2214 1 : destTables[j].BackingSSTNum = m.FileBacking.DiskFileNum
2215 1 : objMeta, err := d.objProvider.Lookup(fileTypeTable, m.FileBacking.DiskFileNum)
2216 1 : if err != nil {
2217 0 : return nil, err
2218 0 : }
2219 1 : if objMeta.IsRemote() {
2220 0 : if objMeta.IsShared() {
2221 0 : if d.objProvider.IsSharedForeign(objMeta) {
2222 0 : destTables[j].BackingType = BackingTypeSharedForeign
2223 0 : } else {
2224 0 : destTables[j].BackingType = BackingTypeShared
2225 0 : }
2226 0 : } else {
2227 0 : destTables[j].BackingType = BackingTypeExternal
2228 0 : }
2229 0 : destTables[j].Locator = objMeta.Remote.Locator
2230 1 : } else {
2231 1 : destTables[j].BackingType = BackingTypeLocal
2232 1 : }
2233 :
2234 1 : if opt.withApproximateSpanBytes {
2235 1 : if m.ContainedWithinSpan(d.opts.Comparer.Compare, opt.start, opt.end) {
2236 1 : destTables[j].ApproximateSpanBytes = m.Size
2237 1 : } else {
2238 1 : size, err := d.tableCache.estimateSize(m, opt.start, opt.end)
2239 1 : if err != nil {
2240 0 : return nil, err
2241 0 : }
2242 1 : destTables[j].ApproximateSpanBytes = size
2243 : }
2244 : }
2245 1 : j++
2246 : }
2247 1 : destLevels[i] = destTables[:j]
2248 1 : destTables = destTables[j:]
2249 : }
2250 :
2251 1 : return destLevels, nil
2252 : }
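 :
 : // Illustrative sketch (not part of the original source): a hypothetical
 : // helper listing the sstables overlapping a key range, with properties and
 : // approximate per-span sizes. Note that WithApproximateSpanBytes requires
 : // WithKeyRangeFilter.
 : func exampleListSSTables(d *DB, start, end []byte) error {
 : 	levels, err := d.SSTables(
 : 		WithKeyRangeFilter(start, end),
 : 		WithProperties(),
 : 		WithApproximateSpanBytes(),
 : 	)
 : 	if err != nil {
 : 		return err
 : 	}
 : 	for level, tables := range levels {
 : 		for _, t := range tables {
 : 			fmt.Printf("L%d: virtual=%t backing=%s approx-span-bytes=%d\n",
 : 				level, t.Virtual, t.BackingType, t.ApproximateSpanBytes)
 : 		}
 : 	}
 : 	return nil
 : }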
2253 :
2254 : // makeFileSizeAnnotator returns an annotator that computes the total size of
2255 : // files that meet some criteria defined by filter.
2256 2 : func (d *DB) makeFileSizeAnnotator(filter func(f *fileMetadata) bool) *manifest.Annotator[uint64] {
2257 2 : return &manifest.Annotator[uint64]{
2258 2 : Aggregator: manifest.SumAggregator{
2259 2 : AccumulateFunc: func(f *fileMetadata) (uint64, bool) {
2260 1 : if filter(f) {
2261 1 : return f.Size, true
2262 1 : }
2263 1 : return 0, true
2264 : },
2265 1 : AccumulatePartialOverlapFunc: func(f *fileMetadata, bounds base.UserKeyBounds) uint64 {
2266 1 : if filter(f) {
2267 1 : size, err := d.tableCache.estimateSize(f, bounds.Start, bounds.End.Key)
2268 1 : if err != nil {
2269 0 : return 0
2270 0 : }
2271 1 : return size
2272 : }
2273 1 : return 0
2274 : },
2275 : },
2276 : }
2277 : }
2278 :
2279 : // EstimateDiskUsage returns the estimated filesystem space used in bytes for
2280 : // storing the range `[start, end]`. The estimation is computed as follows:
2281 : //
2282 : // - For sstables fully contained in the range the whole file size is included.
2283 : // - For sstables partially contained in the range the overlapping data block sizes
2284 : // are included. Even if a data block partially overlaps, or we cannot determine
2285 : // overlap due to abbreviated index keys, the full data block size is included in
2286 : // the estimation. Note that unlike fully contained sstables, none of the
2287 : // meta-block space is counted for partially overlapped files.
2288 : // - For virtual sstables, we use the overlap between start, end and the virtual
2289 : // sstable bounds to determine disk usage.
2290 : // - There may also exist WAL entries for unflushed keys in this range. This
2291 : // estimation currently excludes space used for the range in the WAL.
2292 1 : func (d *DB) EstimateDiskUsage(start, end []byte) (uint64, error) {
2293 1 : bytes, _, _, err := d.EstimateDiskUsageByBackingType(start, end)
2294 1 : return bytes, err
2295 1 : }
2296 :
2297 : // EstimateDiskUsageByBackingType is like EstimateDiskUsage but additionally
2298 : // returns the subsets of that size in remote ane external files.
2299 : func (d *DB) EstimateDiskUsageByBackingType(
2300 : start, end []byte,
2301 1 : ) (totalSize, remoteSize, externalSize uint64, _ error) {
2302 1 : if err := d.closed.Load(); err != nil {
2303 0 : panic(err)
2304 : }
2305 :
2306 1 : bounds := base.UserKeyBoundsInclusive(start, end)
2307 1 : if !bounds.Valid(d.cmp) {
2308 0 : return 0, 0, 0, errors.New("invalid key-range specified (start > end)")
2309 0 : }
2310 :
2311 : // Grab and reference the current readState. This prevents the underlying
2312 : // files in the associated version from being deleted if there is a concurrent
2313 : // compaction.
2314 1 : readState := d.loadReadState()
2315 1 : defer readState.unref()
2316 1 :
2317 1 : totalSize = *d.mu.annotators.totalSize.VersionRangeAnnotation(readState.current, bounds)
2318 1 : remoteSize = *d.mu.annotators.remoteSize.VersionRangeAnnotation(readState.current, bounds)
2319 1 : externalSize = *d.mu.annotators.externalSize.VersionRangeAnnotation(readState.current, bounds)
2320 1 : return
2321 : }
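 :
 : // Illustrative sketch (not part of the original source): a hypothetical
 : // helper reporting the estimated on-disk footprint of a key range, broken
 : // down by backing type.
 : func exampleDiskUsage(d *DB, start, end []byte) error {
 : 	total, remote, external, err := d.EstimateDiskUsageByBackingType(start, end)
 : 	if err != nil {
 : 		return err
 : 	}
 : 	fmt.Printf("total=%d remote=%d external=%d\n", total, remote, external)
 : 	return nil
 : }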
2322 :
2323 2 : func (d *DB) walPreallocateSize() int {
2324 2 : // Set the WAL preallocate size to 110% of the memtable size. Note that there
2325 2 : // is a bit of apples and oranges in units here as the memtable size
2326 2 : // corresponds to the memory usage of the memtable while the WAL size is the
2327 2 : // size of the batches (plus overhead) stored in the WAL.
2328 2 : //
2329 2 : // TODO(peter): 110% of the memtable size is quite hefty for a block
2330 2 : // size. This logic is taken from GetWalPreallocateBlockSize in
2331 2 : // RocksDB. Could a smaller preallocation block size be used?
2332 2 : size := d.opts.MemTableSize
2333 2 : size = (size / 10) + size
2334 2 : return int(size)
2335 2 : }
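 :
 : // Illustrative arithmetic (not part of the original source): with a
 : // MemTableSize of 64 MiB (67108864 bytes), walPreallocateSize returns
 : // 67108864/10 + 67108864 = 73819750 bytes, roughly 70.4 MiB.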
2336 :
2337 : func (d *DB) newMemTable(
2338 : logNum base.DiskFileNum, logSeqNum base.SeqNum, minSize uint64,
2339 2 : ) (*memTable, *flushableEntry) {
2340 2 : targetSize := minSize + uint64(memTableEmptySize)
2341 2 : // The targetSize should be less than MemTableSize, because any batch >=
2342 2 : // MemTableSize/2 should be treated as a large flushable batch.
2343 2 : if targetSize > d.opts.MemTableSize {
2344 0 : panic(errors.AssertionFailedf("attempting to allocate memtable larger than MemTableSize"))
2345 : }
2346 : // Double until the next memtable size is at least large enough to fit
2347 : // minSize.
2348 2 : for d.mu.mem.nextSize < targetSize {
2349 1 : d.mu.mem.nextSize = min(2*d.mu.mem.nextSize, d.opts.MemTableSize)
2350 1 : }
2351 2 : size := d.mu.mem.nextSize
2352 2 : // The next memtable should be double the size, up to Options.MemTableSize.
2353 2 : if d.mu.mem.nextSize < d.opts.MemTableSize {
2354 2 : d.mu.mem.nextSize = min(2*d.mu.mem.nextSize, d.opts.MemTableSize)
2355 2 : }
2356 :
2357 2 : memtblOpts := memTableOptions{
2358 2 : Options: d.opts,
2359 2 : logSeqNum: logSeqNum,
2360 2 : }
2361 2 :
2362 2 : // Before attempting to allocate a new memtable, check if there's one
2363 2 : // available for recycling in memTableRecycle. Large contiguous allocations
2364 2 : // can be costly as fragmentation makes it more difficult to find a large
2365 2 : // contiguous free space. We've observed 64MB allocations taking 10ms+.
2366 2 : //
2367 2 : // To reduce these costly allocations, up to 1 obsolete memtable is stashed
2368 2 : // in `d.memTableRecycle` to allow a future memtable rotation to reuse
2369 2 : // existing memory.
2370 2 : var mem *memTable
2371 2 : mem = d.memTableRecycle.Swap(nil)
2372 2 : if mem != nil && uint64(len(mem.arenaBuf)) != size {
2373 2 : d.freeMemTable(mem)
2374 2 : mem = nil
2375 2 : }
2376 2 : if mem != nil {
2377 2 : // Carry through the existing buffer and memory reservation.
2378 2 : memtblOpts.arenaBuf = mem.arenaBuf
2379 2 : memtblOpts.releaseAccountingReservation = mem.releaseAccountingReservation
2380 2 : } else {
2381 2 : mem = new(memTable)
2382 2 : memtblOpts.arenaBuf = manual.New(manual.MemTable, int(size))
2383 2 : memtblOpts.releaseAccountingReservation = d.opts.Cache.Reserve(int(size))
2384 2 : d.memTableCount.Add(1)
2385 2 : d.memTableReserved.Add(int64(size))
2386 2 :
2387 2 : // Note: this is a no-op if invariants are disabled or race is enabled.
2388 2 : invariants.SetFinalizer(mem, checkMemTable)
2389 2 : }
2390 2 : mem.init(memtblOpts)
2391 2 :
2392 2 : entry := d.newFlushableEntry(mem, logNum, logSeqNum)
2393 2 : entry.releaseMemAccounting = func() {
2394 2 : // If the user leaks iterators, we may be releasing the memtable after
2395 2 : // the DB is already closed. In this case, we want to just release the
2396 2 : // memory because DB.Close won't come along to free it for us.
2397 2 : if err := d.closed.Load(); err != nil {
2398 2 : d.freeMemTable(mem)
2399 2 : return
2400 2 : }
2401 :
2402 : // The next memtable allocation might be able to reuse this memtable.
2403 : // Stash it on d.memTableRecycle.
2404 2 : if unusedMem := d.memTableRecycle.Swap(mem); unusedMem != nil {
2405 2 : // There was already a memtable waiting to be recycled. We're now
2406 2 : // responsible for freeing it.
2407 2 : d.freeMemTable(unusedMem)
2408 2 : }
2409 : }
2410 2 : return mem, entry
2411 : }
2412 :
2413 2 : func (d *DB) freeMemTable(m *memTable) {
2414 2 : d.memTableCount.Add(-1)
2415 2 : d.memTableReserved.Add(-int64(len(m.arenaBuf)))
2416 2 : m.free()
2417 2 : }
2418 :
2419 : func (d *DB) newFlushableEntry(
2420 : f flushable, logNum base.DiskFileNum, logSeqNum base.SeqNum,
2421 2 : ) *flushableEntry {
2422 2 : fe := &flushableEntry{
2423 2 : flushable: f,
2424 2 : flushed: make(chan struct{}),
2425 2 : logNum: logNum,
2426 2 : logSeqNum: logSeqNum,
2427 2 : deleteFn: d.mu.versions.addObsolete,
2428 2 : deleteFnLocked: d.mu.versions.addObsoleteLocked,
2429 2 : }
2430 2 : fe.readerRefs.Store(1)
2431 2 : return fe
2432 2 : }
2433 :
2434 : // maybeInduceWriteStall is called before performing a memtable rotation in
2435 : // makeRoomForWrite. In some conditions, we prefer to stall the user's write
2436 : // workload rather than continuing to accept writes that may result in resource
2437 : // exhaustion or prohibitively slow reads.
2438 : //
2439 : // There are a couple reasons we might wait to rotate the memtable and
2440 : // instead induce a write stall:
2441 : // 1. If too many memtables have queued, we wait for a flush to finish before
2442 : // creating another memtable.
2443 : // 2. If L0 read amplification has grown too high, we wait for compactions
2444 : // to reduce the read amplification before accepting more writes that will
2445 : // increase write pressure.
2446 : //
2447 : // maybeInduceWriteStall checks these stall conditions, and if present, waits
2448 : // for them to abate.
2449 2 : func (d *DB) maybeInduceWriteStall(b *Batch) {
2450 2 : stalled := false
2451 2 : // This function will call EventListener.WriteStallBegin at most once. If
2452 2 : // it does call it, it will call EventListener.WriteStallEnd once before
2453 2 : // returning.
2454 2 : for {
2455 2 : var size uint64
2456 2 : for i := range d.mu.mem.queue {
2457 2 : size += d.mu.mem.queue[i].totalBytes()
2458 2 : }
2459 : // If ElevateWriteStallThresholdForFailover is true, we give an
2460 : // unlimited memory budget for memtables. This is simpler than trying to
2461 : // configure an explicit value, given that memory resources can vary.
2462 : // When using WAL failover in CockroachDB, an OOM risk is worth
2463 : // tolerating for workloads that have a strict latency SLO. Also, an
2464 : // unlimited budget here does not mean that the disk stall in the
2465 : // primary will go unnoticed until the OOM -- CockroachDB is monitoring
2466 : // disk stalls, and we expect it to fail the node after ~60s if the
2467 : // primary is stalled.
2468 2 : if size >= uint64(d.opts.MemTableStopWritesThreshold)*d.opts.MemTableSize &&
2469 2 : !d.mu.log.manager.ElevateWriteStallThresholdForFailover() {
2470 2 : // We have filled up the current memtable, but already queued memtables
2471 2 : // are still flushing, so we wait.
2472 2 : if !stalled {
2473 2 : stalled = true
2474 2 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2475 2 : Reason: "memtable count limit reached",
2476 2 : })
2477 2 : }
2478 2 : beforeWait := crtime.NowMono()
2479 2 : d.mu.compact.cond.Wait()
2480 2 : if b != nil {
2481 2 : b.commitStats.MemTableWriteStallDuration += beforeWait.Elapsed()
2482 2 : }
2483 2 : continue
2484 : }
2485 2 : l0ReadAmp := d.mu.versions.currentVersion().L0Sublevels.ReadAmplification()
2486 2 : if l0ReadAmp >= d.opts.L0StopWritesThreshold {
2487 2 : // There are too many level-0 files, so we wait.
2488 2 : if !stalled {
2489 2 : stalled = true
2490 2 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2491 2 : Reason: "L0 file count limit exceeded",
2492 2 : })
2493 2 : }
2494 2 : beforeWait := crtime.NowMono()
2495 2 : d.mu.compact.cond.Wait()
2496 2 : if b != nil {
2497 2 : b.commitStats.L0ReadAmpWriteStallDuration += beforeWait.Elapsed()
2498 2 : }
2499 2 : continue
2500 : }
2501 : // Not stalled.
2502 2 : if stalled {
2503 2 : d.opts.EventListener.WriteStallEnd()
2504 2 : }
2505 2 : return
2506 : }
2507 : }
2508 :
2509 : // makeRoomForWrite rotates the current mutable memtable, ensuring that the
2510 : // resulting mutable memtable has room to hold the contents of the provided
2511 : // Batch. The current memtable is rotated (marked as immutable) and a new
2512 : // mutable memtable is allocated. It reserves space in the new memtable and adds
2513 : // a reference to the memtable. The caller must later ensure that the memtable
2514 : // is unreferenced. This memtable rotation also causes a log rotation.
2515 : //
2516 : // If the current memtable is not full but the caller wishes to trigger a
2517 : // rotation regardless, the caller may pass a nil Batch, and no space in the
2518 : // resulting mutable memtable will be reserved.
2519 : //
2520 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2521 : // may be released and reacquired.
2522 2 : func (d *DB) makeRoomForWrite(b *Batch) error {
2523 2 : if b != nil && b.ingestedSSTBatch {
2524 0 : panic("pebble: invalid function call")
2525 : }
2526 2 : d.maybeInduceWriteStall(b)
2527 2 :
2528 2 : var newLogNum base.DiskFileNum
2529 2 : var prevLogSize uint64
2530 2 : if !d.opts.DisableWAL {
2531 2 : beforeRotate := crtime.NowMono()
2532 2 : newLogNum, prevLogSize = d.rotateWAL()
2533 2 : if b != nil {
2534 2 : b.commitStats.WALRotationDuration += beforeRotate.Elapsed()
2535 2 : }
2536 : }
2537 2 : immMem := d.mu.mem.mutable
2538 2 : imm := d.mu.mem.queue[len(d.mu.mem.queue)-1]
2539 2 : imm.logSize = prevLogSize
2540 2 :
2541 2 : var logSeqNum base.SeqNum
2542 2 : var minSize uint64
2543 2 : if b != nil {
2544 2 : logSeqNum = b.SeqNum()
2545 2 : if b.flushable != nil {
2546 2 : logSeqNum += base.SeqNum(b.Count())
2547 2 : // The batch is too large to fit in the memtable so add it directly to
2548 2 : // the immutable queue. The flushable batch is associated with the same
2549 2 : // log as the immutable memtable, but logically occurs after it in
2550 2 : // seqnum space. We ensure while flushing that the flushable batch
2551 2 : // is flushed along with the previous memtable in the flushable
2552 2 : // queue. See the top level comment in DB.flush1 to learn how this
2553 2 : // is ensured.
2554 2 : //
2555 2 : // See DB.commitWrite for the special handling of log writes for large
2556 2 : // batches. In particular, the large batch has already written to
2557 2 : // imm.logNum.
2558 2 : entry := d.newFlushableEntry(b.flushable, imm.logNum, b.SeqNum())
2559 2 : // The large batch is by definition large. Reserve space from the cache
2560 2 : // for it until it is flushed.
2561 2 : entry.releaseMemAccounting = d.opts.Cache.Reserve(int(b.flushable.totalBytes()))
2562 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2563 2 : } else {
2564 2 : minSize = b.memTableSize
2565 2 : }
2566 2 : } else {
2567 2 : // b == nil
2568 2 : //
2569 2 : // This is a manual forced flush.
2570 2 : logSeqNum = base.SeqNum(d.mu.versions.logSeqNum.Load())
2571 2 : imm.flushForced = true
2572 2 : // If we are manually flushing and we used less than half of the bytes in
2573 2 : // the memtable, don't increase the size for the next memtable. This
2574 2 : // reduces memtable memory pressure when an application is frequently
2575 2 : // manually flushing.
2576 2 : if uint64(immMem.availBytes()) > immMem.totalBytes()/2 {
2577 2 : d.mu.mem.nextSize = immMem.totalBytes()
2578 2 : }
2579 : }
2580 2 : d.rotateMemtable(newLogNum, logSeqNum, immMem, minSize)
2581 2 : if b != nil && b.flushable == nil {
2582 2 : err := d.mu.mem.mutable.prepare(b)
2583 2 : // Reserving enough space for the batch after rotation must never fail.
2584 2 : // We pass in a minSize that's equal to b.memTableSize to ensure that
2585 2 : // memtable rotation allocates a memtable sufficiently large. We also
2586 2 : // held d.commit.mu for the entirety of this function, ensuring that no
2587 2 : // other committers may have reserved memory in the new memtable yet.
2588 2 : if err == arenaskl.ErrArenaFull {
2589 0 : panic(errors.AssertionFailedf("memtable still full after rotation"))
2590 : }
2591 2 : return err
2592 : }
2593 2 : return nil
2594 : }
2595 :
2596 : // Both DB.mu and commitPipeline.mu must be held by the caller.
2597 : func (d *DB) rotateMemtable(
2598 : newLogNum base.DiskFileNum, logSeqNum base.SeqNum, prev *memTable, minSize uint64,
2599 2 : ) {
2600 2 : // Create a new memtable, scheduling the previous one for flushing. We do
2601 2 : // this even if the previous memtable was empty because the DB.Flush
2602 2 : // mechanism is dependent on being able to wait for the empty memtable to
2603 2 : // flush. We can't just mark the empty memtable as flushed here because we
2604 2 : // also have to wait for all previous immutable tables to
2605 2 : // flush. Additionally, the memtable is tied to a particular WAL file and we
2606 2 : // want to go through the flush path in order to recycle that WAL file.
2607 2 : //
2608 2 : // NB: newLogNum corresponds to the WAL that contains mutations that are
2609 2 : // present in the new memtable. When immutable memtables are flushed to
2610 2 : // disk, a VersionEdit will be created telling the manifest the minimum
2611 2 : // unflushed log number (which will be the next one in d.mu.mem.mutable
2612 2 : // that was not flushed).
2613 2 : //
2614 2 : // NB: prev should be the current mutable memtable.
2615 2 : var entry *flushableEntry
2616 2 : d.mu.mem.mutable, entry = d.newMemTable(newLogNum, logSeqNum, minSize)
2617 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2618 2 : // d.logSize tracks the log size of the WAL file corresponding to the most
2619 2 : // recent flushable. The log size of the previous mutable memtable no longer
2620 2 : // applies to the current mutable memtable.
2621 2 : //
2622 2 : // It's tempting to perform this update in rotateWAL, but that would not be
2623 2 : // atomic with the enqueue of the new flushable. A call to DB.Metrics()
2624 2 : // could acquire DB.mu after the WAL has been rotated but before the new
2625 2 : // memtable has been appended; this would result in omitting the log size of
2626 2 : // the most recent flushable.
2627 2 : d.logSize.Store(0)
2628 2 : d.updateReadStateLocked(nil)
2629 2 : if prev.writerUnref() {
2630 2 : d.maybeScheduleFlush()
2631 2 : }
2632 : }
2633 :
2634 : // rotateWAL creates a new write-ahead log, possibly recycling a previous WAL's
2635 : // files. It returns the file number assigned to the new WAL, and the size of
2636 : // the previous WAL file.
2637 : //
2638 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2639 : // may be released and reacquired.
2640 2 : func (d *DB) rotateWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
2641 2 : if d.opts.DisableWAL {
2642 0 : panic("pebble: invalid function call")
2643 : }
2644 2 : jobID := d.newJobIDLocked()
2645 2 : newLogNum = d.mu.versions.getNextDiskFileNum()
2646 2 :
2647 2 : d.mu.Unlock()
2648 2 : // Close the previous log first. This writes an EOF trailer
2649 2 : // signifying the end of the file and syncs it to disk. We must
2650 2 : // close the previous log before linking the new log file,
2651 2 : // otherwise a crash could leave both logs with unclean tails, and
2652 2 : // Open will treat the previous log as corrupt.
2653 2 : offset, err := d.mu.log.writer.Close()
2654 2 : if err != nil {
2655 0 : // What to do here? Stumbling on doesn't seem worthwhile. If we failed to
2656 0 : // close the previous log it is possible we lost a write.
2657 0 : panic(err)
2658 : }
2659 2 : prevLogSize = uint64(offset)
2660 2 : metrics := d.mu.log.writer.Metrics()
2661 2 :
2662 2 : d.mu.Lock()
2663 2 : if err := d.mu.log.metrics.LogWriterMetrics.Merge(&metrics); err != nil {
2664 0 : d.opts.Logger.Errorf("metrics error: %s", err)
2665 0 : }
2666 :
2667 2 : d.mu.Unlock()
2668 2 : writer, err := d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
2669 2 : if err != nil {
2670 0 : panic(err)
2671 : }
2672 :
2673 2 : d.mu.Lock()
2674 2 : d.mu.log.writer = writer
2675 2 : return newLogNum, prevLogSize
2676 : }
2677 :
2678 2 : func (d *DB) getEarliestUnflushedSeqNumLocked() base.SeqNum {
2679 2 : seqNum := base.SeqNumMax
2680 2 : for i := range d.mu.mem.queue {
2681 2 : logSeqNum := d.mu.mem.queue[i].logSeqNum
2682 2 : if seqNum > logSeqNum {
2683 2 : seqNum = logSeqNum
2684 2 : }
2685 : }
2686 2 : return seqNum
2687 : }
2688 :
2689 2 : func (d *DB) getInProgressCompactionInfoLocked(finishing *compaction) (rv []compactionInfo) {
2690 2 : for c := range d.mu.compact.inProgress {
2691 2 : if len(c.flushing) == 0 && (finishing == nil || c != finishing) {
2692 2 : info := compactionInfo{
2693 2 : versionEditApplied: c.versionEditApplied,
2694 2 : inputs: c.inputs,
2695 2 : smallest: c.smallest,
2696 2 : largest: c.largest,
2697 2 : outputLevel: -1,
2698 2 : }
2699 2 : if c.outputLevel != nil {
2700 2 : info.outputLevel = c.outputLevel.level
2701 2 : }
2702 2 : rv = append(rv, info)
2703 : }
2704 : }
2705 2 : return
2706 : }
2707 :
2708 2 : func inProgressL0Compactions(inProgress []compactionInfo) []manifest.L0Compaction {
2709 2 : var compactions []manifest.L0Compaction
2710 2 : for _, info := range inProgress {
2711 2 : // Skip in-progress compactions that have already committed; the L0
2712 2 : // sublevels initialization code requires the set of in-progress
2713 2 : // compactions to be consistent with the current version. Compactions
2714 2 : // with versionEditApplied=true are already applied to the current
2715 2 : // version but are performing cleanup without the database mutex.
2716 2 : if info.versionEditApplied {
2717 2 : continue
2718 : }
2719 2 : l0 := false
2720 2 : for _, cl := range info.inputs {
2721 2 : l0 = l0 || cl.level == 0
2722 2 : }
2723 2 : if !l0 {
2724 2 : continue
2725 : }
2726 2 : compactions = append(compactions, manifest.L0Compaction{
2727 2 : Smallest: info.smallest,
2728 2 : Largest: info.largest,
2729 2 : IsIntraL0: info.outputLevel == 0,
2730 2 : })
2731 : }
2732 2 : return compactions
2733 : }
2734 :
2735 : // firstError returns the first non-nil error of err0 and err1, or nil if both
2736 : // are nil.
2737 2 : func firstError(err0, err1 error) error {
2738 2 : if err0 != nil {
2739 2 : return err0
2740 2 : }
2741 2 : return err1
2742 : }
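     :
     : // exampleFirstError is an editor-added sketch (not part of the measured
     : // source): firstError is typically used in cleanup paths to combine
     : // several fallible steps while preserving the earliest failure. The
     : // closers argument is hypothetical.
     : func exampleFirstError(closers ...io.Closer) error {
     : 	var err error
     : 	for _, c := range closers {
     : 		// Keep the first error; later Close calls still run.
     : 		err = firstError(err, c.Close())
     : 	}
     : 	return err
     : }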
2743 :
2744 : // SetCreatorID sets the CreatorID, which is needed to use shared objects.
2745 : // Remote object usage is disabled until this method is called for the first
2746 : // time. Once set, the Creator ID is persisted and cannot change.
2747 : //
2748 : // Does nothing if remote storage was not configured in the options when the
2749 : // DB was opened, or if the DB is in read-only mode.
2750 2 : func (d *DB) SetCreatorID(creatorID uint64) error {
2751 2 : if d.opts.Experimental.RemoteStorage == nil || d.opts.ReadOnly {
2752 0 : return nil
2753 0 : }
2754 2 : return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
2755 : }
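     :
     : // exampleSetCreatorID is an editor-added sketch (not part of the
     : // measured source) showing how remote object usage might be enabled
     : // after opening a DB. Deriving the ID from a node ID is an assumption;
     : // any stable, unique identifier works.
     : func exampleSetCreatorID(d *DB, nodeID uint64) error {
     : 	// A no-op when remote storage is not configured or the DB is read-only.
     : 	return d.SetCreatorID(nodeID)
     : }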
2756 :
2757 : // KeyStatistics keeps track of the number of keys that have been pinned by a
2758 : // snapshot as well as counts of the different key kinds in the LSM.
2759 : //
2760 : // One way of using the accumulated stats, when the data contains only sets
2761 : // and dels, with the counts represented as del_count, set_count,
2762 : // del_latest_count, set_latest_count and snapshot_pinned_count:
2763 : //
2764 : // - del_latest_count + set_latest_count is the number of unique user keys
2765 : //   (unique).
2766 : //
2767 : // - set_latest_count is the number of live unique user keys (live_unique).
2768 : //
2769 : // - Garbage is del_count + set_count - live_unique.
2770 : //
2771 : // - If everything were in the LSM, del_count+set_count-snapshot_pinned_count
2772 : //   would also equal the number of unique user keys (note that
2773 : //   snapshot_pinned_count counts something different -- see the comment below).
2774 : //   But snapshot_pinned_count only counts keys in the LSM, so the excess here
2775 : //   must be keys in memtables.
2776 : type KeyStatistics struct {
2777 : // TODO(sumeer): the SnapshotPinned* are incorrect in that these older
2778 : // versions can be in a different level. Either fix the accounting or
2779 : // rename these fields.
2780 :
2781 : // SnapshotPinnedKeys represents obsolete keys that cannot be elided during
2782 : // a compaction, because they are required by an open snapshot.
2783 : SnapshotPinnedKeys int
2784 : // SnapshotPinnedKeysBytes is the total number of bytes of all snapshot
2785 : // pinned keys.
2786 : SnapshotPinnedKeysBytes uint64
2787 : // KindsCount is the count for each kind of key. It includes point keys,
2788 : // range deletes and range keys.
2789 : KindsCount [InternalKeyKindMax + 1]int
2790 : // LatestKindsCount is the count for each kind of key when it is the latest
2791 : // kind for a user key. It is only populated for point keys.
2792 : LatestKindsCount [InternalKeyKindMax + 1]int
2793 : }
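     :
     : // exampleKeyStatisticsGarbage is an editor-added sketch (not part of the
     : // measured source): a worked instance of the garbage arithmetic in the
     : // KeyStatistics comment. For example, with del_count=4, set_count=10 and
     : // set_latest_count=6 (live_unique), garbage is 4 + 10 - 6 = 8 internal
     : // keys, i.e. exampleKeyStatisticsGarbage(4, 10, 6) == 8.
     : func exampleKeyStatisticsGarbage(delCount, setCount, setLatestCount int) int {
     : 	liveUnique := setLatestCount
     : 	return delCount + setCount - liveUnique
     : }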
2794 :
2795 : // LSMKeyStatistics is used by DB.ScanStatistics.
2796 : type LSMKeyStatistics struct {
2797 : Accumulated KeyStatistics
2798 : // Levels contains statistics only for point keys. Range deletions and range keys will
2799 : // appear in Accumulated but not Levels.
2800 : Levels [numLevels]KeyStatistics
2801 : // BytesRead represents the logical, pre-compression size of the keys and values read.
2802 : BytesRead uint64
2803 : }
2804 :
2805 : // ScanStatisticsOptions is used by DB.ScanStatistics.
2806 : type ScanStatisticsOptions struct {
2807 : // LimitBytesPerSecond caps the number of bytes that can be read per second
2808 : // using ScanInternal.
2809 : // A value of 0 indicates that no limit is set.
2810 : LimitBytesPerSecond int64
2811 : }
2812 :
2813 : // ScanStatistics returns the count of each key kind within the LSM for a
2814 : // key span [lower, upper), as well as the number of snapshot-pinned keys.
2815 : func (d *DB) ScanStatistics(
2816 : ctx context.Context, lower, upper []byte, opts ScanStatisticsOptions,
2817 1 : ) (LSMKeyStatistics, error) {
2818 1 : stats := LSMKeyStatistics{}
2819 1 : var prevKey InternalKey
2820 1 : var rateLimitFunc func(key *InternalKey, val LazyValue) error
2821 1 : tb := tokenbucket.TokenBucket{}
2822 1 :
2823 1 : if opts.LimitBytesPerSecond != 0 {
2824 0 : // Each "token" roughly corresponds to a byte that was read.
2825 0 : tb.Init(tokenbucket.TokensPerSecond(opts.LimitBytesPerSecond), tokenbucket.Tokens(1024))
2826 0 : rateLimitFunc = func(key *InternalKey, val LazyValue) error {
2827 0 : return tb.WaitCtx(ctx, tokenbucket.Tokens(key.Size()+val.Len()))
2828 0 : }
2829 : }
2830 :
2831 1 : scanInternalOpts := &scanInternalOptions{
2832 1 : visitPointKey: func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error {
2833 1 : // If the previous key is equal to the current point key, the current key was
2834 1 : // pinned by a snapshot.
2835 1 : size := uint64(key.Size())
2836 1 : kind := key.Kind()
2837 1 : sameKey := d.equal(prevKey.UserKey, key.UserKey)
2838 1 : if iterInfo.Kind == IteratorLevelLSM && sameKey {
2839 1 : stats.Levels[iterInfo.Level].SnapshotPinnedKeys++
2840 1 : stats.Levels[iterInfo.Level].SnapshotPinnedKeysBytes += size
2841 1 : stats.Accumulated.SnapshotPinnedKeys++
2842 1 : stats.Accumulated.SnapshotPinnedKeysBytes += size
2843 1 : }
2844 1 : if iterInfo.Kind == IteratorLevelLSM {
2845 1 : stats.Levels[iterInfo.Level].KindsCount[kind]++
2846 1 : }
2847 1 : if !sameKey {
2848 1 : if iterInfo.Kind == IteratorLevelLSM {
2849 1 : stats.Levels[iterInfo.Level].LatestKindsCount[kind]++
2850 1 : }
2851 1 : stats.Accumulated.LatestKindsCount[kind]++
2852 : }
2853 :
2854 1 : stats.Accumulated.KindsCount[kind]++
2855 1 : prevKey.CopyFrom(*key)
2856 1 : stats.BytesRead += uint64(key.Size() + value.Len())
2857 1 : return nil
2858 : },
2859 0 : visitRangeDel: func(start, end []byte, seqNum base.SeqNum) error {
2860 0 : stats.Accumulated.KindsCount[InternalKeyKindRangeDelete]++
2861 0 : stats.BytesRead += uint64(len(start) + len(end))
2862 0 : return nil
2863 0 : },
2864 0 : visitRangeKey: func(start, end []byte, keys []rangekey.Key) error {
2865 0 : stats.BytesRead += uint64(len(start) + len(end))
2866 0 : for _, key := range keys {
2867 0 : stats.Accumulated.KindsCount[key.Kind()]++
2868 0 : stats.BytesRead += uint64(len(key.Value) + len(key.Suffix))
2869 0 : }
2870 0 : return nil
2871 : },
2872 : includeObsoleteKeys: true,
2873 : IterOptions: IterOptions{
2874 : KeyTypes: IterKeyTypePointsAndRanges,
2875 : LowerBound: lower,
2876 : UpperBound: upper,
2877 : },
2878 : rateLimitFunc: rateLimitFunc,
2879 : }
2880 1 : iter, err := d.newInternalIter(ctx, snapshotIterOpts{}, scanInternalOpts)
2881 1 : if err != nil {
2882 0 : return LSMKeyStatistics{}, err
2883 0 : }
2884 1 : defer iter.close()
2885 1 :
2886 1 : err = scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
2887 1 :
2888 1 : if err != nil {
2889 0 : return LSMKeyStatistics{}, err
2890 0 : }
2891 :
2892 1 : return stats, nil
2893 : }
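     :
     : // exampleScanStatistics is an editor-added sketch (not part of the
     : // measured source): invoking ScanStatistics with a read rate limit and
     : // reading a few of the returned counters. The 1 MiB/s limit is an
     : // illustrative assumption.
     : func exampleScanStatistics(ctx context.Context, d *DB, lower, upper []byte) error {
     : 	stats, err := d.ScanStatistics(ctx, lower, upper, ScanStatisticsOptions{
     : 		LimitBytesPerSecond: 1 << 20, // 0 disables the limit
     : 	})
     : 	if err != nil {
     : 		return err
     : 	}
     : 	// Accumulated covers all key kinds; Levels holds per-level point-key
     : 	// statistics only.
     : 	fmt.Printf("sets=%d pinned=%d bottom-level sets=%d read=%d bytes\n",
     : 		stats.Accumulated.KindsCount[InternalKeyKindSet],
     : 		stats.Accumulated.SnapshotPinnedKeys,
     : 		stats.Levels[numLevels-1].KindsCount[InternalKeyKindSet],
     : 		stats.BytesRead)
     : 	return nil
     : }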
2894 :
2895 : // ObjProvider returns the objstorage.Provider for this database. Meant to be
2896 : // used for internal purposes only.
2897 2 : func (d *DB) ObjProvider() objstorage.Provider {
2898 2 : return d.objProvider
2899 2 : }
2900 :
2901 1 : func (d *DB) checkVirtualBounds(m *fileMetadata) {
2902 1 : if !invariants.Enabled {
2903 0 : return
2904 0 : }
2905 :
2906 1 : objMeta, err := d.objProvider.Lookup(fileTypeTable, m.FileBacking.DiskFileNum)
2907 1 : if err != nil {
2908 0 : panic(err)
2909 : }
2910 1 : if objMeta.IsExternal() {
2911 0 : // Nothing to do; bounds are expected to be loose.
2912 0 : return
2913 0 : }
2914 :
2915 1 : iters, err := d.newIters(context.TODO(), m, nil, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
2916 1 : if err != nil {
2917 0 : panic(errors.Wrap(err, "pebble: error creating iterators"))
2918 : }
2919 1 : defer iters.CloseAll()
2920 1 :
2921 1 : if m.HasPointKeys {
2922 1 : pointIter := iters.Point()
2923 1 : rangeDelIter := iters.RangeDeletion()
2924 1 :
2925 1 : // Check that the lower bound is tight.
2926 1 : pointKV := pointIter.First()
2927 1 : rangeDel, err := rangeDelIter.First()
2928 1 : if err != nil {
2929 0 : panic(err)
2930 : }
2931 1 : if (rangeDel == nil || d.cmp(rangeDel.SmallestKey().UserKey, m.SmallestPointKey.UserKey) != 0) &&
2932 1 : (pointKV == nil || d.cmp(pointKV.K.UserKey, m.SmallestPointKey.UserKey) != 0) {
2933 0 : panic(errors.Newf("pebble: virtual sstable %s lower point key bound is not tight", m.FileNum))
2934 : }
2935 :
2936 : // Check that the upper bound is tight.
2937 1 : pointKV = pointIter.Last()
2938 1 : rangeDel, err = rangeDelIter.Last()
2939 1 : if err != nil {
2940 0 : panic(err)
2941 : }
2942 1 : if (rangeDel == nil || d.cmp(rangeDel.LargestKey().UserKey, m.LargestPointKey.UserKey) != 0) &&
2943 1 : (pointKV == nil || d.cmp(pointKV.K.UserKey, m.LargestPointKey.UserKey) != 0) {
2944 0 : panic(errors.Newf("pebble: virtual sstable %s upper point key bound is not tight", m.FileNum))
2945 : }
2946 :
2947 : // Check that iterator keys are within bounds.
2948 1 : for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
2949 1 : if d.cmp(kv.K.UserKey, m.SmallestPointKey.UserKey) < 0 || d.cmp(kv.K.UserKey, m.LargestPointKey.UserKey) > 0 {
2950 0 : panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, kv.K.UserKey))
2951 : }
2952 : }
2953 1 : s, err := rangeDelIter.First()
2954 1 : for ; s != nil; s, err = rangeDelIter.Next() {
2955 1 : if d.cmp(s.SmallestKey().UserKey, m.SmallestPointKey.UserKey) < 0 {
2956 0 : 			panic(errors.Newf("pebble: virtual sstable %s range deletion %s is not within bounds", m.FileNum, s.SmallestKey().UserKey))
2957 : }
2958 1 : if d.cmp(s.LargestKey().UserKey, m.LargestPointKey.UserKey) > 0 {
2959 0 : 			panic(errors.Newf("pebble: virtual sstable %s range deletion %s is not within bounds", m.FileNum, s.LargestKey().UserKey))
2960 : }
2961 : }
2962 1 : if err != nil {
2963 0 : panic(err)
2964 : }
2965 : }
2966 :
2967 1 : if !m.HasRangeKeys {
2968 1 : return
2969 1 : }
2970 0 : rangeKeyIter := iters.RangeKey()
2971 0 :
2972 0 : // Check that the lower bound is tight.
2973 0 : if s, err := rangeKeyIter.First(); err != nil {
2974 0 : panic(err)
2975 0 : } else if d.cmp(s.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) != 0 {
2976 0 : panic(errors.Newf("pebble: virtual sstable %s lower range key bound is not tight", m.FileNum))
2977 : }
2978 :
2979 : // Check that upper bound is tight.
2980 0 : if s, err := rangeKeyIter.Last(); err != nil {
2981 0 : panic(err)
2982 0 : } else if d.cmp(s.LargestKey().UserKey, m.LargestRangeKey.UserKey) != 0 {
2983 0 : panic(errors.Newf("pebble: virtual sstable %s upper range key bound is not tight", m.FileNum))
2984 : }
2985 :
2986 0 : s, err := rangeKeyIter.First()
2987 0 : for ; s != nil; s, err = rangeKeyIter.Next() {
2988 0 : if d.cmp(s.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) < 0 {
2989 0 : 			panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.FileNum, s.SmallestKey().UserKey))
2990 : }
2991 0 : if d.cmp(s.LargestKey().UserKey, m.LargestRangeKey.UserKey) > 0 {
2992 0 : 			panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.FileNum, s.LargestKey().UserKey))
2993 : }
2994 : }
2995 0 : if err != nil {
2996 0 : panic(err)
2997 : }
2998 : }
2999 :
3000 : // DebugString returns a debugging string describing the LSM.
3001 0 : func (d *DB) DebugString() string {
3002 0 : return d.DebugCurrentVersion().DebugString()
3003 0 : }
3004 :
3005 : // DebugCurrentVersion returns the current LSM tree metadata. Should only be
3006 : // used for testing/debugging.
3007 0 : func (d *DB) DebugCurrentVersion() *manifest.Version {
3008 0 : d.mu.Lock()
3009 0 : defer d.mu.Unlock()
3010 0 : return d.mu.versions.currentVersion()
3011 0 : }
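     :
     : // exampleDebugDump is an editor-added sketch (not part of the measured
     : // source): dumping the LSM shape while debugging. The logf callback is
     : // hypothetical.
     : func exampleDebugDump(d *DB, logf func(format string, args ...interface{})) {
     : 	// DebugString summarizes the current version's files per level.
     : 	logf("LSM state:\n%s", d.DebugString())
     : }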
|