Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package pebble provides an ordered key/value store.
6 : package pebble // import "github.com/cockroachdb/pebble"
7 :
8 : import (
9 : "context"
10 : "fmt"
11 : "io"
12 : "strconv"
13 : "sync"
14 : "sync/atomic"
15 : "time"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/arenaskl"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/invalidating"
21 : "github.com/cockroachdb/pebble/internal/invariants"
22 : "github.com/cockroachdb/pebble/internal/keyspan"
23 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
24 : "github.com/cockroachdb/pebble/internal/manifest"
25 : "github.com/cockroachdb/pebble/internal/manual"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : "github.com/cockroachdb/pebble/objstorage/remote"
28 : "github.com/cockroachdb/pebble/rangekey"
29 : "github.com/cockroachdb/pebble/record"
30 : "github.com/cockroachdb/pebble/sstable"
31 : "github.com/cockroachdb/pebble/vfs"
32 : "github.com/cockroachdb/pebble/vfs/atomicfs"
33 : "github.com/cockroachdb/pebble/wal"
34 : "github.com/cockroachdb/tokenbucket"
35 : "github.com/prometheus/client_golang/prometheus"
36 : )
37 :
38 : const (
39 : // minTableCacheSize is the minimum size of the table cache, for a single db.
40 : minTableCacheSize = 64
41 :
42 : // numNonTableCacheFiles is an approximation for the number of files
43 : // that we don't use for table caches, for a given db.
44 : numNonTableCacheFiles = 10
45 : )
46 :
47 : var (
48 : // ErrNotFound is returned when a get operation does not find the requested
49 : // key.
50 : ErrNotFound = base.ErrNotFound
51 : // ErrClosed is panicked when an operation is performed on a closed snapshot or
52 : // DB. Use errors.Is(err, ErrClosed) to check for this error.
53 : ErrClosed = errors.New("pebble: closed")
54 : // ErrReadOnly is returned when a write operation is performed on a read-only
55 : // database.
56 : ErrReadOnly = errors.New("pebble: read-only")
57 : // errNoSplit indicates that the user is trying to perform a range key
58 : // operation but the configured Comparer does not provide a Split
59 : // implementation.
60 : errNoSplit = errors.New("pebble: Comparer.Split required for range key operations")
61 : )
62 :
63 : // Reader is a readable key/value store.
64 : //
65 : // It is safe to call Get and NewIter from concurrent goroutines.
66 : type Reader interface {
67 : // Get gets the value for the given key. It returns ErrNotFound if the DB
68 : // does not contain the key.
69 : //
70 : // The caller should not modify the contents of the returned slice, but it is
71 : // safe to modify the contents of the argument after Get returns. The
72 : // returned slice will remain valid until the returned Closer is closed. On
73 : // success, the caller MUST call closer.Close() or a memory leak will occur.
74 : Get(key []byte) (value []byte, closer io.Closer, err error)
75 :
76 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
77 : // return false). The iterator can be positioned via a call to SeekGE,
78 : // SeekLT, First or Last.
79 : NewIter(o *IterOptions) (*Iterator, error)
80 :
81 : // NewIterWithContext is like NewIter, and additionally accepts a context
82 : // for tracing.
83 : NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error)
84 :
85 : // Close closes the Reader. It may or may not close any underlying io.Reader
86 : // or io.Writer, depending on how the DB was created.
87 : //
88 : // It is not safe to close a DB until all outstanding iterators are closed.
89 : // It is valid to call Close multiple times. Other methods should not be
90 : // called after the DB has been closed.
91 : Close() error
92 : }
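// An illustrative sketch of a bounded scan through the Reader interface
// (assumes r is a Reader, e.g. a *DB, and that the pebble and fmt packages
// are imported; error handling abbreviated):
//
//	iter, err := r.NewIter(&pebble.IterOptions{
//		LowerBound: []byte("a"),
//		UpperBound: []byte("m"),
//	})
//	if err != nil {
//		return err
//	}
//	for iter.First(); iter.Valid(); iter.Next() {
//		fmt.Printf("key=%q value=%q\n", iter.Key(), iter.Value())
//	}
//	return iter.Close()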
93 :
94 : // Writer is a writable key/value store.
95 : //
96 : // Goroutine safety is dependent on the specific implementation.
97 : type Writer interface {
98 : // Apply the operations contained in the batch to the DB.
99 : //
100 : // It is safe to modify the contents of the arguments after Apply returns.
101 : Apply(batch *Batch, o *WriteOptions) error
102 :
103 : // Delete deletes the value for the given key. Deletes are blind and will
104 : // succeed even if the given key does not exist.
105 : //
106 : // It is safe to modify the contents of the arguments after Delete returns.
107 : Delete(key []byte, o *WriteOptions) error
108 :
109 : // DeleteSized behaves identically to Delete, but takes an additional
110 : // argument indicating the size of the value being deleted. DeleteSized
111 : // should be preferred when the caller has the expectation that there exists
112 : // a single internal KV pair for the key (eg, the key has not been
113 : // overwritten recently), and the caller knows the size of its value.
114 : //
115 : // DeleteSized will record the value size within the tombstone and use it to
116 : // inform compaction-picking heuristics which strive to reduce space
117 : // amplification in the LSM. This "calling your shot" mechanic allows the
118 : // storage engine to more accurately estimate and reduce space
119 : // amplification.
120 : //
121 : // It is safe to modify the contents of the arguments after DeleteSized
122 : // returns.
123 : DeleteSized(key []byte, valueSize uint32, _ *WriteOptions) error
124 :
125 : // SingleDelete is similar to Delete in that it deletes the value for the given key. Like Delete,
126 : // it is a blind operation that will succeed even if the given key does not exist.
127 : //
128 : // WARNING: Undefined (non-deterministic) behavior will result if a key is overwritten and
129 : // then deleted using SingleDelete. The record may appear deleted immediately, but be
130 : // resurrected at a later time after compactions have been performed. Or the record may
131 : // be deleted permanently. A Delete operation lays down a "tombstone" which shadows all
132 : // previous versions of a key. The SingleDelete operation is akin to "anti-matter" and will
133 : // only delete the most recently written version for a key. These different semantics allow
134 : // the DB to avoid propagating a SingleDelete operation during a compaction as soon as the
135 : // corresponding Set operation is encountered. These semantics require extreme care to handle
136 : // properly. Only use if you have a workload where the performance gain is critical and you
137 : // can guarantee that a record is written once and then deleted once.
138 : //
139 : // SingleDelete is internally transformed into a Delete if the most recent record for a key is either
140 : // a Merge or Delete record.
141 : //
142 : // It is safe to modify the contents of the arguments after SingleDelete returns.
143 : SingleDelete(key []byte, o *WriteOptions) error
144 :
145 : // DeleteRange deletes all of the point keys (and values) in the range
146 : // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
147 : // delete overlapping range keys (eg, keys set via RangeKeySet).
148 : //
149 : // It is safe to modify the contents of the arguments after DeleteRange
150 : // returns.
151 : DeleteRange(start, end []byte, o *WriteOptions) error
152 :
153 : // LogData adds the specified data to the batch. The data will be written to the
154 : // WAL, but not added to memtables or sstables. Log data is never indexed,
155 : // which makes it useful for testing WAL performance.
156 : //
157 : // It is safe to modify the contents of the argument after LogData returns.
158 : LogData(data []byte, opts *WriteOptions) error
159 :
160 : // Merge merges the value for the given key. The details of the merge are
161 : // dependent upon the configured merge operation.
162 : //
163 : // It is safe to modify the contents of the arguments after Merge returns.
164 : Merge(key, value []byte, o *WriteOptions) error
165 :
166 : // Set sets the value for the given key. It overwrites any previous value
167 : // for that key; a DB is not a multi-map.
168 : //
169 : // It is safe to modify the contents of the arguments after Set returns.
170 : Set(key, value []byte, o *WriteOptions) error
171 :
172 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
173 : // timestamp suffix to value. The suffix is optional. If any portion of the key
174 : // range [start, end) is already set by a range key with the same suffix value,
175 : // RangeKeySet overrides it.
176 : //
177 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
178 : RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error
179 :
180 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
181 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
182 : // range key. RangeKeyUnset only removes portions of range keys that fall within
183 : // the [start, end) key span, and only range keys with suffixes that exactly
184 : // match the unset suffix.
185 : //
186 : // It is safe to modify the contents of the arguments after RangeKeyUnset
187 : // returns.
188 : RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error
189 :
190 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
191 : // (inclusive on start, exclusive on end). It does not delete point keys (for
192 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
193 : // bounds, including those with or without suffixes.
194 : //
195 : // It is safe to modify the contents of the arguments after RangeKeyDelete
196 : // returns.
197 : RangeKeyDelete(start, end []byte, opts *WriteOptions) error
198 : }
199 :
200 : // CPUWorkHandle represents a handle used by the CPUWorkPermissionGranter API.
201 : type CPUWorkHandle interface {
202 : // Permitted indicates whether Pebble can use additional CPU resources.
203 : Permitted() bool
204 : }
205 :
206 : // CPUWorkPermissionGranter is used to request permission to opportunistically
207 : // use additional CPUs to speed up internal background work.
208 : type CPUWorkPermissionGranter interface {
209 : // GetPermission returns a handle regardless of whether permission is granted
210 : // or not. In the latter case, the handle is only useful for recording
211 : // the CPU time actually spent on this calling goroutine.
212 : GetPermission(time.Duration) CPUWorkHandle
213 : // CPUWorkDone must be called regardless of whether CPUWorkHandle.Permitted
214 : // returns true or false.
215 : CPUWorkDone(CPUWorkHandle)
216 : }
217 :
218 : // Use a default implementation for the CPU work granter to avoid excessive nil
219 : // checks in the code.
220 : type defaultCPUWorkHandle struct{}
221 :
222 1 : func (d defaultCPUWorkHandle) Permitted() bool {
223 1 : return false
224 1 : }
225 :
226 : type defaultCPUWorkGranter struct{}
227 :
228 2 : func (d defaultCPUWorkGranter) GetPermission(_ time.Duration) CPUWorkHandle {
229 2 : return defaultCPUWorkHandle{}
230 2 : }
231 :
232 2 : func (d defaultCPUWorkGranter) CPUWorkDone(_ CPUWorkHandle) {}
233 :
234 : // DB provides a concurrent, persistent ordered key/value store.
235 : //
236 : // A DB's basic operations (Get, Set, Delete) should be self-explanatory. Get
237 : // and Delete will return ErrNotFound if the requested key is not in the store.
238 : // Callers are free to ignore this error.
239 : //
240 : // A DB also allows for iterating over the key/value pairs in key order. If d
241 : // is a DB, the code below prints all key/value pairs whose keys are 'greater
242 : // than or equal to' k:
243 : //
244 : // iter, _ := d.NewIter(readOptions)
245 : // for iter.SeekGE(k); iter.Valid(); iter.Next() {
246 : // fmt.Printf("key=%q value=%q\n", iter.Key(), iter.Value())
247 : // }
248 : // return iter.Close()
249 : //
250 : // The Options struct holds the optional parameters for the DB, including a
251 : // Comparer to define a 'less than' relationship over keys. It is always valid
252 : // to pass a nil *Options, which means to use the default parameter values. Any
253 : // zero field of a non-nil *Options also means to use the default value for
254 : // that parameter. Thus, the code below uses a custom Comparer, but the default
255 : // values for every other parameter:
256 : //
257 : // db, err := pebble.Open(dirname, &Options{
258 : // Comparer: myComparer,
259 : // })
260 : type DB struct {
261 : // The count and size of referenced memtables. This includes memtables
262 : // present in DB.mu.mem.queue, as well as memtables that have been flushed
263 : // but are still referenced by an in-use readState, and up to one
264 : // memTable waiting to be reused and stored in d.memTableRecycle.
265 : memTableCount atomic.Int64
266 : memTableReserved atomic.Int64 // number of bytes reserved in the cache for memtables
267 : // memTableRecycle holds a pointer to an obsolete memtable. The next
268 : // memtable allocation will reuse this memtable if it has not already been
269 : // recycled.
270 : memTableRecycle atomic.Pointer[memTable]
271 :
272 : // The logical size of the current WAL.
273 : logSize atomic.Uint64
274 :
275 : // The number of bytes available on disk.
276 : diskAvailBytes atomic.Uint64
277 :
278 : cacheID uint64
279 : dirname string
280 : opts *Options
281 : cmp Compare
282 : equal Equal
283 : merge Merge
284 : split Split
285 : abbreviatedKey AbbreviatedKey
286 : // The threshold for determining when a batch is "large" and will skip being
287 : // inserted into a memtable.
288 : largeBatchThreshold uint64
289 : // The current OPTIONS file number.
290 : optionsFileNum base.DiskFileNum
291 : // The on-disk size of the current OPTIONS file.
292 : optionsFileSize uint64
293 :
294 : // objProvider is used to access and manage SSTs.
295 : objProvider objstorage.Provider
296 :
297 : fileLock *Lock
298 : dataDir vfs.File
299 :
300 : tableCache *tableCacheContainer
301 : newIters tableNewIters
302 : tableNewRangeKeyIter keyspanimpl.TableNewSpanIter
303 :
304 : commit *commitPipeline
305 :
306 : // readState provides access to the state needed for reading without needing
307 : // to acquire DB.mu.
308 : readState struct {
309 : sync.RWMutex
310 : val *readState
311 : }
312 :
313 : closed *atomic.Value
314 : closedCh chan struct{}
315 :
316 : cleanupManager *cleanupManager
317 :
318 : // During an iterator close, we may asynchronously schedule read compactions.
319 : // We want to wait for those goroutines to finish before closing the DB.
320 : // compactionSchedulers.Wait() should not be called while DB.mu is held.
321 : compactionSchedulers sync.WaitGroup
322 :
323 : // The main mutex protecting internal DB state. This mutex encompasses many
324 : // fields because those fields need to be accessed and updated atomically. In
325 : // particular, the current version, log.*, mem.*, and snapshot list need to
326 : // be accessed and updated atomically during compaction.
327 : //
328 : // Care is taken to avoid holding DB.mu during IO operations. Accomplishing
329 : // this sometimes requires releasing DB.mu in a method that was called with
330 : // it held. See versionSet.logAndApply() and DB.makeRoomForWrite() for
331 : // examples. This is a common pattern, so be careful about expectations that
332 : // DB.mu will be held continuously across a set of calls.
333 : mu struct {
334 : sync.Mutex
335 :
336 : formatVers struct {
337 : // vers is the database's current format major version.
338 : // Backwards-incompatible features are gated behind new
339 : // format major versions and not enabled until a database's
340 : // version is ratcheted upwards.
341 : //
342 : // Although this is under the `mu` prefix, readers may read vers
343 : // atomically without holding d.mu. Writers must only write to this
344 : // value through finalizeFormatVersUpgrade which requires d.mu is
345 : // held.
346 : vers atomic.Uint64
347 : // marker is the atomic marker for the format major version.
348 : // When a database's version is ratcheted upwards, the
349 : // marker is moved in order to atomically record the new
350 : // version.
351 : marker *atomicfs.Marker
352 : // ratcheting when set to true indicates that the database is
353 : // currently in the process of ratcheting the format major version
354 : // to vers + 1. As a part of ratcheting the format major version,
355 : // migrations may drop and re-acquire the mutex.
356 : ratcheting bool
357 : }
358 :
359 : // The ID of the next job. Job IDs are passed to event listener
360 : // notifications and act as a mechanism for tying together the events and
361 : // log messages for a single job such as a flush, compaction, or file
362 : // ingestion. Job IDs are not serialized to disk or used for correctness.
363 : nextJobID int
364 :
365 : // The collection of immutable versions and state about the log and visible
366 : // sequence numbers. Use the pointer here to ensure the atomic fields in
367 : // version set are aligned properly.
368 : versions *versionSet
369 :
370 : log struct {
371 : // manager is not protected by mu, but calls to Create must be
372 : // serialized, and happen after the previous writer is closed.
373 : manager wal.Manager
374 : // The number of input bytes to the log. This is the raw size of the
375 : // batches written to the WAL, without the overhead of the record
376 : // envelopes.
377 : bytesIn uint64
378 : // The Writer is protected by commitPipeline.mu. This allows log writes
379 : // to be performed without holding DB.mu, but requires both
380 : // commitPipeline.mu and DB.mu to be held when rotating the WAL/memtable
381 : // (i.e. makeRoomForWrite). Can be nil.
382 : writer wal.Writer
383 : metrics struct {
384 : // fsyncLatency has its own internal synchronization, and is not
385 : // protected by mu.
386 : fsyncLatency prometheus.Histogram
387 : // Updated whenever a wal.Writer is closed.
388 : record.LogWriterMetrics
389 : }
390 : }
391 :
392 : mem struct {
393 : // The current mutable memTable.
394 : mutable *memTable
395 : // Queue of flushables (the mutable memtable is at end). Elements are
396 : // added to the end of the slice and removed from the beginning. Once an
397 : // index is set it is never modified making a fixed slice immutable and
398 : // safe for concurrent reads.
399 : queue flushableList
400 : // nextSize is the size of the next memtable. The memtable size starts at
401 : // min(256KB,Options.MemTableSize) and doubles each time a new memtable
402 : // is allocated up to Options.MemTableSize. This reduces the memory
403 : // footprint of memtables when lots of DB instances are used concurrently
404 : // in test environments.
405 : nextSize uint64
406 : }
407 :
408 : compact struct {
409 : // Condition variable used to signal when a flush or compaction has
410 : // completed. Used by the write-stall mechanism to wait for the stall
411 : // condition to clear. See DB.makeRoomForWrite().
412 : cond sync.Cond
413 : // True when a flush is in progress.
414 : flushing bool
415 : // The number of ongoing compactions.
416 : compactingCount int
417 : // The list of deletion hints, suggesting ranges for delete-only
418 : // compactions.
419 : deletionHints []deleteCompactionHint
420 : // The list of manual compactions. The next manual compaction to perform
421 : // is at the start of the list. New entries are added to the end.
422 : manual []*manualCompaction
423 : // downloads is the list of suggested download tasks. The next download to
424 : // perform is at the start of the list. New entries are added to the end.
425 : downloads []*downloadSpan
426 : // inProgress is the set of in-progress flushes and compactions.
427 : // It's used in the calculation of some metrics and to initialize L0
428 : // sublevels' state. Some of the compactions contained within this
429 : // map may have already committed an edit to the version but are
430 : // lingering performing cleanup, like deleting obsolete files.
431 : inProgress map[*compaction]struct{}
432 :
433 : // rescheduleReadCompaction indicates to an iterator that a read compaction
434 : // should be scheduled.
435 : rescheduleReadCompaction bool
436 :
437 : // readCompactions is a readCompactionQueue which keeps track of the
438 : // compactions which we might have to perform.
439 : readCompactions readCompactionQueue
440 :
441 : // The cumulative duration of all completed compactions since Open.
442 : // Does not include flushes.
443 : duration time.Duration
444 : // Flush throughput metric.
445 : flushWriteThroughput ThroughputMetric
446 : // The idle start time for the flush "loop", i.e., when the flushing
447 : // bool above transitions to false.
448 : noOngoingFlushStartTime time.Time
449 : }
450 :
451 : // Non-zero when file cleaning is disabled. The disabled count acts as a
452 : // reference count to prohibit file cleaning. See
453 : // DB.{disable,Enable}FileDeletions().
454 : disableFileDeletions int
455 :
456 : snapshots struct {
457 : // The list of active snapshots.
458 : snapshotList
459 :
460 : // The cumulative count and size of snapshot-pinned keys written to
461 : // sstables.
462 : cumulativePinnedCount uint64
463 : cumulativePinnedSize uint64
464 : }
465 :
466 : tableStats struct {
467 : // Condition variable used to signal the completion of a
468 : // job to collect table stats.
469 : cond sync.Cond
470 : // True when a stat collection operation is in progress.
471 : loading bool
472 : // True if stat collection has loaded statistics for all tables
473 : // other than those listed explicitly in pending. This flag starts
474 : // as false when a database is opened and flips to true once stat
475 : // collection has caught up.
476 : loadedInitial bool
477 : // A slice of files for which stats have not been computed.
478 : // Compactions, ingests, flushes append files to be processed. An
479 : // active stat collection goroutine clears the list and processes
480 : // them.
481 : pending []manifest.NewFileEntry
482 : }
483 :
484 : tableValidation struct {
485 : // cond is a condition variable used to signal the completion of a
486 : // job to validate one or more sstables.
487 : cond sync.Cond
488 : // pending is a slice of metadata for sstables waiting to be
489 : // validated. Only physical sstables should be added to the pending
490 : // queue.
491 : pending []newFileEntry
492 : // validating is set to true when validation is running.
493 : validating bool
494 : }
495 : }
496 :
497 : // Normally equal to time.Now() but may be overridden in tests.
498 : timeNow func() time.Time
499 : // the time at database Open; may be used to compute metrics like effective
500 : // compaction concurrency
501 : openedAt time.Time
502 : }
503 :
504 : var _ Reader = (*DB)(nil)
505 : var _ Writer = (*DB)(nil)
506 :
507 : // TestOnlyWaitForCleaning MUST only be used in tests.
508 1 : func (d *DB) TestOnlyWaitForCleaning() {
509 1 : d.cleanupManager.Wait()
510 1 : }
511 :
512 : // Get gets the value for the given key. It returns ErrNotFound if the DB does
513 : // not contain the key.
514 : //
515 : // The caller should not modify the contents of the returned slice, but it is
516 : // safe to modify the contents of the argument after Get returns. The returned
517 : // slice will remain valid until the returned Closer is closed. On success, the
518 : // caller MUST call closer.Close() or a memory leak will occur.
519 2 : func (d *DB) Get(key []byte) ([]byte, io.Closer, error) {
520 2 : return d.getInternal(key, nil /* batch */, nil /* snapshot */)
521 2 : }
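// An illustrative sketch of the Get contract above (assumes an open *DB named
// db; process is a placeholder for application code):
//
//	value, closer, err := db.Get([]byte("k"))
//	switch {
//	case errors.Is(err, pebble.ErrNotFound):
//		// Key absent; there is nothing to close.
//	case err != nil:
//		return err
//	default:
//		// value remains valid only until closer.Close() is called, and
//		// Close MUST be called to avoid leaking the buffer backing value.
//		process(value)
//		if err := closer.Close(); err != nil {
//			return err
//		}
//	}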
522 :
523 : type getIterAlloc struct {
524 : dbi Iterator
525 : keyBuf []byte
526 : get getIter
527 : }
528 :
529 : var getIterAllocPool = sync.Pool{
530 2 : New: func() interface{} {
531 2 : return &getIterAlloc{}
532 2 : },
533 : }
534 :
535 2 : func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer, error) {
536 2 : if err := d.closed.Load(); err != nil {
537 1 : panic(err)
538 : }
539 :
540 : // Grab and reference the current readState. This prevents the underlying
541 : // files in the associated version from being deleted if there is a current
542 : // compaction. The readState is unref'd by Iterator.Close().
543 2 : readState := d.loadReadState()
544 2 :
545 2 : // Determine the seqnum to read at after grabbing the read state (current and
546 2 : // memtables) above.
547 2 : var seqNum uint64
548 2 : if s != nil {
549 2 : seqNum = s.seqNum
550 2 : } else {
551 2 : seqNum = d.mu.versions.visibleSeqNum.Load()
552 2 : }
553 :
554 2 : buf := getIterAllocPool.Get().(*getIterAlloc)
555 2 :
556 2 : get := &buf.get
557 2 : *get = getIter{
558 2 : logger: d.opts.Logger,
559 2 : comparer: d.opts.Comparer,
560 2 : newIters: d.newIters,
561 2 : snapshot: seqNum,
562 2 : key: key,
563 2 : batch: b,
564 2 : mem: readState.memtables,
565 2 : l0: readState.current.L0SublevelFiles,
566 2 : version: readState.current,
567 2 : }
568 2 :
569 2 : // Strip off memtables which cannot possibly contain the seqNum being read
570 2 : // at.
571 2 : for len(get.mem) > 0 {
572 2 : n := len(get.mem)
573 2 : if logSeqNum := get.mem[n-1].logSeqNum; logSeqNum < seqNum {
574 2 : break
575 : }
576 2 : get.mem = get.mem[:n-1]
577 : }
578 :
579 2 : i := &buf.dbi
580 2 : pointIter := get
581 2 : *i = Iterator{
582 2 : ctx: context.Background(),
583 2 : getIterAlloc: buf,
584 2 : iter: pointIter,
585 2 : pointIter: pointIter,
586 2 : merge: d.merge,
587 2 : comparer: *d.opts.Comparer,
588 2 : readState: readState,
589 2 : keyBuf: buf.keyBuf,
590 2 : }
591 2 :
592 2 : if !i.First() {
593 2 : err := i.Close()
594 2 : if err != nil {
595 1 : return nil, nil, err
596 1 : }
597 2 : return nil, nil, ErrNotFound
598 : }
599 2 : return i.Value(), i, nil
600 : }
601 :
602 : // Set sets the value for the given key. It overwrites any previous value
603 : // for that key; a DB is not a multi-map.
604 : //
605 : // It is safe to modify the contents of the arguments after Set returns.
606 2 : func (d *DB) Set(key, value []byte, opts *WriteOptions) error {
607 2 : b := newBatch(d)
608 2 : _ = b.Set(key, value, opts)
609 2 : if err := d.Apply(b, opts); err != nil {
610 1 : return err
611 1 : }
612 : // Only release the batch on success.
613 2 : return b.Close()
614 : }
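// An illustrative sketch of choosing WriteOptions for Set (pebble.Sync and
// pebble.NoSync are the package's stock values; assumes an open *DB named db):
//
//	// Sync: Set returns only after the write is durable in the WAL.
//	if err := db.Set([]byte("k1"), []byte("v1"), pebble.Sync); err != nil {
//		return err
//	}
//	// NoSync: faster, but a recent write may be lost if the machine crashes
//	// before the OS flushes the WAL.
//	if err := db.Set([]byte("k2"), []byte("v2"), pebble.NoSync); err != nil {
//		return err
//	}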
615 :
616 : // Delete deletes the value for the given key. Deletes are blind and will
617 : // succeed even if the given key does not exist.
618 : //
619 : // It is safe to modify the contents of the arguments after Delete returns.
620 2 : func (d *DB) Delete(key []byte, opts *WriteOptions) error {
621 2 : b := newBatch(d)
622 2 : _ = b.Delete(key, opts)
623 2 : if err := d.Apply(b, opts); err != nil {
624 1 : return err
625 1 : }
626 : // Only release the batch on success.
627 2 : return b.Close()
628 : }
629 :
630 : // DeleteSized behaves identically to Delete, but takes an additional
631 : // argument indicating the size of the value being deleted. DeleteSized
632 : // should be preferred when the caller has the expectation that there exists
633 : // a single internal KV pair for the key (eg, the key has not been
634 : // overwritten recently), and the caller knows the size of its value.
635 : //
636 : // DeleteSized will record the value size within the tombstone and use it to
637 : // inform compaction-picking heuristics which strive to reduce space
638 : // amplification in the LSM. This "calling your shot" mechanic allows the
639 : // storage engine to more accurately estimate and reduce space amplification.
640 : //
641 : // It is safe to modify the contents of the arguments after DeleteSized
642 : // returns.
643 2 : func (d *DB) DeleteSized(key []byte, valueSize uint32, opts *WriteOptions) error {
644 2 : b := newBatch(d)
645 2 : _ = b.DeleteSized(key, valueSize, opts)
646 2 : if err := d.Apply(b, opts); err != nil {
647 0 : return err
648 0 : }
649 : // Only release the batch on success.
650 2 : return b.Close()
651 : }
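// An illustrative sketch of "calling your shot" with DeleteSized (assumes the
// caller wrote the key once and still knows the value's length; db is an open
// *DB):
//
//	value := []byte("some previously written value")
//	_ = db.Set([]byte("k"), value, pebble.NoSync)
//	// ... later, delete the key and report the size being reclaimed.
//	if err := db.DeleteSized([]byte("k"), uint32(len(value)), pebble.Sync); err != nil {
//		return err
//	}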
652 :
653 : // SingleDelete adds an action to the batch that single deletes the entry for key.
654 : // See Writer.SingleDelete for more details on the semantics of SingleDelete.
655 : //
656 : // It is safe to modify the contents of the arguments after SingleDelete returns.
657 2 : func (d *DB) SingleDelete(key []byte, opts *WriteOptions) error {
658 2 : b := newBatch(d)
659 2 : _ = b.SingleDelete(key, opts)
660 2 : if err := d.Apply(b, opts); err != nil {
661 0 : return err
662 0 : }
663 : // Only release the batch on success.
664 2 : return b.Close()
665 : }
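// An illustrative sketch of the write-once/delete-once discipline SingleDelete
// requires (assumes an open *DB named db; errors elided for brevity):
//
//	// Safe: the key is written exactly once and then single-deleted once.
//	_ = db.Set([]byte("once"), []byte("v"), pebble.NoSync)
//	_ = db.SingleDelete([]byte("once"), pebble.Sync)
//
//	// Unsafe: "twice" was overwritten, so per the warning in
//	// Writer.SingleDelete the older value may be resurrected by a later
//	// compaction.
//	_ = db.Set([]byte("twice"), []byte("v1"), pebble.NoSync)
//	_ = db.Set([]byte("twice"), []byte("v2"), pebble.NoSync)
//	_ = db.SingleDelete([]byte("twice"), pebble.Sync) // undefined behavior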
666 :
667 : // DeleteRange deletes all of the keys (and values) in the range [start,end)
668 : // (inclusive on start, exclusive on end).
669 : //
670 : // It is safe to modify the contents of the arguments after DeleteRange
671 : // returns.
672 2 : func (d *DB) DeleteRange(start, end []byte, opts *WriteOptions) error {
673 2 : b := newBatch(d)
674 2 : _ = b.DeleteRange(start, end, opts)
675 2 : if err := d.Apply(b, opts); err != nil {
676 1 : return err
677 1 : }
678 : // Only release the batch on success.
679 2 : return b.Close()
680 : }
681 :
682 : // Merge adds an action to the DB that merges the value at key with the new
683 : // value. The details of the merge are dependent upon the configured merge
684 : // operator.
685 : //
686 : // It is safe to modify the contents of the arguments after Merge returns.
687 2 : func (d *DB) Merge(key, value []byte, opts *WriteOptions) error {
688 2 : b := newBatch(d)
689 2 : _ = b.Merge(key, value, opts)
690 2 : if err := d.Apply(b, opts); err != nil {
691 1 : return err
692 1 : }
693 : // Only release the batch on success.
694 2 : return b.Close()
695 : }
696 :
697 : // LogData adds the specified data to the batch. The data will be written to the
698 : // WAL, but not added to memtables or sstables. Log data is never indexed,
699 : // which makes it useful for testing WAL performance.
700 : //
701 : // It is safe to modify the contents of the argument after LogData returns.
702 2 : func (d *DB) LogData(data []byte, opts *WriteOptions) error {
703 2 : b := newBatch(d)
704 2 : _ = b.LogData(data, opts)
705 2 : if err := d.Apply(b, opts); err != nil {
706 1 : return err
707 1 : }
708 : // Only release the batch on success.
709 2 : return b.Close()
710 : }
711 :
712 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
713 : // timestamp suffix to value. The suffix is optional. If any portion of the key
714 : // range [start, end) is already set by a range key with the same suffix value,
715 : // RangeKeySet overrides it.
716 : //
717 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
718 2 : func (d *DB) RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error {
719 2 : b := newBatch(d)
720 2 : _ = b.RangeKeySet(start, end, suffix, value, opts)
721 2 : if err := d.Apply(b, opts); err != nil {
722 0 : return err
723 0 : }
724 : // Only release the batch on success.
725 2 : return b.Close()
726 : }
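// An illustrative sketch of setting an unsuffixed range key over [a, z)
// (assumes an open *DB named db; suffixed range keys additionally require a
// Comparer whose Split understands the suffix encoding):
//
//	if err := db.RangeKeySet([]byte("a"), []byte("z"), nil, []byte("payload"), pebble.Sync); err != nil {
//		return err
//	}
//	// Range keys are only surfaced by iterators configured with
//	// IterOptions.KeyTypes set to IterKeyTypeRangesOnly or
//	// IterKeyTypePointsAndRanges.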
727 :
728 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
729 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
730 : // range key. RangeKeyUnset only removes portions of range keys that fall within
731 : // the [start, end) key span, and only range keys with suffixes that exactly
732 : // match the unset suffix.
733 : //
734 : // It is safe to modify the contents of the arguments after RangeKeyUnset
735 : // returns.
736 2 : func (d *DB) RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error {
737 2 : b := newBatch(d)
738 2 : _ = b.RangeKeyUnset(start, end, suffix, opts)
739 2 : if err := d.Apply(b, opts); err != nil {
740 0 : return err
741 0 : }
742 : // Only release the batch on success.
743 2 : return b.Close()
744 : }
745 :
746 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
747 : // (inclusive on start, exclusive on end). It does not delete point keys (for
748 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
749 : // bounds, including those with or without suffixes.
750 : //
751 : // It is safe to modify the contents of the arguments after RangeKeyDelete
752 : // returns.
753 2 : func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
754 2 : b := newBatch(d)
755 2 : _ = b.RangeKeyDelete(start, end, opts)
756 2 : if err := d.Apply(b, opts); err != nil {
757 0 : return err
758 0 : }
759 : // Only release the batch on success.
760 2 : return b.Close()
761 : }
762 :
763 : // Apply the operations contained in the batch to the DB. If the batch is large
764 : // the contents of the batch may be retained by the database. If that occurs
765 : // the batch contents will be cleared preventing the caller from attempting to
766 : // reuse them.
767 : //
768 : // It is safe to modify the contents of the arguments after Apply returns.
769 : //
770 : // Apply returns ErrInvalidBatch if the provided batch is invalid in any way.
771 2 : func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
772 2 : return d.applyInternal(batch, opts, false)
773 2 : }
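// An illustrative sketch of applying several mutations atomically with a
// single commit (assumes an open *DB named db; the per-operation WriteOptions
// are ignored for batch mutations, so nil is commonly passed):
//
//	b := db.NewBatch()
//	_ = b.Set([]byte("k1"), []byte("v1"), nil)
//	_ = b.Delete([]byte("k2"), nil)
//	if err := db.Apply(b, pebble.Sync); err != nil {
//		return err
//	}
//	return b.Close()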
774 :
775 : // ApplyNoSyncWait must only be used when opts.Sync is true and the caller
776 : // does not want to wait for the WAL fsync to happen. The method will return
777 : // once the mutation is applied to the memtable and is visible (note that a
778 : // mutation is visible before the WAL sync even in the wait case, so we have
779 : // not weakened the durability semantics). The caller must call Batch.SyncWait
780 : // to wait for the WAL fsync. The caller must not Close the batch without
781 : // first calling Batch.SyncWait.
782 : //
783 : // RECOMMENDATION: Prefer using Apply unless you really understand why you
784 : // need ApplyNoSyncWait.
785 : // EXPERIMENTAL: API/feature subject to change. Do not yet use outside
786 : // CockroachDB.
787 2 : func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
788 2 : if !opts.Sync {
789 0 : return errors.Errorf("cannot request asynchonous apply when WriteOptions.Sync is false")
790 0 : }
791 2 : return d.applyInternal(batch, opts, true)
792 : }
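// An illustrative sketch of the ApplyNoSyncWait protocol described above: the
// mutation is visible once ApplyNoSyncWait returns, and Batch.SyncWait must be
// called before Close to wait for WAL durability (assumes an open *DB named
// db):
//
//	b := db.NewBatch()
//	_ = b.Set([]byte("k"), []byte("v"), nil)
//	if err := db.ApplyNoSyncWait(b, pebble.Sync); err != nil {
//		return err
//	}
//	// ... overlap other work with the WAL fsync ...
//	if err := b.SyncWait(); err != nil {
//		return err
//	}
//	return b.Close()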
793 :
794 : // REQUIRES: noSyncWait => opts.Sync
795 2 : func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
796 2 : if err := d.closed.Load(); err != nil {
797 1 : panic(err)
798 : }
799 2 : if batch.committing {
800 0 : panic("pebble: batch already committing")
801 : }
802 2 : if batch.applied.Load() {
803 0 : panic("pebble: batch already applied")
804 : }
805 2 : if d.opts.ReadOnly {
806 1 : return ErrReadOnly
807 1 : }
808 2 : if batch.db != nil && batch.db != d {
809 1 : panic(fmt.Sprintf("pebble: batch db mismatch: %p != %p", batch.db, d))
810 : }
811 :
812 2 : sync := opts.GetSync()
813 2 : if sync && d.opts.DisableWAL {
814 0 : return errors.New("pebble: WAL disabled")
815 0 : }
816 :
817 2 : if fmv := d.FormatMajorVersion(); fmv < batch.minimumFormatMajorVersion {
818 0 : panic(fmt.Sprintf(
819 0 : "pebble: batch requires at least format major version %d (current: %d)",
820 0 : batch.minimumFormatMajorVersion, fmv,
821 0 : ))
822 : }
823 :
824 2 : if batch.countRangeKeys > 0 {
825 2 : if d.split == nil {
826 0 : return errNoSplit
827 0 : }
828 : }
829 2 : batch.committing = true
830 2 :
831 2 : if batch.db == nil {
832 1 : if err := batch.refreshMemTableSize(); err != nil {
833 0 : return err
834 0 : }
835 : }
836 2 : if batch.memTableSize >= d.largeBatchThreshold {
837 2 : var err error
838 2 : batch.flushable, err = newFlushableBatch(batch, d.opts.Comparer)
839 2 : if err != nil {
840 0 : return err
841 0 : }
842 : }
843 2 : if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
844 0 : // There isn't much we can do on an error here. The commit pipeline will be
845 0 : // horked at this point.
846 0 : d.opts.Logger.Fatalf("pebble: fatal commit error: %v", err)
847 0 : }
848 : // If this is a large batch, we need to clear the batch contents as the
849 : // flushable batch may still be present in the flushables queue.
850 : //
851 : // TODO(peter): Currently large batches are written to the WAL. We could
852 : // skip the WAL write and instead wait for the large batch to be flushed to
853 : // an sstable. For a 100 MB batch, this might actually be faster. For a 1
854 : // GB batch this is almost certainly faster.
855 2 : if batch.flushable != nil {
856 2 : batch.data = nil
857 2 : }
858 2 : return nil
859 : }
860 :
861 2 : func (d *DB) commitApply(b *Batch, mem *memTable) error {
862 2 : if b.flushable != nil {
863 2 : // This is a large batch which was already added to the immutable queue.
864 2 : return nil
865 2 : }
866 2 : err := mem.apply(b, b.SeqNum())
867 2 : if err != nil {
868 0 : return err
869 0 : }
870 :
871 : // If the batch contains range tombstones and the database is configured
872 : // to flush range deletions, schedule a delayed flush so that disk space
873 : // may be reclaimed without additional writes or an explicit flush.
874 2 : if b.countRangeDels > 0 && d.opts.FlushDelayDeleteRange > 0 {
875 2 : d.mu.Lock()
876 2 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayDeleteRange)
877 2 : d.mu.Unlock()
878 2 : }
879 :
880 : // If the batch contains range keys and the database is configured to flush
881 : // range keys, schedule a delayed flush so that the range keys are cleared
882 : // from the memtable.
883 2 : if b.countRangeKeys > 0 && d.opts.FlushDelayRangeKey > 0 {
884 2 : d.mu.Lock()
885 2 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayRangeKey)
886 2 : d.mu.Unlock()
887 2 : }
888 :
889 2 : if mem.writerUnref() {
890 2 : d.mu.Lock()
891 2 : d.maybeScheduleFlush()
892 2 : d.mu.Unlock()
893 2 : }
894 2 : return nil
895 : }
896 :
897 2 : func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
898 2 : var size int64
899 2 : repr := b.Repr()
900 2 :
901 2 : if b.flushable != nil {
902 2 : // We have a large batch. Such batches are special in that they don't get
903 2 : // added to the memtable, and are instead inserted into the queue of
904 2 : // memtables. The call to makeRoomForWrite with this batch will force the
905 2 : // current memtable to be flushed. We want the large batch to be part of
906 2 : // the same log, so we add it to the WAL here, rather than after the call
907 2 : // to makeRoomForWrite().
908 2 : //
909 2 : // Set the sequence number since it was not set to the correct value earlier
910 2 : // (see comment in newFlushableBatch()).
911 2 : b.flushable.setSeqNum(b.SeqNum())
912 2 : if !d.opts.DisableWAL {
913 2 : var err error
914 2 : size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b.refData)
915 2 : if err != nil {
916 0 : panic(err)
917 : }
918 : }
919 : }
920 :
921 2 : d.mu.Lock()
922 2 :
923 2 : var err error
924 2 : if !b.ingestedSSTBatch {
925 2 : // Batches which contain keys of kind InternalKeyKindIngestSST will
926 2 : // never be applied to the memtable, so we don't need to make room for
927 2 : // write. For the other cases, switch out the memtable if there was not
928 2 : // enough room to store the batch.
929 2 : err = d.makeRoomForWrite(b)
930 2 : }
931 :
932 2 : if err == nil && !d.opts.DisableWAL {
933 2 : d.mu.log.bytesIn += uint64(len(repr))
934 2 : }
935 :
936 : // Grab a reference to the memtable while holding DB.mu. Note that for
937 : // non-flushable batches (b.flushable == nil) makeRoomForWrite() added a
938 : // reference to the memtable which will prevent it from being flushed until
939 : // we unreference it. This reference is dropped in DB.commitApply().
940 2 : mem := d.mu.mem.mutable
941 2 :
942 2 : d.mu.Unlock()
943 2 : if err != nil {
944 0 : return nil, err
945 0 : }
946 :
947 2 : if d.opts.DisableWAL {
948 2 : return mem, nil
949 2 : }
950 :
951 2 : if b.flushable == nil {
952 2 : size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b.refData)
953 2 : if err != nil {
954 0 : panic(err)
955 : }
956 : }
957 :
958 2 : d.logSize.Store(uint64(size))
959 2 : return mem, err
960 : }
961 :
962 : type iterAlloc struct {
963 : dbi Iterator
964 : keyBuf []byte
965 : boundsBuf [2][]byte
966 : prefixOrFullSeekKey []byte
967 : merging mergingIter
968 : mlevels [3 + numLevels]mergingIterLevel
969 : levels [3 + numLevels]levelIter
970 : levelsPositioned [3 + numLevels]bool
971 : }
972 :
973 : var iterAllocPool = sync.Pool{
974 2 : New: func() interface{} {
975 2 : return &iterAlloc{}
976 2 : },
977 : }
978 :
979 : // snapshotIterOpts denotes snapshot-related iterator options when calling
980 : // newIter. These are the possible cases for a snapshotIterOpts:
981 : // - No snapshot: All fields are zero values.
982 : // - Classic snapshot: Only `seqNum` is set. The latest readState will be used
983 : // and the specified seqNum will be used as the snapshot seqNum.
984 : // - EventuallyFileOnlySnapshot (EFOS) behaving as a classic snapshot. Only
985 : // the `seqNum` is set. The latest readState will be used
986 : // and the specified seqNum will be used as the snapshot seqNum.
987 : // - EFOS in file-only state: Only `seqNum` and `vers` are set. All the
988 : // relevant SSTs are referenced by the *version.
989 : // - EFOS that has been excised but is in alwaysCreateIters mode (tests only).
990 : // Only `seqNum` and `readState` are set.
991 : type snapshotIterOpts struct {
992 : seqNum uint64
993 : vers *version
994 : readState *readState
995 : }
996 :
997 : type batchIterOpts struct {
998 : batchOnly bool
999 : }
1000 : type newIterOpts struct {
1001 : snapshot snapshotIterOpts
1002 : batch batchIterOpts
1003 : }
1004 :
1005 : // newIter constructs a new iterator, merging in batch iterators as an extra
1006 : // level.
1007 : func (d *DB) newIter(
1008 : ctx context.Context, batch *Batch, internalOpts newIterOpts, o *IterOptions,
1009 2 : ) *Iterator {
1010 2 : if internalOpts.batch.batchOnly {
1011 1 : if batch == nil {
1012 0 : panic("batchOnly is true, but batch is nil")
1013 : }
1014 1 : if internalOpts.snapshot.vers != nil {
1015 0 : panic("batchOnly is true, but snapshotIterOpts is initialized")
1016 : }
1017 : }
1018 2 : if err := d.closed.Load(); err != nil {
1019 1 : panic(err)
1020 : }
1021 2 : seqNum := internalOpts.snapshot.seqNum
1022 2 : if o != nil && o.RangeKeyMasking.Suffix != nil && o.KeyTypes != IterKeyTypePointsAndRanges {
1023 0 : panic("pebble: range key masking requires IterKeyTypePointsAndRanges")
1024 : }
1025 2 : if (batch != nil || seqNum != 0) && (o != nil && o.OnlyReadGuaranteedDurable) {
1026 1 : // We could add support for OnlyReadGuaranteedDurable on snapshots if
1027 1 : // there was a need: this would require checking that the sequence number
1028 1 : // of the snapshot has been flushed, by comparing with
1029 1 : // DB.mem.queue[0].logSeqNum.
1030 1 : panic("OnlyReadGuaranteedDurable is not supported for batches or snapshots")
1031 : }
1032 2 : var readState *readState
1033 2 : var newIters tableNewIters
1034 2 : var newIterRangeKey keyspanimpl.TableNewSpanIter
1035 2 : if !internalOpts.batch.batchOnly {
1036 2 : // Grab and reference the current readState. This prevents the underlying
1037 2 : // files in the associated version from being deleted if there is a current
1038 2 : // compaction. The readState is unref'd by Iterator.Close().
1039 2 : if internalOpts.snapshot.vers == nil {
1040 2 : if internalOpts.snapshot.readState != nil {
1041 0 : readState = internalOpts.snapshot.readState
1042 0 : readState.ref()
1043 2 : } else {
1044 2 : // NB: loadReadState() calls readState.ref().
1045 2 : readState = d.loadReadState()
1046 2 : }
1047 2 : } else {
1048 2 : // vers != nil
1049 2 : internalOpts.snapshot.vers.Ref()
1050 2 : }
1051 :
1052 : // Determine the seqnum to read at after grabbing the read state (current and
1053 : // memtables) above.
1054 2 : if seqNum == 0 {
1055 2 : seqNum = d.mu.versions.visibleSeqNum.Load()
1056 2 : }
1057 2 : newIters = d.newIters
1058 2 : newIterRangeKey = d.tableNewRangeKeyIter
1059 : }
1060 :
1061 : // Bundle various structures under a single umbrella in order to allocate
1062 : // them together.
1063 2 : buf := iterAllocPool.Get().(*iterAlloc)
1064 2 : dbi := &buf.dbi
1065 2 : *dbi = Iterator{
1066 2 : ctx: ctx,
1067 2 : alloc: buf,
1068 2 : merge: d.merge,
1069 2 : comparer: *d.opts.Comparer,
1070 2 : readState: readState,
1071 2 : version: internalOpts.snapshot.vers,
1072 2 : keyBuf: buf.keyBuf,
1073 2 : prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
1074 2 : boundsBuf: buf.boundsBuf,
1075 2 : batch: batch,
1076 2 : newIters: newIters,
1077 2 : newIterRangeKey: newIterRangeKey,
1078 2 : seqNum: seqNum,
1079 2 : batchOnlyIter: internalOpts.batch.batchOnly,
1080 2 : }
1081 2 : if o != nil {
1082 2 : dbi.opts = *o
1083 2 : dbi.processBounds(o.LowerBound, o.UpperBound)
1084 2 : }
1085 2 : dbi.opts.logger = d.opts.Logger
1086 2 : if d.opts.private.disableLazyCombinedIteration {
1087 2 : dbi.opts.disableLazyCombinedIteration = true
1088 2 : }
1089 2 : if batch != nil {
1090 2 : dbi.batchSeqNum = dbi.batch.nextSeqNum()
1091 2 : }
1092 2 : return finishInitializingIter(ctx, buf)
1093 : }
1094 :
1095 : // finishInitializingIter is a helper for doing the non-trivial initialization
1096 : // of an Iterator. It's invoked to perform the initial initialization of an
1097 : // Iterator during NewIter or Clone, and to perform reinitialization due to a
1098 : // change in IterOptions by a call to Iterator.SetOptions.
1099 2 : func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
1100 2 : // Short-hand.
1101 2 : dbi := &buf.dbi
1102 2 : var memtables flushableList
1103 2 : if dbi.readState != nil {
1104 2 : memtables = dbi.readState.memtables
1105 2 : }
1106 2 : if dbi.opts.OnlyReadGuaranteedDurable {
1107 1 : memtables = nil
1108 2 : } else {
1109 2 : // We only need to read from memtables which contain sequence numbers older
1110 2 : // than seqNum. Trim off newer memtables.
1111 2 : for i := len(memtables) - 1; i >= 0; i-- {
1112 2 : if logSeqNum := memtables[i].logSeqNum; logSeqNum < dbi.seqNum {
1113 2 : break
1114 : }
1115 2 : memtables = memtables[:i]
1116 : }
1117 : }
1118 :
1119 2 : if dbi.opts.pointKeys() {
1120 2 : // Construct the point iterator, initializing dbi.pointIter to point to
1121 2 : // dbi.merging. If this is called during a SetOptions call and this
1122 2 : // Iterator has already initialized dbi.merging, constructPointIter is a
1123 2 : // noop and an initialized pointIter already exists in dbi.pointIter.
1124 2 : dbi.constructPointIter(ctx, memtables, buf)
1125 2 : dbi.iter = dbi.pointIter
1126 2 : } else {
1127 2 : dbi.iter = emptyIter
1128 2 : }
1129 :
1130 2 : if dbi.opts.rangeKeys() {
1131 2 : dbi.rangeKeyMasking.init(dbi, dbi.comparer.Compare, dbi.comparer.Split)
1132 2 :
1133 2 : // When iterating over both point and range keys, don't create the
1134 2 : // range-key iterator stack immediately if we can avoid it. This
1135 2 : // optimization takes advantage of the expected sparseness of range
1136 2 : // keys, and configures the point-key iterator to dynamically switch to
1137 2 : // combined iteration when it observes a file containing range keys.
1138 2 : //
1139 2 : // Lazy combined iteration is not possible if a batch or a memtable
1140 2 : // contains any range keys.
1141 2 : useLazyCombinedIteration := dbi.rangeKey == nil &&
1142 2 : dbi.opts.KeyTypes == IterKeyTypePointsAndRanges &&
1143 2 : (dbi.batch == nil || dbi.batch.countRangeKeys == 0) &&
1144 2 : !dbi.opts.disableLazyCombinedIteration
1145 2 : if useLazyCombinedIteration {
1146 2 : // The user requested combined iteration, and there's no indexed
1147 2 : // batch currently containing range keys that would prevent lazy
1148 2 : // combined iteration. Check the memtables to see if they contain
1149 2 : // any range keys.
1150 2 : for i := range memtables {
1151 2 : if memtables[i].containsRangeKeys() {
1152 2 : useLazyCombinedIteration = false
1153 2 : break
1154 : }
1155 : }
1156 : }
1157 :
1158 2 : if useLazyCombinedIteration {
1159 2 : dbi.lazyCombinedIter = lazyCombinedIter{
1160 2 : parent: dbi,
1161 2 : pointIter: dbi.pointIter,
1162 2 : combinedIterState: combinedIterState{
1163 2 : initialized: false,
1164 2 : },
1165 2 : }
1166 2 : dbi.iter = &dbi.lazyCombinedIter
1167 2 : dbi.iter = invalidating.MaybeWrapIfInvariants(dbi.iter)
1168 2 : } else {
1169 2 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{
1170 2 : initialized: true,
1171 2 : }
1172 2 : if dbi.rangeKey == nil {
1173 2 : dbi.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1174 2 : dbi.rangeKey.init(dbi.comparer.Compare, dbi.comparer.Split, &dbi.opts)
1175 2 : dbi.constructRangeKeyIter()
1176 2 : } else {
1177 2 : dbi.rangeKey.iterConfig.SetBounds(dbi.opts.LowerBound, dbi.opts.UpperBound)
1178 2 : }
1179 :
1180 : // Wrap the point iterator (currently dbi.iter) with an interleaving
1181 : // iterator that interleaves range keys pulled from
1182 : // dbi.rangeKey.rangeKeyIter.
1183 : //
1184 : // NB: The interleaving iterator is always reinitialized, even if
1185 : // dbi already had an initialized range key iterator, in case the point
1186 : // iterator changed or the range key masking suffix changed.
1187 2 : dbi.rangeKey.iiter.Init(&dbi.comparer, dbi.iter, dbi.rangeKey.rangeKeyIter,
1188 2 : keyspan.InterleavingIterOpts{
1189 2 : Mask: &dbi.rangeKeyMasking,
1190 2 : LowerBound: dbi.opts.LowerBound,
1191 2 : UpperBound: dbi.opts.UpperBound,
1192 2 : })
1193 2 : dbi.iter = &dbi.rangeKey.iiter
1194 : }
1195 2 : } else {
1196 2 : // !dbi.opts.rangeKeys()
1197 2 : //
1198 2 : // Reset the combined iterator state. The initialized=true ensures the
1199 2 : // iterator doesn't unnecessarily try to switch to combined iteration.
1200 2 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{initialized: true}
1201 2 : }
1202 2 : return dbi
1203 : }
1204 :
1205 : // ScanInternal scans all internal keys within the specified bounds, truncating
1206 : // any rangedels and rangekeys to those bounds if they span past them. For use
1207 : // when an external user needs to be aware of all internal keys that make up a
1208 : // key range.
1209 : //
1210 : // Keys deleted by range deletions must not be returned or exposed by this
1211 : // method, while the range deletion deleting that key must be exposed using
1212 : // visitRangeDel. Keys that would be masked by range key masking (if an
1213 : // appropriate prefix were set) should be exposed, alongside the range key
1214 : // that would have masked it. This method also collapses all point keys into
1215 : // one InternalKey; so only one internal key at most per user key is returned
1216 : // to visitPointKey.
1217 : //
1218 : // If visitSharedFile is not nil, ScanInternal iterates in skip-shared iteration
1219 : // mode. In this iteration mode, sstables in levels L5 and L6 are skipped, and
1220 : // their metadatas truncated to [lower, upper) and passed into visitSharedFile.
1221 : // ErrInvalidSkipSharedIteration is returned if visitSharedFile is not nil and an
1222 : // sstable in L5 or L6 is found that is not in shared storage according to
1223 : // provider.IsShared, or an sstable in those levels contains a newer key than the
1224 : // snapshot sequence number (only applicable for snapshot.ScanInternal). Examples
1225 : // of when this could happen could be if Pebble started writing sstables before a
1226 : // creator ID was set (as creator IDs are necessary to enable shared storage)
1227 : // resulting in some lower level SSTs being on non-shared storage. Skip-shared
1228 : // iteration is invalid in those cases.
1229 : func (d *DB) ScanInternal(
1230 : ctx context.Context,
1231 : categoryAndQoS sstable.CategoryAndQoS,
1232 : lower, upper []byte,
1233 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
1234 : visitRangeDel func(start, end []byte, seqNum uint64) error,
1235 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
1236 : visitSharedFile func(sst *SharedSSTMeta) error,
1237 : visitExternalFile func(sst *ExternalFile) error,
1238 2 : ) error {
1239 2 : scanInternalOpts := &scanInternalOptions{
1240 2 : CategoryAndQoS: categoryAndQoS,
1241 2 : visitPointKey: visitPointKey,
1242 2 : visitRangeDel: visitRangeDel,
1243 2 : visitRangeKey: visitRangeKey,
1244 2 : visitSharedFile: visitSharedFile,
1245 2 : visitExternalFile: visitExternalFile,
1246 2 : IterOptions: IterOptions{
1247 2 : KeyTypes: IterKeyTypePointsAndRanges,
1248 2 : LowerBound: lower,
1249 2 : UpperBound: upper,
1250 2 : },
1251 2 : }
1252 2 : iter, err := d.newInternalIter(ctx, snapshotIterOpts{} /* snapshot */, scanInternalOpts)
1253 2 : if err != nil {
1254 0 : return err
1255 0 : }
1256 2 : defer iter.close()
1257 2 : return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
1258 : }
1259 :
1260 : // newInternalIter constructs and returns a new scanInternalIterator on this db.
1261 : // If o.skipSharedLevels is true, levels below sharedLevelsStart are *not* added
1262 : // to the internal iterator.
1263 : //
1264 : // TODO(bilal): This method has a lot of similarities with db.newIter as well as
1265 : // finishInitializingIter. Both pairs of methods should be refactored to reduce
1266 : // this duplication.
1267 : func (d *DB) newInternalIter(
1268 : ctx context.Context, sOpts snapshotIterOpts, o *scanInternalOptions,
1269 2 : ) (*scanInternalIterator, error) {
1270 2 : if err := d.closed.Load(); err != nil {
1271 0 : panic(err)
1272 : }
1273 : // Grab and reference the current readState. This prevents the underlying
1274 : // files in the associated version from being deleted if there is a current
1275 : // compaction. The readState is unref'd by Iterator.Close().
1276 2 : var readState *readState
1277 2 : if sOpts.vers == nil {
1278 2 : if sOpts.readState != nil {
1279 0 : readState = sOpts.readState
1280 0 : readState.ref()
1281 2 : } else {
1282 2 : readState = d.loadReadState()
1283 2 : }
1284 : }
1285 2 : if sOpts.vers != nil {
1286 1 : sOpts.vers.Ref()
1287 1 : }
1288 :
1289 : // Determine the seqnum to read at after grabbing the read state (current and
1290 : // memtables) above.
1291 2 : seqNum := sOpts.seqNum
1292 2 : if seqNum == 0 {
1293 2 : seqNum = d.mu.versions.visibleSeqNum.Load()
1294 2 : }
1295 :
1296 : // Bundle various structures under a single umbrella in order to allocate
1297 : // them together.
1298 2 : buf := iterAllocPool.Get().(*iterAlloc)
1299 2 : dbi := &scanInternalIterator{
1300 2 : ctx: ctx,
1301 2 : db: d,
1302 2 : comparer: d.opts.Comparer,
1303 2 : merge: d.opts.Merger.Merge,
1304 2 : readState: readState,
1305 2 : version: sOpts.vers,
1306 2 : alloc: buf,
1307 2 : newIters: d.newIters,
1308 2 : newIterRangeKey: d.tableNewRangeKeyIter,
1309 2 : seqNum: seqNum,
1310 2 : mergingIter: &buf.merging,
1311 2 : }
1312 2 : dbi.opts = *o
1313 2 : dbi.opts.logger = d.opts.Logger
1314 2 : if d.opts.private.disableLazyCombinedIteration {
1315 1 : dbi.opts.disableLazyCombinedIteration = true
1316 1 : }
1317 2 : return finishInitializingInternalIter(buf, dbi)
1318 : }
1319 :
1320 : func finishInitializingInternalIter(
1321 : buf *iterAlloc, i *scanInternalIterator,
1322 2 : ) (*scanInternalIterator, error) {
1323 2 : // Short-hand.
1324 2 : var memtables flushableList
1325 2 : if i.readState != nil {
1326 2 : memtables = i.readState.memtables
1327 2 : }
1328 : // We only need to read from memtables which contain sequence numbers older
1329 : // than seqNum. Trim off newer memtables.
1330 2 : for j := len(memtables) - 1; j >= 0; j-- {
1331 2 : if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
1332 2 : break
1333 : }
1334 2 : memtables = memtables[:j]
1335 : }
1336 2 : i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)
1337 2 :
1338 2 : if err := i.constructPointIter(i.opts.CategoryAndQoS, memtables, buf); err != nil {
1339 0 : return nil, err
1340 0 : }
1341 :
1342 : // For internal iterators, we skip the lazy combined iteration optimization
1343 : // entirely, and create the range key iterator stack directly.
1344 2 : i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1345 2 : i.rangeKey.init(i.comparer.Compare, i.comparer.Split, &i.opts.IterOptions)
1346 2 : if err := i.constructRangeKeyIter(); err != nil {
1347 0 : return nil, err
1348 0 : }
1349 :
1350 : // Wrap the point iterator (currently i.iter) with an interleaving
1351 : // iterator that interleaves range keys pulled from
1352 : // i.rangeKey.rangeKeyIter.
1353 2 : i.rangeKey.iiter.Init(i.comparer, i.iter, i.rangeKey.rangeKeyIter,
1354 2 : keyspan.InterleavingIterOpts{
1355 2 : LowerBound: i.opts.LowerBound,
1356 2 : UpperBound: i.opts.UpperBound,
1357 2 : })
1358 2 : i.iter = &i.rangeKey.iiter
1359 2 :
1360 2 : return i, nil
1361 : }
1362 :
1363 : func (i *Iterator) constructPointIter(
1364 : ctx context.Context, memtables flushableList, buf *iterAlloc,
1365 2 : ) {
1366 2 : if i.pointIter != nil {
1367 2 : // Already have one.
1368 2 : return
1369 2 : }
1370 2 : internalOpts := internalIterOpts{stats: &i.stats.InternalStats}
1371 2 : if i.opts.RangeKeyMasking.Filter != nil {
1372 2 : internalOpts.boundLimitedFilter = &i.rangeKeyMasking
1373 2 : }
1374 :
1375 : // Merging levels and levels from iterAlloc.
1376 2 : mlevels := buf.mlevels[:0]
1377 2 : levels := buf.levels[:0]
1378 2 :
1379 2 : // We compute the number of levels needed ahead of time and reallocate a slice if
1380 2 : // the array from the iterAlloc isn't large enough. Doing this allocation once
1381 2 : // should improve the performance.
1382 2 : numMergingLevels := 0
1383 2 : numLevelIters := 0
1384 2 : if i.batch != nil {
1385 2 : numMergingLevels++
1386 2 : }
1387 :
1388 2 : var current *version
1389 2 : if !i.batchOnlyIter {
1390 2 : numMergingLevels += len(memtables)
1391 2 :
1392 2 : current = i.version
1393 2 : if current == nil {
1394 2 : current = i.readState.current
1395 2 : }
1396 2 : numMergingLevels += len(current.L0SublevelFiles)
1397 2 : numLevelIters += len(current.L0SublevelFiles)
1398 2 : for level := 1; level < len(current.Levels); level++ {
1399 2 : if current.Levels[level].Empty() {
1400 2 : continue
1401 : }
1402 2 : numMergingLevels++
1403 2 : numLevelIters++
1404 : }
1405 : }
1406 :
1407 2 : if numMergingLevels > cap(mlevels) {
1408 2 : mlevels = make([]mergingIterLevel, 0, numMergingLevels)
1409 2 : }
1410 2 : if numLevelIters > cap(levels) {
1411 2 : levels = make([]levelIter, 0, numLevelIters)
1412 2 : }
1413 :
1414 : // Top-level is the batch, if any.
1415 2 : if i.batch != nil {
1416 2 : if i.batch.index == nil {
1417 0 : // This isn't an indexed batch. We shouldn't have gotten this far.
1418 0 : panic(errors.AssertionFailedf("creating an iterator over an unindexed batch"))
1419 2 : } else {
1420 2 : i.batch.initInternalIter(&i.opts, &i.batchPointIter)
1421 2 : i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, i.batchSeqNum)
1422 2 : // Only include the batch's rangedel iterator if it's non-empty.
1423 2 : // This requires some subtle logic in the case a rangedel is later
1424 2 : // written to the batch and the view of the batch is refreshed
1425 2 : // during a call to SetOptions—in this case, we need to reconstruct
1426 2 : // the point iterator to add the batch rangedel iterator.
1427 2 : var rangeDelIter keyspan.FragmentIterator
1428 2 : if i.batchRangeDelIter.Count() > 0 {
1429 2 : rangeDelIter = &i.batchRangeDelIter
1430 2 : }
1431 2 : mlevels = append(mlevels, mergingIterLevel{
1432 2 : iter: &i.batchPointIter,
1433 2 : rangeDelIter: rangeDelIter,
1434 2 : })
1435 : }
1436 : }
1437 :
1438 2 : if !i.batchOnlyIter {
1439 2 : // Next are the memtables.
1440 2 : for j := len(memtables) - 1; j >= 0; j-- {
1441 2 : mem := memtables[j]
1442 2 : mlevels = append(mlevels, mergingIterLevel{
1443 2 : iter: mem.newIter(&i.opts),
1444 2 : rangeDelIter: mem.newRangeDelIter(&i.opts),
1445 2 : })
1446 2 : }
1447 :
1448 : // Next are the file levels: L0 sub-levels followed by lower levels.
1449 2 : mlevelsIndex := len(mlevels)
1450 2 : levelsIndex := len(levels)
1451 2 : mlevels = mlevels[:numMergingLevels]
1452 2 : levels = levels[:numLevelIters]
1453 2 : i.opts.snapshotForHideObsoletePoints = buf.dbi.seqNum
1454 2 : addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) {
1455 2 : li := &levels[levelsIndex]
1456 2 :
1457 2 : li.init(ctx, i.opts, &i.comparer, i.newIters, files, level, internalOpts)
1458 2 : li.initRangeDel(&mlevels[mlevelsIndex].rangeDelIter)
1459 2 : li.initBoundaryContext(&mlevels[mlevelsIndex].levelIterBoundaryContext)
1460 2 : li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
1461 2 : mlevels[mlevelsIndex].levelIter = li
1462 2 : mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)
1463 2 :
1464 2 : levelsIndex++
1465 2 : mlevelsIndex++
1466 2 : }
1467 :
1468 : // Add level iterators for the L0 sublevels, iterating from newest to
1469 : // oldest.
1470 2 : for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
1471 2 : addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
1472 2 : }
1473 :
1474 : // Add level iterators for the non-empty non-L0 levels.
1475 2 : for level := 1; level < len(current.Levels); level++ {
1476 2 : if current.Levels[level].Empty() {
1477 2 : continue
1478 : }
1479 2 : addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
1480 : }
1481 : }
1482 2 : buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...)
1483 2 : if len(mlevels) <= cap(buf.levelsPositioned) {
1484 2 : buf.merging.levelsPositioned = buf.levelsPositioned[:len(mlevels)]
1485 2 : }
1486 2 : buf.merging.snapshot = i.seqNum
1487 2 : buf.merging.batchSnapshot = i.batchSeqNum
1488 2 : buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
1489 2 : i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging).(topLevelIterator)
1490 2 : i.merging = &buf.merging
1491 : }
1492 :
1493 : // NewBatch returns a new empty write-only batch. Any reads on the batch will
1494 : // return an error. If the batch is committed it will be applied to the DB.
1495 2 : func (d *DB) NewBatch() *Batch {
1496 2 : return newBatch(d)
1497 2 : }
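// A minimal usage sketch (illustrative only; the function name and keys are
// hypothetical): build a batch, apply mutations, and commit them atomically.
// Sync requests that the commit be durably written to the WAL before
// returning.
func exampleBatchWrite(d *DB) error {
	b := d.NewBatch()
	defer b.Close()
	if err := b.Set([]byte("key"), []byte("value"), nil); err != nil {
		return err
	}
	if err := b.Delete([]byte("stale-key"), nil); err != nil {
		return err
	}
	return b.Commit(Sync)
}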
1498 :
1499 : // NewBatchWithSize is mostly identical to NewBatch, but it will allocate the
1500 : // the specified memory space for the internal slice in advance.
1501 0 : func (d *DB) NewBatchWithSize(size int) *Batch {
1502 0 : return newBatchWithSize(d, size)
1503 0 : }
1504 :
1505 : // NewIndexedBatch returns a new empty read-write batch. Any reads on the batch
1506 : // will read from both the batch and the DB. If the batch is committed it will
1507 : // be applied to the DB. An indexed batch is slower than a non-indexed batch
1508 : // for insert operations. If you do not need to perform reads on the batch, use
1509 : // NewBatch instead.
1510 2 : func (d *DB) NewIndexedBatch() *Batch {
1511 2 : return newIndexedBatch(d, d.opts.Comparer)
1512 2 : }
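// A hedged sketch of reading your own writes through an indexed batch (the
// function name and keys are illustrative). Reads consult both the batch
// contents and the underlying DB; the returned closer pins the value's memory.
func exampleIndexedBatchReadYourWrites(d *DB) error {
	b := d.NewIndexedBatch()
	defer b.Close()
	if err := b.Set([]byte("a"), []byte("1"), nil); err != nil {
		return err
	}
	v, closer, err := b.Get([]byte("a"))
	if err != nil {
		return err
	}
	_ = v // valid until closer.Close()
	if err := closer.Close(); err != nil {
		return err
	}
	return b.Commit(Sync)
}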
1513 :
1514 : // NewIndexedBatchWithSize is mostly identical to NewIndexedBatch, but it will
1515 : // allocate the specified memory space for the internal slice in advance.
1516 0 : func (d *DB) NewIndexedBatchWithSize(size int) *Batch {
1517 0 : return newIndexedBatchWithSize(d, d.opts.Comparer, size)
1518 0 : }
1519 :
1520 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
1521 : // return false). The iterator can be positioned via a call to SeekGE, SeekLT,
1522 : // First or Last. The iterator provides a point-in-time view of the current DB
1523 : // state. This view is maintained by preventing file deletions and preventing
1524 : // memtables referenced by the iterator from being deleted. Using an iterator
1525 : // to maintain a long-lived point-in-time view of the DB state can lead to an
1526 : // apparent memory and disk usage leak. Use snapshots (see NewSnapshot) for
1527 : // point-in-time snapshots, which avoid these problems.
1528 2 : func (d *DB) NewIter(o *IterOptions) (*Iterator, error) {
1529 2 : return d.NewIterWithContext(context.Background(), o)
1530 2 : }
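// An illustrative scan sketch (bounds and names are hypothetical): the
// iterator starts unpositioned, is positioned with First, and must be closed
// to release the pinned readState described above.
func exampleBoundedScan(d *DB) error {
	it, err := d.NewIter(&IterOptions{
		LowerBound: []byte("a"),
		UpperBound: []byte("z"), // exclusive
	})
	if err != nil {
		return err
	}
	for valid := it.First(); valid; valid = it.Next() {
		_ = it.Key()   // only stable until the next positioning call
		_ = it.Value() // ditto
	}
	// Close releases the referenced readState and returns any iteration error.
	return it.Close()
}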
1531 :
1532 : // NewIterWithContext is like NewIter, and additionally accepts a context for
1533 : // tracing.
1534 2 : func (d *DB) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
1535 2 : return d.newIter(ctx, nil /* batch */, newIterOpts{}, o), nil
1536 2 : }
1537 :
1538 : // NewSnapshot returns a point-in-time view of the current DB state. Iterators
1539 : // created with this handle will all observe a stable snapshot of the current
1540 : // DB state. The caller must call Snapshot.Close() when the snapshot is no
1541 : // longer needed. Snapshots are not persisted across DB restarts (close ->
1542 : // open). Unlike the implicit snapshot maintained by an iterator, a snapshot
1543 : // will not prevent memtables from being released or sstables from being
1544 : // deleted. Instead, a snapshot prevents deletion of sequence numbers
1545 : // referenced by the snapshot.
1546 2 : func (d *DB) NewSnapshot() *Snapshot {
1547 2 : if err := d.closed.Load(); err != nil {
1548 1 : panic(err)
1549 : }
1550 :
1551 2 : d.mu.Lock()
1552 2 : s := &Snapshot{
1553 2 : db: d,
1554 2 : seqNum: d.mu.versions.visibleSeqNum.Load(),
1555 2 : }
1556 2 : d.mu.snapshots.pushBack(s)
1557 2 : d.mu.Unlock()
1558 2 : return s
1559 : }
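// A hedged read-at-snapshot sketch (names and keys are illustrative). The
// snapshot pins a sequence number rather than files or memtables, and must be
// closed when no longer needed.
func exampleSnapshotRead(d *DB) error {
	s := d.NewSnapshot()
	defer s.Close()
	v, closer, err := s.Get([]byte("k"))
	switch {
	case err == nil:
		_ = v // valid until closer.Close()
		return closer.Close()
	case errors.Is(err, ErrNotFound):
		return nil
	default:
		return err
	}
}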
1560 :
1561 : // NewEventuallyFileOnlySnapshot returns a point-in-time view of the current DB
1562 : // state, similar to NewSnapshot, but with consistency constrained to the
1563 : // provided set of key ranges. See the comment at EventuallyFileOnlySnapshot for
1564 : // its semantics.
1565 2 : func (d *DB) NewEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
1566 2 : if err := d.closed.Load(); err != nil {
1567 0 : panic(err)
1568 : }
1569 2 : for i := range keyRanges {
1570 2 : if i > 0 && d.cmp(keyRanges[i-1].End, keyRanges[i].Start) > 0 {
1571 0 : panic("pebble: key ranges for eventually-file-only-snapshot not in order")
1572 : }
1573 : }
1574 2 : return d.makeEventuallyFileOnlySnapshot(keyRanges)
1575 : }
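// A sketch of using an eventually-file-only snapshot (key range and names are
// illustrative; assumes EventuallyFileOnlySnapshot exposes NewIter and Close
// as its read/cleanup entry points). Consistency is only guaranteed within
// the declared key ranges.
func exampleEFOSScan(d *DB) error {
	es := d.NewEventuallyFileOnlySnapshot([]KeyRange{{Start: []byte("a"), End: []byte("z")}})
	defer es.Close()
	it, err := es.NewIter(&IterOptions{LowerBound: []byte("a"), UpperBound: []byte("z")})
	if err != nil {
		return err
	}
	for valid := it.First(); valid; valid = it.Next() {
		_ = it.Key()
	}
	return it.Close()
}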
1576 :
1577 : // Close closes the DB.
1578 : //
1579 : // It is not safe to close a DB until all outstanding iterators are closed
1580 : // or to call Close concurrently with any other DB method. It is not valid
1581 : // to call any of a DB's methods after the DB has been closed.
1582 2 : func (d *DB) Close() error {
1583 2 : // Lock the commit pipeline for the duration of Close. This prevents a race
1584 2 : // with makeRoomForWrite. Rotating the WAL in makeRoomForWrite requires
1585 2 : // dropping d.mu several times for I/O. If Close only holds d.mu, an
1586 2 : // in-progress WAL rotation may re-acquire d.mu only once the database is
1587 2 : // closed.
1588 2 : //
1589 2 : // Additionally, locking the commit pipeline makes it more likely that
1590 2 : // (illegal) concurrent writes will observe d.closed.Load() != nil, creating
1591 2 : // more understable panics if the database is improperly used concurrently
1592 2 : // during Close.
1593 2 : d.commit.mu.Lock()
1594 2 : defer d.commit.mu.Unlock()
1595 2 : d.mu.Lock()
1596 2 : defer d.mu.Unlock()
1597 2 : if err := d.closed.Load(); err != nil {
1598 1 : panic(err)
1599 : }
1600 :
1601 : // Clear the finalizer that is used to check that an unreferenced DB has been
1602 : // closed. We're closing the DB here, so the check performed by that
1603 : // finalizer isn't necessary.
1604 : //
1605 : // Note: this is a no-op if invariants are disabled or race is enabled.
1606 2 : invariants.SetFinalizer(d.closed, nil)
1607 2 :
1608 2 : d.closed.Store(errors.WithStack(ErrClosed))
1609 2 : close(d.closedCh)
1610 2 :
1611 2 : defer d.opts.Cache.Unref()
1612 2 :
1613 2 : for d.mu.compact.compactingCount > 0 || d.mu.compact.flushing {
1614 2 : d.mu.compact.cond.Wait()
1615 2 : }
1616 2 : for d.mu.tableStats.loading {
1617 2 : d.mu.tableStats.cond.Wait()
1618 2 : }
1619 2 : for d.mu.tableValidation.validating {
1620 1 : d.mu.tableValidation.cond.Wait()
1621 1 : }
1622 :
1623 2 : var err error
1624 2 : if n := len(d.mu.compact.inProgress); n > 0 {
1625 1 : err = errors.Errorf("pebble: %d unexpected in-progress compactions", errors.Safe(n))
1626 1 : }
1627 2 : err = firstError(err, d.mu.formatVers.marker.Close())
1628 2 : err = firstError(err, d.tableCache.close())
1629 2 : if !d.opts.ReadOnly {
1630 2 : if d.mu.log.writer != nil {
1631 2 : _, err2 := d.mu.log.writer.Close()
1632 2 : err = firstError(err, err2)
1633 2 : }
1634 1 : } else if d.mu.log.writer != nil {
1635 0 : panic("pebble: log-writer should be nil in read-only mode")
1636 : }
1637 2 : err = firstError(err, d.mu.log.manager.Close())
1638 2 : err = firstError(err, d.fileLock.Close())
1639 2 :
1640 2 : // Note that versionSet.close() only closes the MANIFEST. The versions list
1641 2 : // is still valid for the checks below.
1642 2 : err = firstError(err, d.mu.versions.close())
1643 2 :
1644 2 : err = firstError(err, d.dataDir.Close())
1645 2 :
1646 2 : d.readState.val.unrefLocked()
1647 2 :
1648 2 : current := d.mu.versions.currentVersion()
1649 2 : for v := d.mu.versions.versions.Front(); true; v = v.Next() {
1650 2 : refs := v.Refs()
1651 2 : if v == current {
1652 2 : if refs != 1 {
1653 1 : err = firstError(err, errors.Errorf("leaked iterators: current\n%s", v))
1654 1 : }
1655 2 : break
1656 : }
1657 0 : if refs != 0 {
1658 0 : err = firstError(err, errors.Errorf("leaked iterators:\n%s", v))
1659 0 : }
1660 : }
1661 :
1662 2 : for _, mem := range d.mu.mem.queue {
1663 2 : // Usually, we'd want to delete the files returned by readerUnref. But
1664 2 : // in this case, even if we're unreferencing the flushables, the
1665 2 : // flushables aren't obsolete. They will be reconstructed during WAL
1666 2 : // replay.
1667 2 : mem.readerUnrefLocked(false)
1668 2 : }
1669 : // If there's an unused, recycled memtable, we need to release its memory.
1670 2 : if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
1671 2 : d.freeMemTable(obsoleteMemTable)
1672 2 : }
1673 2 : if reserved := d.memTableReserved.Load(); reserved != 0 {
1674 1 : err = firstError(err, errors.Errorf("leaked memtable reservation: %d", errors.Safe(reserved)))
1675 1 : }
1676 :
1677 : // Since we called d.readState.val.unrefLocked() above, we are expected to
1678 : // manually schedule deletion of obsolete files.
1679 2 : if len(d.mu.versions.obsoleteTables) > 0 {
1680 2 : d.deleteObsoleteFiles(d.mu.nextJobID)
1681 2 : }
1682 :
1683 2 : d.mu.Unlock()
1684 2 : d.compactionSchedulers.Wait()
1685 2 :
1686 2 : // Wait for all cleaning jobs to finish.
1687 2 : d.cleanupManager.Close()
1688 2 :
1689 2 : // Sanity check metrics.
1690 2 : if invariants.Enabled {
1691 2 : m := d.Metrics()
1692 2 : if m.Compact.NumInProgress > 0 || m.Compact.InProgressBytes > 0 {
1693 0 : d.mu.Lock()
1694 0 : panic(fmt.Sprintf("invalid metrics on close:\n%s", m))
1695 : }
1696 : }
1697 :
1698 2 : d.mu.Lock()
1699 2 :
1700 2 : // As a sanity check, ensure that there are no zombie tables. A non-zero count
1701 2 : // hints at a reference count leak.
1702 2 : if ztbls := len(d.mu.versions.zombieTables); ztbls > 0 {
1703 0 : err = firstError(err, errors.Errorf("non-zero zombie file count: %d", ztbls))
1704 0 : }
1705 :
1706 2 : err = firstError(err, d.objProvider.Close())
1707 2 :
1708 2 : // If the options include a closer to 'close' the filesystem, close it.
1709 2 : if d.opts.private.fsCloser != nil {
1710 2 : d.opts.private.fsCloser.Close()
1711 2 : }
1712 :
1713 : // Return an error if the user failed to close all open snapshots.
1714 2 : if v := d.mu.snapshots.count(); v > 0 {
1715 0 : err = firstError(err, errors.Errorf("leaked snapshots: %d open snapshots on DB %p", v, d))
1716 0 : }
1717 :
1718 2 : return err
1719 : }
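// A small shutdown-ordering sketch (names illustrative): every iterator,
// snapshot, and batch must be closed before the DB itself, otherwise Close
// reports the leaked resources in its returned error.
func exampleShutdown(d *DB, it *Iterator, s *Snapshot) error {
	if err := it.Close(); err != nil {
		return err
	}
	if err := s.Close(); err != nil {
		return err
	}
	return d.Close()
}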
1720 :
1721 : // Compact the specified range of keys in the database.
1722 2 : func (d *DB) Compact(start, end []byte, parallelize bool) error {
1723 2 : if err := d.closed.Load(); err != nil {
1724 1 : panic(err)
1725 : }
1726 2 : if d.opts.ReadOnly {
1727 1 : return ErrReadOnly
1728 1 : }
1729 2 : if d.cmp(start, end) >= 0 {
1730 2 : return errors.Errorf("Compact start %s is not less than end %s",
1731 2 : d.opts.Comparer.FormatKey(start), d.opts.Comparer.FormatKey(end))
1732 2 : }
1733 :
1734 2 : d.mu.Lock()
1735 2 : maxLevelWithFiles := 1
1736 2 : cur := d.mu.versions.currentVersion()
1737 2 : for level := 0; level < numLevels; level++ {
1738 2 : overlaps := cur.Overlaps(level, start, end, false)
1739 2 : if !overlaps.Empty() {
1740 2 : maxLevelWithFiles = level + 1
1741 2 : }
1742 : }
1743 :
1744 : // Determine if any memtable overlaps with the compaction range. We wait for
1745 : // any such overlap to flush (initiating a flush if necessary).
1746 2 : mem, err := func() (*flushableEntry, error) {
1747 2 : // Check to see if any files overlap with any of the memtables. The queue
1748 2 : // is ordered from oldest to newest with the mutable memtable being the
1749 2 : // last element in the slice. We want to wait for the newest table that
1750 2 : // overlaps.
1751 2 : for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1752 2 : mem := d.mu.mem.queue[i]
1753 2 : var anyOverlaps bool
1754 2 : mem.computePossibleOverlaps(func(b bounded) shouldContinue {
1755 2 : anyOverlaps = true
1756 2 : return stopIteration
1757 2 : }, KeyRange{Start: start, End: end})
1758 2 : if !anyOverlaps {
1759 2 : continue
1760 : }
1761 2 : var err error
1762 2 : if mem.flushable == d.mu.mem.mutable {
1763 2 : // We have to hold both commitPipeline.mu and DB.mu when calling
1764 2 : // makeRoomForWrite(). Lock order requirements elsewhere force us to
1765 2 : // unlock DB.mu in order to grab commitPipeline.mu first.
1766 2 : d.mu.Unlock()
1767 2 : d.commit.mu.Lock()
1768 2 : d.mu.Lock()
1769 2 : defer d.commit.mu.Unlock()
1770 2 : if mem.flushable == d.mu.mem.mutable {
1771 2 : // Only flush if the active memtable is unchanged.
1772 2 : err = d.makeRoomForWrite(nil)
1773 2 : }
1774 : }
1775 2 : mem.flushForced = true
1776 2 : d.maybeScheduleFlush()
1777 2 : return mem, err
1778 : }
1779 2 : return nil, nil
1780 : }()
1781 :
1782 2 : d.mu.Unlock()
1783 2 :
1784 2 : if err != nil {
1785 0 : return err
1786 0 : }
1787 2 : if mem != nil {
1788 2 : <-mem.flushed
1789 2 : }
1790 :
1791 2 : for level := 0; level < maxLevelWithFiles; {
1792 2 : for {
1793 2 : if err := d.manualCompact(
1794 2 : start, end, level, parallelize); err != nil {
1795 1 : if errors.Is(err, ErrCancelledCompaction) {
1796 1 : continue
1797 : }
1798 1 : return err
1799 : }
1800 2 : break
1801 : }
1802 2 : level++
1803 2 : if level == numLevels-1 {
1804 2 : // A manual compaction of the bottommost level occurred.
1805 2 : // There is no next level to try and compact.
1806 2 : break
1807 : }
1808 : }
1809 2 : return nil
1810 : }
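// An illustrative manual-compaction sketch: the bounds are hypothetical and
// must satisfy start < end under the configured Comparer. With parallelize
// set, the work may be split into non-overlapping compactions per level, as
// implemented by splitManualCompaction below.
func exampleManualCompaction(d *DB) error {
	return d.Compact([]byte("a"), []byte("z"), true /* parallelize */)
}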
1811 :
1812 2 : func (d *DB) manualCompact(start, end []byte, level int, parallelize bool) error {
1813 2 : d.mu.Lock()
1814 2 : curr := d.mu.versions.currentVersion()
1815 2 : files := curr.Overlaps(level, start, end, false)
1816 2 : if files.Empty() {
1817 2 : d.mu.Unlock()
1818 2 : return nil
1819 2 : }
1820 :
1821 2 : var compactions []*manualCompaction
1822 2 : if parallelize {
1823 2 : compactions = append(compactions, d.splitManualCompaction(start, end, level)...)
1824 2 : } else {
1825 2 : compactions = append(compactions, &manualCompaction{
1826 2 : level: level,
1827 2 : done: make(chan error, 1),
1828 2 : start: start,
1829 2 : end: end,
1830 2 : })
1831 2 : }
1832 2 : d.mu.compact.manual = append(d.mu.compact.manual, compactions...)
1833 2 : d.maybeScheduleCompaction()
1834 2 : d.mu.Unlock()
1835 2 :
1836 2 : // Each of the channels is guaranteed to be eventually sent to once. After a
1837 2 : // compaction is possibly picked in d.maybeScheduleCompaction(), either the
1838 2 : // compaction is dropped, executed after being scheduled, or retried later.
1839 2 : // Assuming eventual progress when a compaction is retried, all outcomes send
1840 2 : // a value to the done channel. Since the channels are buffered, it is not
1841 2 : // necessary to read from each channel, and so we can exit early in the event
1842 2 : // of an error.
1843 2 : for _, compaction := range compactions {
1844 2 : if err := <-compaction.done; err != nil {
1845 1 : return err
1846 1 : }
1847 : }
1848 2 : return nil
1849 : }
1850 :
1851 : // splitManualCompaction splits a manual compaction over [start,end] on level
1852 : // such that the resulting compactions have no key overlap.
1853 : func (d *DB) splitManualCompaction(
1854 : start, end []byte, level int,
1855 2 : ) (splitCompactions []*manualCompaction) {
1856 2 : curr := d.mu.versions.currentVersion()
1857 2 : endLevel := level + 1
1858 2 : baseLevel := d.mu.versions.picker.getBaseLevel()
1859 2 : if level == 0 {
1860 2 : endLevel = baseLevel
1861 2 : }
1862 2 : keyRanges := curr.CalculateInuseKeyRanges(level, endLevel, start, end)
1863 2 : for _, keyRange := range keyRanges {
1864 2 : splitCompactions = append(splitCompactions, &manualCompaction{
1865 2 : level: level,
1866 2 : done: make(chan error, 1),
1867 2 : start: keyRange.Start,
1868 2 : end: keyRange.End,
1869 2 : split: true,
1870 2 : })
1871 2 : }
1872 2 : return splitCompactions
1873 : }
1874 :
1875 : // DownloadSpan is a key range passed to the Download method.
1876 : type DownloadSpan struct {
1877 : StartKey []byte
1878 : // EndKey is exclusive.
1879 : EndKey []byte
1880 : // ViaBackingFileDownload, if true, indicates the span should be downloaded by
1881 : // downloading any remote backing files byte-for-byte and replacing them with
1882 : // the downloaded local files, while otherwise leaving the virtual SSTables
1883 : // as-is. If false, a "normal" rewriting compaction of the span, that iterates
1884 : // the keys and produces a new SSTable, is used instead. Downloading raw files
1885 : // can be faster when the whole file is being downloaded, as it avoids some
1886 : // CPU-intensive steps involved in iteration and new file construction, such as
1887 : // compression. However, it can also be wasteful when only a small portion of a
1888 : // larger backing file is being used by a virtual file. Additionally, if the
1889 : // virtual file has expensive read-time transformations, such as prefix
1890 : // replacement, rewriting once can persist the result of these for future use
1891 : // while copying only the backing file will obligate future reads to continue
1892 : // to compute such transforms.
1893 : ViaBackingFileDownload bool
1894 : }
1895 :
1896 1 : func (d *DB) downloadSpan(ctx context.Context, span DownloadSpan) error {
1897 1 : dSpan := &downloadSpan{
1898 1 : start: span.StartKey,
1899 1 : end: span.EndKey,
1900 1 : // Protected by d.mu.
1901 1 : doneChans: make([]chan error, 1),
1902 1 : kind: compactionKindRewrite,
1903 1 : }
1904 1 : if span.ViaBackingFileDownload {
1905 1 : dSpan.kind = compactionKindCopy
1906 1 : }
1907 1 : dSpan.doneChans[0] = make(chan error, 1)
1908 1 : doneChan := dSpan.doneChans[0]
1909 1 : compactionIdx := 0
1910 1 :
1911 1 : func() {
1912 1 : d.mu.Lock()
1913 1 : defer d.mu.Unlock()
1914 1 :
1915 1 : d.mu.compact.downloads = append(d.mu.compact.downloads, dSpan)
1916 1 : d.maybeScheduleCompaction()
1917 1 : }()
1918 :
1919 : // Requires d.mu to be held.
1920 1 : noExternalFilesInSpan := func() (noExternalFiles bool) {
1921 1 : vers := d.mu.versions.currentVersion()
1922 1 :
1923 1 : for i := 0; i < len(vers.Levels); i++ {
1924 1 : if vers.Levels[i].Empty() {
1925 1 : continue
1926 : }
1927 1 : overlap := vers.Overlaps(i, span.StartKey, span.EndKey, true /* exclusiveEnd */)
1928 1 : foundExternalFile := false
1929 1 : overlap.Each(func(metadata *manifest.FileMetadata) {
1930 1 : objMeta, err := d.objProvider.Lookup(fileTypeTable, metadata.FileBacking.DiskFileNum)
1931 1 : if err != nil {
1932 0 : return
1933 0 : }
1934 1 : if objMeta.IsExternal() {
1935 1 : foundExternalFile = true
1936 1 : }
1937 : })
1938 1 : if foundExternalFile {
1939 1 : return false
1940 1 : }
1941 : }
1942 1 : return true
1943 : }
1944 :
1945 : // Requires d.mu to be held.
1946 1 : removeUsFromList := func() {
1947 1 : // Check where we are in d.mu.compact.downloads. Remove us from the
1948 1 : // list.
1949 1 : for i := range d.mu.compact.downloads {
1950 0 : if d.mu.compact.downloads[i] != dSpan {
1951 0 : continue
1952 : }
1953 0 : copy(d.mu.compact.downloads[i:], d.mu.compact.downloads[i+1:])
1954 0 : d.mu.compact.downloads = d.mu.compact.downloads[:len(d.mu.compact.downloads)-1]
1955 0 : break
1956 : }
1957 : }
1958 :
1959 1 : for {
1960 1 : select {
1961 0 : case <-ctx.Done():
1962 0 : d.mu.Lock()
1963 0 : defer d.mu.Unlock()
1964 0 : removeUsFromList()
1965 0 : return ctx.Err()
1966 1 : case err := <-doneChan:
1967 1 : if err != nil {
1968 0 : d.mu.Lock()
1969 0 : defer d.mu.Unlock()
1970 0 : removeUsFromList()
1971 0 : return err
1972 0 : }
1973 1 : compactionIdx++
1974 1 : // Grab the next doneCh to wait on.
1975 1 : func() {
1976 1 : d.mu.Lock()
1977 1 : defer d.mu.Unlock()
1978 1 : doneChan = dSpan.doneChans[compactionIdx]
1979 1 : }()
1980 1 : default:
1981 1 : doneSpan := func() bool {
1982 1 : d.mu.Lock()
1983 1 : defer d.mu.Unlock()
1984 1 : // It's possible to have downloaded all files without writing to any
1985 1 : // doneChans. This is expected if there is a significant number
1986 1 : // of overlapping writes that schedule regular, non-download compactions.
1987 1 : if noExternalFilesInSpan() {
1988 1 : removeUsFromList()
1989 1 : return true
1990 1 : }
1991 1 : d.maybeScheduleCompaction()
1992 1 : if d.mu.compact.compactingCount == 0 {
1993 0 : // No compactions were scheduled above. Waiting on the cond lock below
1994 0 : // could possibly lead to a forever-wait. Return true if the db is
1995 0 : // closed so we exit out of this method.
1996 0 : return d.closed.Load() != nil
1997 0 : }
1998 1 : d.mu.compact.cond.Wait()
1999 1 : return false
2000 : }()
2001 1 : if doneSpan {
2002 1 : return nil
2003 1 : }
2004 : }
2005 : }
2006 : }
2007 :
2008 : // Download ensures that the LSM does not use any external sstables for the
2009 : // given key ranges. It does so by performing appropriate compactions so that
2010 : // all external data becomes available locally.
2011 : //
2012 : // Note that calling this method does not imply that all other compactions stop;
2013 : // it simply informs Pebble of a list of spans for which external data should be
2014 : // downloaded with high priority.
2015 : //
2016 : // The method returns once no external sstables overlap the given spans, the
2017 : // context is canceled, the db is closed, or an error is hit.
2018 : //
2019 : // TODO(radu): consider passing a priority/impact knob to express how important
2020 : // the download is (versus live traffic performance, LSM health).
2021 1 : func (d *DB) Download(ctx context.Context, spans []DownloadSpan) error {
2022 1 : ctx, cancel := context.WithCancel(ctx)
2023 1 : defer cancel()
2024 1 : if err := d.closed.Load(); err != nil {
2025 0 : panic(err)
2026 : }
2027 1 : if d.opts.ReadOnly {
2028 0 : return ErrReadOnly
2029 0 : }
2030 1 : for i := range spans {
2031 1 : if err := ctx.Err(); err != nil {
2032 0 : return err
2033 0 : }
2034 1 : if err := d.downloadSpan(ctx, spans[i]); err != nil {
2035 0 : return err
2036 0 : }
2037 : }
2038 1 : return nil
2039 : }
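// A hedged sketch of prioritizing a download (span bounds are illustrative):
// with ViaBackingFileDownload left false, the span is made local via
// rewriting compactions; with it set to true, remote backing files are copied
// byte-for-byte instead.
func exampleDownloadSpan(ctx context.Context, d *DB) error {
	return d.Download(ctx, []DownloadSpan{{
		StartKey:               []byte("a"),
		EndKey:                 []byte("z"), // exclusive
		ViaBackingFileDownload: false,
	}})
}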
2040 :
2041 : // Flush the memtable to stable storage.
2042 2 : func (d *DB) Flush() error {
2043 2 : flushDone, err := d.AsyncFlush()
2044 2 : if err != nil {
2045 1 : return err
2046 1 : }
2047 2 : <-flushDone
2048 2 : return nil
2049 : }
2050 :
2051 : // AsyncFlush asynchronously flushes the memtable to stable storage.
2052 : //
2053 : // If no error is returned, the caller can receive from the returned channel in
2054 : // order to wait for the flush to complete.
2055 2 : func (d *DB) AsyncFlush() (<-chan struct{}, error) {
2056 2 : if err := d.closed.Load(); err != nil {
2057 1 : panic(err)
2058 : }
2059 2 : if d.opts.ReadOnly {
2060 1 : return nil, ErrReadOnly
2061 1 : }
2062 :
2063 2 : d.commit.mu.Lock()
2064 2 : defer d.commit.mu.Unlock()
2065 2 : d.mu.Lock()
2066 2 : defer d.mu.Unlock()
2067 2 : flushed := d.mu.mem.queue[len(d.mu.mem.queue)-1].flushed
2068 2 : err := d.makeRoomForWrite(nil)
2069 2 : if err != nil {
2070 0 : return nil, err
2071 0 : }
2072 2 : return flushed, nil
2073 : }
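// A small sketch contrasting the two flush entry points (names illustrative):
// Flush blocks until the memtable reaches stable storage, while AsyncFlush
// returns a channel the caller can wait on later.
func exampleAsyncFlushThenWait(d *DB) error {
	done, err := d.AsyncFlush()
	if err != nil {
		return err
	}
	// ... do other work ...
	<-done // flush has completed
	return nil
}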
2074 :
2075 : // Metrics returns metrics about the database.
2076 2 : func (d *DB) Metrics() *Metrics {
2077 2 : metrics := &Metrics{}
2078 2 : walStats := d.mu.log.manager.Stats()
2079 2 :
2080 2 : d.mu.Lock()
2081 2 : vers := d.mu.versions.currentVersion()
2082 2 : *metrics = d.mu.versions.metrics
2083 2 : metrics.Compact.EstimatedDebt = d.mu.versions.picker.estimatedCompactionDebt(0)
2084 2 : metrics.Compact.InProgressBytes = d.mu.versions.atomicInProgressBytes.Load()
2085 2 : metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount)
2086 2 : metrics.Compact.MarkedFiles = vers.Stats.MarkedForCompaction
2087 2 : metrics.Compact.Duration = d.mu.compact.duration
2088 2 : for c := range d.mu.compact.inProgress {
2089 1 : if c.kind != compactionKindFlush {
2090 1 : metrics.Compact.Duration += d.timeNow().Sub(c.beganAt)
2091 1 : }
2092 : }
2093 :
2094 2 : for _, m := range d.mu.mem.queue {
2095 2 : metrics.MemTable.Size += m.totalBytes()
2096 2 : }
2097 2 : metrics.Snapshots.Count = d.mu.snapshots.count()
2098 2 : if metrics.Snapshots.Count > 0 {
2099 0 : metrics.Snapshots.EarliestSeqNum = d.mu.snapshots.earliest()
2100 0 : }
2101 2 : metrics.Snapshots.PinnedKeys = d.mu.snapshots.cumulativePinnedCount
2102 2 : metrics.Snapshots.PinnedSize = d.mu.snapshots.cumulativePinnedSize
2103 2 : metrics.MemTable.Count = int64(len(d.mu.mem.queue))
2104 2 : metrics.MemTable.ZombieCount = d.memTableCount.Load() - metrics.MemTable.Count
2105 2 : metrics.MemTable.ZombieSize = uint64(d.memTableReserved.Load()) - metrics.MemTable.Size
2106 2 : metrics.WAL.ObsoleteFiles = int64(walStats.ObsoleteFileCount)
2107 2 : metrics.WAL.ObsoletePhysicalSize = walStats.ObsoleteFileSize
2108 2 : metrics.WAL.Files = int64(walStats.LiveFileCount)
2109 2 : // The current WAL's size (d.logSize) is the logical size, which may be less
2110 2 : // than the WAL's physical size if it was recycled. walStats.LiveFileSize
2111 2 : // includes the physical size of all live WALs, but for the current WAL it
2112 2 : // reflects the physical size when it was opened. So it is possible that
2113 2 : // d.atomic.logSize has exceeded that physical size. We allow for this
2114 2 : // anomaly.
2115 2 : metrics.WAL.PhysicalSize = walStats.LiveFileSize
2116 2 : metrics.WAL.BytesIn = d.mu.log.bytesIn // protected by d.mu
2117 2 : metrics.WAL.Size = d.logSize.Load()
2118 2 : for i, n := 0, len(d.mu.mem.queue)-1; i < n; i++ {
2119 2 : metrics.WAL.Size += d.mu.mem.queue[i].logSize
2120 2 : }
2121 2 : metrics.WAL.BytesWritten = metrics.Levels[0].BytesIn + metrics.WAL.Size
2122 2 : metrics.WAL.Failover = walStats.Failover
2123 2 :
2124 2 : if p := d.mu.versions.picker; p != nil {
2125 2 : compactions := d.getInProgressCompactionInfoLocked(nil)
2126 2 : for level, score := range p.getScores(compactions) {
2127 2 : metrics.Levels[level].Score = score
2128 2 : }
2129 : }
2130 2 : metrics.Table.ZombieCount = int64(len(d.mu.versions.zombieTables))
2131 2 : for _, size := range d.mu.versions.zombieTables {
2132 1 : metrics.Table.ZombieSize += size
2133 1 : }
2134 2 : metrics.private.optionsFileSize = d.optionsFileSize
2135 2 :
2136 2 : // TODO(jackson): Consider making these metrics optional.
2137 2 : metrics.Keys.RangeKeySetsCount = countRangeKeySetFragments(vers)
2138 2 : metrics.Keys.TombstoneCount = countTombstones(vers)
2139 2 :
2140 2 : d.mu.versions.logLock()
2141 2 : metrics.private.manifestFileSize = uint64(d.mu.versions.manifest.Size())
2142 2 : backingCount, backingTotalSize := d.mu.versions.virtualBackings.Stats()
2143 2 : metrics.Table.BackingTableCount = uint64(backingCount)
2144 2 : metrics.Table.BackingTableSize = backingTotalSize
2145 2 : d.mu.versions.logUnlock()
2146 2 :
2147 2 : metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
2148 2 : if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
2149 0 : d.opts.Logger.Errorf("metrics error: %s", err)
2150 0 : }
2151 2 : metrics.Flush.WriteThroughput = d.mu.compact.flushWriteThroughput
2152 2 : if d.mu.compact.flushing {
2153 1 : metrics.Flush.NumInProgress = 1
2154 1 : }
2155 2 : for i := 0; i < numLevels; i++ {
2156 2 : metrics.Levels[i].Additional.ValueBlocksSize = valueBlocksSizeForLevel(vers, i)
2157 2 : }
2158 :
2159 2 : d.mu.Unlock()
2160 2 :
2161 2 : metrics.BlockCache = d.opts.Cache.Metrics()
2162 2 : metrics.TableCache, metrics.Filter = d.tableCache.metrics()
2163 2 : metrics.TableIters = int64(d.tableCache.iterCount())
2164 2 : metrics.CategoryStats = d.tableCache.dbOpts.sstStatsCollector.GetStats()
2165 2 :
2166 2 : metrics.SecondaryCacheMetrics = d.objProvider.Metrics()
2167 2 :
2168 2 : metrics.Uptime = d.timeNow().Sub(d.openedAt)
2169 2 :
2170 2 : return metrics
2171 : }
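// An illustrative sketch of sampling a few of the metrics gathered above (the
// field choices are examples, not an exhaustive or recommended set).
func exampleLogMetrics(d *DB) {
	m := d.Metrics()
	d.opts.Logger.Infof("memtables=%d (%d bytes) wal=%d bytes zombie-tables=%d",
		m.MemTable.Count, m.MemTable.Size, m.WAL.Size, m.Table.ZombieCount)
}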
2172 :
2173 : // sstablesOptions hold the optional parameters to retrieve TableInfo for all sstables.
2174 : type sstablesOptions struct {
2175 : // set to true will return the sstable properties in TableInfo
2176 : withProperties bool
2177 :
2178 : // if set, return sstables that overlap the key range (end-exclusive)
2179 : start []byte
2180 : end []byte
2181 :
2182 : withApproximateSpanBytes bool
2183 : }
2184 :
2185 : // SSTablesOption set optional parameter used by `DB.SSTables`.
2186 : type SSTablesOption func(*sstablesOptions)
2187 :
2188 : // WithProperties enable return sstable properties in each TableInfo.
2189 : //
2190 : // NOTE: if most of the sstable properties need to be read from disk,
2191 : // this option may make the `SSTables` method quite slow.
2192 1 : func WithProperties() SSTablesOption {
2193 1 : return func(opt *sstablesOptions) {
2194 1 : opt.withProperties = true
2195 1 : }
2196 : }
2197 :
2198 : // WithKeyRangeFilter ensures the returned sstables overlap start and end
2199 : // (end-exclusive). If start and end are both nil, the filter has no effect.
2200 1 : func WithKeyRangeFilter(start, end []byte) SSTablesOption {
2201 1 : return func(opt *sstablesOptions) {
2202 1 : opt.end = end
2203 1 : opt.start = start
2204 1 : }
2205 : }
2206 :
2207 : // WithApproximateSpanBytes enables capturing the approximate number of bytes that
2208 : // overlap the provided key span for each sstable.
2209 : // NOTE: this option can only be used when WithKeyRangeFilter and
2210 : // WithProperties are also provided.
2211 1 : func WithApproximateSpanBytes() SSTablesOption {
2212 1 : return func(opt *sstablesOptions) {
2213 1 : opt.withApproximateSpanBytes = true
2214 1 : }
2215 : }
2216 :
2217 : // BackingType denotes the type of storage backing a given sstable.
2218 : type BackingType int
2219 :
2220 : const (
2221 : // BackingTypeLocal denotes an sstable stored on local disk according to the
2222 : // objprovider. This file is completely owned by us.
2223 : BackingTypeLocal BackingType = iota
2224 : // BackingTypeShared denotes an sstable stored on shared storage, created
2225 : // by this Pebble instance and possibly shared by other Pebble instances.
2226 : // These types of files have lifecycle managed by Pebble.
2227 : BackingTypeShared
2228 : // BackingTypeSharedForeign denotes an sstable stored on shared storage,
2229 : // created by a Pebble instance other than this one. These types of files have
2230 : // lifecycle managed by Pebble.
2231 : BackingTypeSharedForeign
2232 : // BackingTypeExternal denotes an sstable stored on external storage,
2233 : // not owned by any Pebble instance and with no refcounting/cleanup methods
2234 : // or lifecycle management. An example of an external file is a file restored
2235 : // from a backup.
2236 : BackingTypeExternal
2237 : )
2238 :
2239 : // SSTableInfo export manifest.TableInfo with sstable.Properties alongside
2240 : // other file backing info.
2241 : type SSTableInfo struct {
2242 : manifest.TableInfo
2243 : // Virtual indicates whether the sstable is virtual.
2244 : Virtual bool
2245 : // BackingSSTNum is the disk file number associated with the backing sstable.
2246 : // If Virtual is false, BackingSSTNum == PhysicalTableDiskFileNum(FileNum).
2247 : BackingSSTNum base.DiskFileNum
2248 : // BackingType is the type of storage backing this sstable.
2249 : BackingType BackingType
2250 : // Locator is the remote.Locator backing this sstable, if the backing type is
2251 : // not BackingTypeLocal.
2252 : Locator remote.Locator
2253 :
2254 : // Properties is the sstable properties of this table. If Virtual is true,
2255 : // then the Properties are associated with the backing sst.
2256 : Properties *sstable.Properties
2257 : }
2258 :
2259 : // SSTables retrieves the current sstables. The returned slice is indexed by
2260 : // level and each level is indexed by the position of the sstable within the
2261 : // level. Note that this information may be out of date due to concurrent
2262 : // flushes and compactions.
2263 1 : func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
2264 1 : opt := &sstablesOptions{}
2265 1 : for _, fn := range opts {
2266 1 : fn(opt)
2267 1 : }
2268 :
2269 1 : if opt.withApproximateSpanBytes && !opt.withProperties {
2270 1 : return nil, errors.Errorf("Cannot use WithApproximateSpanBytes without WithProperties option.")
2271 1 : }
2272 1 : if opt.withApproximateSpanBytes && (opt.start == nil || opt.end == nil) {
2273 1 : return nil, errors.Errorf("Cannot use WithApproximateSpanBytes without WithKeyRangeFilter option.")
2274 1 : }
2275 :
2276 : // Grab and reference the current readState.
2277 1 : readState := d.loadReadState()
2278 1 : defer readState.unref()
2279 1 :
2280 1 : // TODO(peter): This is somewhat expensive, especially on a large
2281 1 : // database. It might be worthwhile to unify TableInfo and FileMetadata and
2282 1 : // then we could simply return current.Files. Note that RocksDB is doing
2283 1 : // something similar to the current code, so perhaps it isn't too bad.
2284 1 : srcLevels := readState.current.Levels
2285 1 : var totalTables int
2286 1 : for i := range srcLevels {
2287 1 : totalTables += srcLevels[i].Len()
2288 1 : }
2289 :
2290 1 : destTables := make([]SSTableInfo, totalTables)
2291 1 : destLevels := make([][]SSTableInfo, len(srcLevels))
2292 1 : for i := range destLevels {
2293 1 : iter := srcLevels[i].Iter()
2294 1 : j := 0
2295 1 : for m := iter.First(); m != nil; m = iter.Next() {
2296 1 : if opt.start != nil && opt.end != nil && !m.Overlaps(d.opts.Comparer.Compare, opt.start, opt.end, true /* exclusive end */) {
2297 1 : continue
2298 : }
2299 1 : destTables[j] = SSTableInfo{TableInfo: m.TableInfo()}
2300 1 : if opt.withProperties {
2301 1 : p, err := d.tableCache.getTableProperties(
2302 1 : m,
2303 1 : )
2304 1 : if err != nil {
2305 0 : return nil, err
2306 0 : }
2307 1 : destTables[j].Properties = p
2308 : }
2309 1 : destTables[j].Virtual = m.Virtual
2310 1 : destTables[j].BackingSSTNum = m.FileBacking.DiskFileNum
2311 1 : objMeta, err := d.objProvider.Lookup(fileTypeTable, m.FileBacking.DiskFileNum)
2312 1 : if err != nil {
2313 0 : return nil, err
2314 0 : }
2315 1 : if objMeta.IsRemote() {
2316 0 : if objMeta.IsShared() {
2317 0 : if d.objProvider.IsSharedForeign(objMeta) {
2318 0 : destTables[j].BackingType = BackingTypeSharedForeign
2319 0 : } else {
2320 0 : destTables[j].BackingType = BackingTypeShared
2321 0 : }
2322 0 : } else {
2323 0 : destTables[j].BackingType = BackingTypeExternal
2324 0 : }
2325 0 : destTables[j].Locator = objMeta.Remote.Locator
2326 1 : } else {
2327 1 : destTables[j].BackingType = BackingTypeLocal
2328 1 : }
2329 :
2330 1 : if opt.withApproximateSpanBytes {
2331 1 : var spanBytes uint64
2332 1 : if m.ContainedWithinSpan(d.opts.Comparer.Compare, opt.start, opt.end) {
2333 0 : spanBytes = m.Size
2334 1 : } else {
2335 1 : size, err := d.tableCache.estimateSize(m, opt.start, opt.end)
2336 1 : if err != nil {
2337 0 : return nil, err
2338 0 : }
2339 1 : spanBytes = size
2340 : }
2341 1 : propertiesCopy := *destTables[j].Properties
2342 1 :
2343 1 : // Deep copy user properties so approximate span bytes can be added.
2344 1 : propertiesCopy.UserProperties = make(map[string]string, len(destTables[j].Properties.UserProperties)+1)
2345 1 : for k, v := range destTables[j].Properties.UserProperties {
2346 0 : propertiesCopy.UserProperties[k] = v
2347 0 : }
2348 1 : propertiesCopy.UserProperties["approximate-span-bytes"] = strconv.FormatUint(spanBytes, 10)
2349 1 : destTables[j].Properties = &propertiesCopy
2350 : }
2351 1 : j++
2352 : }
2353 1 : destLevels[i] = destTables[:j]
2354 1 : destTables = destTables[j:]
2355 : }
2356 :
2357 1 : return destLevels, nil
2358 : }
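// An illustrative sketch of combining the SSTables options (key bounds are
// hypothetical). Per the validation above, WithApproximateSpanBytes requires
// both WithProperties and WithKeyRangeFilter; the result is surfaced as the
// "approximate-span-bytes" user property.
func exampleSpanBytesPerTable(d *DB, start, end []byte) error {
	levels, err := d.SSTables(
		WithProperties(),
		WithKeyRangeFilter(start, end),
		WithApproximateSpanBytes(),
	)
	if err != nil {
		return err
	}
	for level, tables := range levels {
		for _, t := range tables {
			d.opts.Logger.Infof("L%d %s: approx span bytes %s",
				level, t.FileNum, t.Properties.UserProperties["approximate-span-bytes"])
		}
	}
	return nil
}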
2359 :
2360 : // EstimateDiskUsage returns the estimated filesystem space used in bytes for
2361 : // storing the range `[start, end]`. The estimation is computed as follows:
2362 : //
2363 : // - For sstables fully contained in the range the whole file size is included.
2364 : // - For sstables partially contained in the range the overlapping data block sizes
2365 : // are included. Even if a data block partially overlaps, or we cannot determine
2366 : // overlap due to abbreviated index keys, the full data block size is included in
2367 : // the estimation. Note that unlike fully contained sstables, none of the
2368 : // meta-block space is counted for partially overlapped files.
2369 : // - For virtual sstables, we use the overlap between start, end and the virtual
2370 : // sstable bounds to determine disk usage.
2371 : // - There may also exist WAL entries for unflushed keys in this range. This
2372 : // estimation currently excludes space used for the range in the WAL.
2373 1 : func (d *DB) EstimateDiskUsage(start, end []byte) (uint64, error) {
2374 1 : bytes, _, _, err := d.EstimateDiskUsageByBackingType(start, end)
2375 1 : return bytes, err
2376 1 : }
2377 :
2378 : // EstimateDiskUsageByBackingType is like EstimateDiskUsage but additionally
2379 : // returns the subsets of that size in remote and external files.
2380 : func (d *DB) EstimateDiskUsageByBackingType(
2381 : start, end []byte,
2382 1 : ) (totalSize, remoteSize, externalSize uint64, _ error) {
2383 1 : if err := d.closed.Load(); err != nil {
2384 0 : panic(err)
2385 : }
2386 1 : if d.opts.Comparer.Compare(start, end) > 0 {
2387 0 : return 0, 0, 0, errors.New("invalid key-range specified (start > end)")
2388 0 : }
2389 :
2390 : // Grab and reference the current readState. This prevents the underlying
2391 : // files in the associated version from being deleted if there is a concurrent
2392 : // compaction.
2393 1 : readState := d.loadReadState()
2394 1 : defer readState.unref()
2395 1 :
2396 1 : for level, files := range readState.current.Levels {
2397 1 : iter := files.Iter()
2398 1 : if level > 0 {
2399 1 : // We can only use `Overlaps` to restrict `files` at L1+ since at L0 it
2400 1 : // expands the range iteratively until it has found a set of files that
2401 1 : // do not overlap any other L0 files outside that set.
2402 1 : overlaps := readState.current.Overlaps(level, start, end, false /* exclusiveEnd */)
2403 1 : iter = overlaps.Iter()
2404 1 : }
2405 1 : for file := iter.First(); file != nil; file = iter.Next() {
2406 1 : if d.opts.Comparer.Compare(start, file.Smallest.UserKey) <= 0 &&
2407 1 : d.opts.Comparer.Compare(file.Largest.UserKey, end) <= 0 {
2408 1 : // The range fully contains the file, so skip looking it up in
2409 1 : // table cache/looking at its indexes, and add the full file size.
2410 1 : meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
2411 1 : if err != nil {
2412 0 : return 0, 0, 0, err
2413 0 : }
2414 1 : if meta.IsRemote() {
2415 0 : remoteSize += file.Size
2416 0 : if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
2417 0 : externalSize += file.Size
2418 0 : }
2419 : }
2420 1 : totalSize += file.Size
2421 1 : } else if d.opts.Comparer.Compare(file.Smallest.UserKey, end) <= 0 &&
2422 1 : d.opts.Comparer.Compare(start, file.Largest.UserKey) <= 0 {
2423 1 : var size uint64
2424 1 : var err error
2425 1 : if file.Virtual {
2426 0 : err = d.tableCache.withVirtualReader(
2427 0 : file.VirtualMeta(),
2428 0 : func(r sstable.VirtualReader) (err error) {
2429 0 : size, err = r.EstimateDiskUsage(start, end)
2430 0 : return err
2431 0 : },
2432 : )
2433 1 : } else {
2434 1 : err = d.tableCache.withReader(
2435 1 : file.PhysicalMeta(),
2436 1 : func(r *sstable.Reader) (err error) {
2437 1 : size, err = r.EstimateDiskUsage(start, end)
2438 1 : return err
2439 1 : },
2440 : )
2441 : }
2442 1 : if err != nil {
2443 0 : return 0, 0, 0, err
2444 0 : }
2445 1 : meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
2446 1 : if err != nil {
2447 0 : return 0, 0, 0, err
2448 0 : }
2449 1 : if meta.IsRemote() {
2450 0 : remoteSize += size
2451 0 : if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
2452 0 : externalSize += size
2453 0 : }
2454 : }
2455 1 : totalSize += size
2456 : }
2457 : }
2458 : }
2459 1 : return totalSize, remoteSize, externalSize, nil
2460 : }
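// A brief usage sketch (bounds illustrative): the remote and external sizes
// are subsets of the total, so locally-backed bytes can be derived by
// subtraction.
func exampleDiskUsageBreakdown(d *DB, start, end []byte) error {
	total, remote, external, err := d.EstimateDiskUsageByBackingType(start, end)
	if err != nil {
		return err
	}
	local := total - remote
	d.opts.Logger.Infof("estimated usage: total=%d local=%d remote=%d external=%d",
		total, local, remote, external)
	return nil
}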
2461 :
2462 2 : func (d *DB) walPreallocateSize() int {
2463 2 : // Set the WAL preallocate size to 110% of the memtable size. Note that there
2464 2 : // is a bit of apples and oranges in units here, as the memtable size
2465 2 : // corresponds to the memory usage of the memtable while the WAL size is the
2466 2 : // size of the batches (plus overhead) stored in the WAL.
2467 2 : //
2468 2 : // TODO(peter): 110% of the memtable size is quite hefty for a block
2469 2 : // size. This logic is taken from GetWalPreallocateBlockSize in
2470 2 : // RocksDB. Could a smaller preallocation block size be used?
2471 2 : size := d.opts.MemTableSize
2472 2 : size = (size / 10) + size
2473 2 : return int(size)
2474 2 : }
2475 :
2476 2 : func (d *DB) newMemTable(logNum base.DiskFileNum, logSeqNum uint64) (*memTable, *flushableEntry) {
2477 2 : size := d.mu.mem.nextSize
2478 2 : if d.mu.mem.nextSize < d.opts.MemTableSize {
2479 2 : d.mu.mem.nextSize *= 2
2480 2 : if d.mu.mem.nextSize > d.opts.MemTableSize {
2481 1 : d.mu.mem.nextSize = d.opts.MemTableSize
2482 1 : }
2483 : }
2484 :
2485 2 : memtblOpts := memTableOptions{
2486 2 : Options: d.opts,
2487 2 : logSeqNum: logSeqNum,
2488 2 : }
2489 2 :
2490 2 : // Before attempting to allocate a new memtable, check if there's one
2491 2 : // available for recycling in memTableRecycle. Large contiguous allocations
2492 2 : // can be costly as fragmentation makes it more difficult to find a large
2493 2 : // contiguous free space. We've observed 64MB allocations taking 10ms+.
2494 2 : //
2495 2 : // To reduce these costly allocations, up to 1 obsolete memtable is stashed
2496 2 : // in `d.memTableRecycle` to allow a future memtable rotation to reuse
2497 2 : // existing memory.
2498 2 : var mem *memTable
2499 2 : mem = d.memTableRecycle.Swap(nil)
2500 2 : if mem != nil && uint64(len(mem.arenaBuf)) != size {
2501 2 : d.freeMemTable(mem)
2502 2 : mem = nil
2503 2 : }
2504 2 : if mem != nil {
2505 2 : // Carry through the existing buffer and memory reservation.
2506 2 : memtblOpts.arenaBuf = mem.arenaBuf
2507 2 : memtblOpts.releaseAccountingReservation = mem.releaseAccountingReservation
2508 2 : } else {
2509 2 : mem = new(memTable)
2510 2 : memtblOpts.arenaBuf = manual.New(int(size))
2511 2 : memtblOpts.releaseAccountingReservation = d.opts.Cache.Reserve(int(size))
2512 2 : d.memTableCount.Add(1)
2513 2 : d.memTableReserved.Add(int64(size))
2514 2 :
2515 2 : // Note: this is a no-op if invariants are disabled or race is enabled.
2516 2 : invariants.SetFinalizer(mem, checkMemTable)
2517 2 : }
2518 2 : mem.init(memtblOpts)
2519 2 :
2520 2 : entry := d.newFlushableEntry(mem, logNum, logSeqNum)
2521 2 : entry.releaseMemAccounting = func() {
2522 2 : // If the user leaks iterators, we may be releasing the memtable after
2523 2 : // the DB is already closed. In this case, we want to just release the
2524 2 : // memory because DB.Close won't come along to free it for us.
2525 2 : if err := d.closed.Load(); err != nil {
2526 2 : d.freeMemTable(mem)
2527 2 : return
2528 2 : }
2529 :
2530 : // The next memtable allocation might be able to reuse this memtable.
2531 : // Stash it on d.memTableRecycle.
2532 2 : if unusedMem := d.memTableRecycle.Swap(mem); unusedMem != nil {
2533 2 : // There was already a memtable waiting to be recycled. We're now
2534 2 : // responsible for freeing it.
2535 2 : d.freeMemTable(unusedMem)
2536 2 : }
2537 : }
2538 2 : return mem, entry
2539 : }
2540 :
2541 2 : func (d *DB) freeMemTable(m *memTable) {
2542 2 : d.memTableCount.Add(-1)
2543 2 : d.memTableReserved.Add(-int64(len(m.arenaBuf)))
2544 2 : m.free()
2545 2 : }
2546 :
2547 : func (d *DB) newFlushableEntry(
2548 : f flushable, logNum base.DiskFileNum, logSeqNum uint64,
2549 2 : ) *flushableEntry {
2550 2 : fe := &flushableEntry{
2551 2 : flushable: f,
2552 2 : flushed: make(chan struct{}),
2553 2 : logNum: logNum,
2554 2 : logSeqNum: logSeqNum,
2555 2 : deleteFn: d.mu.versions.addObsolete,
2556 2 : deleteFnLocked: d.mu.versions.addObsoleteLocked,
2557 2 : }
2558 2 : fe.readerRefs.Store(1)
2559 2 : return fe
2560 2 : }
2561 :
2562 : // makeRoomForWrite ensures that the memtable has room to hold the contents of
2563 : // Batch. It reserves the space in the memtable and adds a reference to the
2564 : // memtable. The caller must later ensure that the memtable is unreferenced. If
2565 : // the memtable is full, or a nil Batch is provided, the current memtable is
2566 : // rotated (marked as immutable) and a new mutable memtable is allocated. This
2567 : // memtable rotation also causes a log rotation.
2568 : //
2569 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2570 : // may be released and reacquired.
2571 2 : func (d *DB) makeRoomForWrite(b *Batch) error {
2572 2 : if b != nil && b.ingestedSSTBatch {
2573 0 : panic("pebble: invalid function call")
2574 : }
2575 :
2576 2 : force := b == nil || b.flushable != nil
2577 2 : stalled := false
2578 2 : for {
2579 2 : if b != nil && b.flushable == nil {
2580 2 : err := d.mu.mem.mutable.prepare(b)
2581 2 : if err != arenaskl.ErrArenaFull {
2582 2 : if stalled {
2583 2 : d.opts.EventListener.WriteStallEnd()
2584 2 : }
2585 2 : return err
2586 : }
2587 2 : } else if !force {
2588 2 : if stalled {
2589 2 : d.opts.EventListener.WriteStallEnd()
2590 2 : }
2591 2 : return nil
2592 : }
2593 : // force || err == ErrArenaFull, so we need to rotate the current memtable.
2594 2 : {
2595 2 : var size uint64
2596 2 : for i := range d.mu.mem.queue {
2597 2 : size += d.mu.mem.queue[i].totalBytes()
2598 2 : }
2599 : // If ElevateWriteStallThresholdForFailover is true, we give an
2600 : // unlimited memory budget for memtables. This is simpler than trying to
2601 : // configure an explicit value, given that memory resources can vary.
2602 : // When using WAL failover in CockroachDB, an OOM risk is worth
2603 : // tolerating for workloads that have a strict latency SLO. Also, an
2604 : // unlimited budget here does not mean that the disk stall in the
2605 : // primary will go unnoticed until the OOM -- CockroachDB is monitoring
2606 : // disk stalls, and we expect it to fail the node after ~60s if the
2607 : // primary is stalled.
2608 2 : if size >= uint64(d.opts.MemTableStopWritesThreshold)*d.opts.MemTableSize &&
2609 2 : !d.mu.log.manager.ElevateWriteStallThresholdForFailover() {
2610 2 : // We have filled up the current memtable, but already queued memtables
2611 2 : // are still flushing, so we wait.
2612 2 : if !stalled {
2613 2 : stalled = true
2614 2 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2615 2 : Reason: "memtable count limit reached",
2616 2 : })
2617 2 : }
2618 2 : now := time.Now()
2619 2 : d.mu.compact.cond.Wait()
2620 2 : if b != nil {
2621 2 : b.commitStats.MemTableWriteStallDuration += time.Since(now)
2622 2 : }
2623 2 : continue
2624 : }
2625 : }
2626 2 : l0ReadAmp := d.mu.versions.currentVersion().L0Sublevels.ReadAmplification()
2627 2 : if l0ReadAmp >= d.opts.L0StopWritesThreshold {
2628 2 : // There are too many level-0 files, so we wait.
2629 2 : if !stalled {
2630 2 : stalled = true
2631 2 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2632 2 : Reason: "L0 file count limit exceeded",
2633 2 : })
2634 2 : }
2635 2 : now := time.Now()
2636 2 : d.mu.compact.cond.Wait()
2637 2 : if b != nil {
2638 1 : b.commitStats.L0ReadAmpWriteStallDuration += time.Since(now)
2639 1 : }
2640 2 : continue
2641 : }
2642 :
2643 2 : var newLogNum base.DiskFileNum
2644 2 : var prevLogSize uint64
2645 2 : if !d.opts.DisableWAL {
2646 2 : now := time.Now()
2647 2 : newLogNum, prevLogSize = d.recycleWAL()
2648 2 : if b != nil {
2649 2 : b.commitStats.WALRotationDuration += time.Since(now)
2650 2 : }
2651 : }
2652 :
2653 2 : immMem := d.mu.mem.mutable
2654 2 : imm := d.mu.mem.queue[len(d.mu.mem.queue)-1]
2655 2 : imm.logSize = prevLogSize
2656 2 : imm.flushForced = imm.flushForced || (b == nil)
2657 2 :
2658 2 : // If we are manually flushing and we used less than half of the bytes in
2659 2 : // the memtable, don't increase the size for the next memtable. This
2660 2 : // reduces memtable memory pressure when an application is frequently
2661 2 : // manually flushing.
2662 2 : if (b == nil) && uint64(immMem.availBytes()) > immMem.totalBytes()/2 {
2663 2 : d.mu.mem.nextSize = immMem.totalBytes()
2664 2 : }
2665 :
2666 2 : if b != nil && b.flushable != nil {
2667 2 : // The batch is too large to fit in the memtable so add it directly to
2668 2 : // the immutable queue. The flushable batch is associated with the same
2669 2 : // log as the immutable memtable, but logically occurs after it in
2670 2 : // seqnum space. We ensure while flushing that the flushable batch
2671 2 : // is flushed along with the previous memtable in the flushable
2672 2 : // queue. See the top level comment in DB.flush1 to learn how this
2673 2 : // is ensured.
2674 2 : //
2675 2 : // See DB.commitWrite for the special handling of log writes for large
2676 2 : // batches. In particular, the large batch has already written to
2677 2 : // imm.logNum.
2678 2 : entry := d.newFlushableEntry(b.flushable, imm.logNum, b.SeqNum())
2679 2 : // The large batch is by definition large. Reserve space from the cache
2680 2 : // for it until it is flushed.
2681 2 : entry.releaseMemAccounting = d.opts.Cache.Reserve(int(b.flushable.totalBytes()))
2682 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2683 2 : }
2684 :
2685 2 : var logSeqNum uint64
2686 2 : if b != nil {
2687 2 : logSeqNum = b.SeqNum()
2688 2 : if b.flushable != nil {
2689 2 : logSeqNum += uint64(b.Count())
2690 2 : }
2691 2 : } else {
2692 2 : logSeqNum = d.mu.versions.logSeqNum.Load()
2693 2 : }
2694 2 : d.rotateMemtable(newLogNum, logSeqNum, immMem)
2695 2 : force = false
2696 : }
2697 : }
2698 :
2699 : // Both DB.mu and commitPipeline.mu must be held by the caller.
2700 2 : func (d *DB) rotateMemtable(newLogNum base.DiskFileNum, logSeqNum uint64, prev *memTable) {
2701 2 : // Create a new memtable, scheduling the previous one for flushing. We do
2702 2 : // this even if the previous memtable was empty because the DB.Flush
2703 2 : // mechanism is dependent on being able to wait for the empty memtable to
2704 2 : // flush. We can't just mark the empty memtable as flushed here because we
2705 2 : // also have to wait for all previous immutable tables to
2706 2 : // flush. Additionally, the memtable is tied to particular WAL file and we
2707 2 : // want to go through the flush path in order to recycle that WAL file.
2708 2 : //
2709 2 : // NB: newLogNum corresponds to the WAL that contains mutations that are
2710 2 : // present in the new memtable. When immutable memtables are flushed to
2711 2 : // disk, a VersionEdit will be created telling the manifest the minimum
2712 2 : // unflushed log number (which will be the next one in d.mu.mem.mutable
2713 2 : // that was not flushed).
2714 2 : //
2715 2 : // NB: prev should be the current mutable memtable.
2716 2 : var entry *flushableEntry
2717 2 : d.mu.mem.mutable, entry = d.newMemTable(newLogNum, logSeqNum)
2718 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2719 2 : d.updateReadStateLocked(nil)
2720 2 : if prev.writerUnref() {
2721 2 : d.maybeScheduleFlush()
2722 2 : }
2723 : }
2724 :
2725 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2726 : // may be released and reacquired.
2727 2 : func (d *DB) recycleWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
2728 2 : if d.opts.DisableWAL {
2729 0 : panic("pebble: invalid function call")
2730 : }
2731 2 : jobID := d.mu.nextJobID
2732 2 : d.mu.nextJobID++
2733 2 : newLogNum = d.mu.versions.getNextDiskFileNum()
2734 2 :
2735 2 : d.mu.Unlock()
2736 2 : // Close the previous log first. This writes an EOF trailer
2737 2 : // signifying the end of the file and syncs it to disk. We must
2738 2 : // close the previous log before linking the new log file,
2739 2 : // otherwise a crash could leave both logs with unclean tails, and
2740 2 : // Open will treat the previous log as corrupt.
2741 2 : offset, err := d.mu.log.writer.Close()
2742 2 : if err != nil {
2743 0 : // What to do here? Stumbling on doesn't seem worthwhile. If we failed to
2744 0 : // close the previous log it is possible we lost a write.
2745 0 : panic(err)
2746 : }
2747 2 : prevLogSize = uint64(offset)
2748 2 : metrics := d.mu.log.writer.Metrics()
2749 2 :
2750 2 : d.mu.Lock()
2751 2 : if err := d.mu.log.metrics.LogWriterMetrics.Merge(&metrics); err != nil {
2752 0 : d.opts.Logger.Errorf("metrics error: %s", err)
2753 0 : }
2754 :
2755 2 : d.mu.Unlock()
2756 2 : writer, err := d.mu.log.manager.Create(wal.NumWAL(newLogNum), jobID)
2757 2 : if err != nil {
2758 0 : panic(err)
2759 : }
2760 :
2761 2 : d.mu.Lock()
2762 2 : d.mu.log.writer = writer
2763 2 : return newLogNum, prevLogSize
2764 : }
2765 :
2766 2 : func (d *DB) getEarliestUnflushedSeqNumLocked() uint64 {
2767 2 : seqNum := InternalKeySeqNumMax
2768 2 : for i := range d.mu.mem.queue {
2769 2 : logSeqNum := d.mu.mem.queue[i].logSeqNum
2770 2 : if seqNum > logSeqNum {
2771 2 : seqNum = logSeqNum
2772 2 : }
2773 : }
2774 2 : return seqNum
2775 : }
2776 :
2777 2 : func (d *DB) getInProgressCompactionInfoLocked(finishing *compaction) (rv []compactionInfo) {
2778 2 : for c := range d.mu.compact.inProgress {
2779 2 : if len(c.flushing) == 0 && (finishing == nil || c != finishing) {
2780 2 : info := compactionInfo{
2781 2 : versionEditApplied: c.versionEditApplied,
2782 2 : inputs: c.inputs,
2783 2 : smallest: c.smallest,
2784 2 : largest: c.largest,
2785 2 : outputLevel: -1,
2786 2 : }
2787 2 : if c.outputLevel != nil {
2788 2 : info.outputLevel = c.outputLevel.level
2789 2 : }
2790 2 : rv = append(rv, info)
2791 : }
2792 : }
2793 2 : return
2794 : }
2795 :
2796 2 : func inProgressL0Compactions(inProgress []compactionInfo) []manifest.L0Compaction {
2797 2 : var compactions []manifest.L0Compaction
2798 2 : for _, info := range inProgress {
2799 2 : // Skip in-progress compactions that have already committed; the L0
2800 2 : // sublevels initialization code requires the set of in-progress
2801 2 : // compactions to be consistent with the current version. Compactions
2802 2 : // with versionEditApplied=true are already applied to the current
2803 2 : // version but are performing cleanup without the database mutex.
2804 2 : if info.versionEditApplied {
2805 2 : continue
2806 : }
2807 2 : l0 := false
2808 2 : for _, cl := range info.inputs {
2809 2 : l0 = l0 || cl.level == 0
2810 2 : }
2811 2 : if !l0 {
2812 2 : continue
2813 : }
2814 2 : compactions = append(compactions, manifest.L0Compaction{
2815 2 : Smallest: info.smallest,
2816 2 : Largest: info.largest,
2817 2 : IsIntraL0: info.outputLevel == 0,
2818 2 : })
2819 : }
2820 2 : return compactions
2821 : }
2822 :
2823 : // firstError returns the first non-nil error of err0 and err1, or nil if both
2824 : // are nil.
2825 2 : func firstError(err0, err1 error) error {
2826 2 : if err0 != nil {
2827 2 : return err0
2828 2 : }
2829 2 : return err1
2830 : }
2831 :
2832 : // SetCreatorID sets the CreatorID which is needed in order to use shared objects.
2833 : // Remote object usage is disabled until this method is called the first time.
2834 : // Once set, the CreatorID is persisted and cannot change.
2835 : //
2836 : // Does nothing if Experimental.RemoteStorage was not set in the options when
2837 : // the DB was opened or if the DB is in read-only mode.
2838 2 : func (d *DB) SetCreatorID(creatorID uint64) error {
2839 2 : if d.opts.Experimental.RemoteStorage == nil || d.opts.ReadOnly {
2840 1 : return nil
2841 1 : }
2842 2 : return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
2843 : }
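// NOTE: what follows is an illustrative sketch, not part of the original
// source. It shows how a caller might set the creator ID immediately after
// opening a DB configured with remote/shared storage; the directory name and
// the ID value are placeholders chosen by the caller.
//
//	db, err := pebble.Open("demo-db", opts) // opts has Experimental.RemoteStorage set
//	if err != nil {
//		log.Fatal(err)
//	}
//	// Every DB instance sharing objects needs a distinct, stable creator ID.
//	if err := db.SetCreatorID(42); err != nil {
//		log.Fatal(err)
//	}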
2844 :
2845 : // KeyStatistics keeps track of the number of keys that have been pinned by a
2846 : // snapshot as well as counts of the different key kinds in the lsm.
2847 : //
2848 : // One way of using the accumulated stats, when we only have sets and dels,
2849 : // with the counts represented as del_count, set_count, del_latest_count,
2850 : // set_latest_count, and snapshot_pinned_count:
2851 : //
2852 : // - del_latest_count + set_latest_count is the set of unique user keys
2853 : // (unique).
2854 : //
2855 : // - set_latest_count is the set of live unique user keys (live_unique).
2856 : //
2857 : // - Garbage is del_count + set_count - live_unique.
2858 : //
2859 : // - If everything were in the LSM, del_count+set_count-snapshot_pinned_count
2860 : // would also be the set of unique user keys (note that
2861 : // snapshot_pinned_count is counting something different -- see comment below).
2862 : // But snapshot_pinned_count only counts keys in the LSM so the excess here
2863 : // must be keys in memtables.
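//
// A worked example (illustrative, not from the original source): if a scan
// accumulates set_count=10, del_count=4, set_latest_count=6 and
// del_latest_count=2, then unique = 6+2 = 8 user keys, live_unique = 6, and
// garbage = 10+4-6 = 8 internal keys that compactions could eventually
// reclaim (ignoring snapshot_pinned_count and keys still in memtables).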
2864 : type KeyStatistics struct {
2865 : // TODO(sumeer): the SnapshotPinned* are incorrect in that these older
2866 : // versions can be in a different level. Either fix the accounting or
2867 : // rename these fields.
2868 :
2869 : // SnapshotPinnedKeys represents obsolete keys that cannot be elided during
2870 : // a compaction, because they are required by an open snapshot.
2871 : SnapshotPinnedKeys int
2872 : // SnapshotPinnedKeysBytes is the total number of bytes of all snapshot
2873 : // pinned keys.
2874 : SnapshotPinnedKeysBytes uint64
2875 : // KindsCount is the count for each kind of key. It includes point keys,
2876 : // range deletes and range keys.
2877 : KindsCount [InternalKeyKindMax + 1]int
2878 : // LatestKindsCount is the count for each kind of key when it is the latest
2879 : // kind for a user key. It is only populated for point keys.
2880 : LatestKindsCount [InternalKeyKindMax + 1]int
2881 : }
2882 :
2883 : // LSMKeyStatistics is used by DB.ScanStatistics.
2884 : type LSMKeyStatistics struct {
2885 : Accumulated KeyStatistics
2886 : // Levels contains statistics only for point keys. Range deletions and range keys will
2887 : // appear in Accumulated but not Levels.
2888 : Levels [numLevels]KeyStatistics
2889 : // BytesRead represents the logical, pre-compression size of keys and values read
2890 : // BytesRead represents the logical, pre-compression size of keys and values read.
2891 : }
2892 :
2893 : // ScanStatisticsOptions is used by DB.ScanStatistics.
2894 : type ScanStatisticsOptions struct {
2895 : // LimitBytesPerSecond indicates the maximum number of bytes that can be
2896 : // read per second using ScanInternal.
2897 : // A value of 0 indicates that there is no limit.
2898 : LimitBytesPerSecond int64
2899 : }
2900 :
2901 : // ScanStatistics returns the count of different key kinds within the lsm for a
2902 : // key span [lower, upper) as well as the number of snapshot-pinned keys.
2903 : func (d *DB) ScanStatistics(
2904 : ctx context.Context, lower, upper []byte, opts ScanStatisticsOptions,
2905 1 : ) (LSMKeyStatistics, error) {
2906 1 : stats := LSMKeyStatistics{}
2907 1 : var prevKey InternalKey
2908 1 : var rateLimitFunc func(key *InternalKey, val LazyValue) error
2909 1 : tb := tokenbucket.TokenBucket{}
2910 1 :
2911 1 : if opts.LimitBytesPerSecond != 0 {
2912 0 : // Each "token" roughly corresponds to a byte that was read.
2913 0 : tb.Init(tokenbucket.TokensPerSecond(opts.LimitBytesPerSecond), tokenbucket.Tokens(1024))
2914 0 : rateLimitFunc = func(key *InternalKey, val LazyValue) error {
2915 0 : return tb.WaitCtx(ctx, tokenbucket.Tokens(key.Size()+val.Len()))
2916 0 : }
2917 : }
2918 :
2919 1 : scanInternalOpts := &scanInternalOptions{
2920 1 : visitPointKey: func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error {
2921 1 : // If the previous key is equal to the current point key, the current key was
2922 1 : // pinned by a snapshot.
2923 1 : size := uint64(key.Size())
2924 1 : kind := key.Kind()
2925 1 : sameKey := d.equal(prevKey.UserKey, key.UserKey)
2926 1 : if iterInfo.Kind == IteratorLevelLSM && sameKey {
2927 1 : stats.Levels[iterInfo.Level].SnapshotPinnedKeys++
2928 1 : stats.Levels[iterInfo.Level].SnapshotPinnedKeysBytes += size
2929 1 : stats.Accumulated.SnapshotPinnedKeys++
2930 1 : stats.Accumulated.SnapshotPinnedKeysBytes += size
2931 1 : }
2932 1 : if iterInfo.Kind == IteratorLevelLSM {
2933 1 : stats.Levels[iterInfo.Level].KindsCount[kind]++
2934 1 : }
2935 1 : if !sameKey {
2936 1 : if iterInfo.Kind == IteratorLevelLSM {
2937 1 : stats.Levels[iterInfo.Level].LatestKindsCount[kind]++
2938 1 : }
2939 1 : stats.Accumulated.LatestKindsCount[kind]++
2940 : }
2941 :
2942 1 : stats.Accumulated.KindsCount[kind]++
2943 1 : prevKey.CopyFrom(*key)
2944 1 : stats.BytesRead += uint64(key.Size() + value.Len())
2945 1 : return nil
2946 : },
2947 0 : visitRangeDel: func(start, end []byte, seqNum uint64) error {
2948 0 : stats.Accumulated.KindsCount[InternalKeyKindRangeDelete]++
2949 0 : stats.BytesRead += uint64(len(start) + len(end))
2950 0 : return nil
2951 0 : },
2952 0 : visitRangeKey: func(start, end []byte, keys []rangekey.Key) error {
2953 0 : stats.BytesRead += uint64(len(start) + len(end))
2954 0 : for _, key := range keys {
2955 0 : stats.Accumulated.KindsCount[key.Kind()]++
2956 0 : stats.BytesRead += uint64(len(key.Value) + len(key.Suffix))
2957 0 : }
2958 0 : return nil
2959 : },
2960 : includeObsoleteKeys: true,
2961 : IterOptions: IterOptions{
2962 : KeyTypes: IterKeyTypePointsAndRanges,
2963 : LowerBound: lower,
2964 : UpperBound: upper,
2965 : },
2966 : rateLimitFunc: rateLimitFunc,
2967 : }
2968 1 : iter, err := d.newInternalIter(ctx, snapshotIterOpts{}, scanInternalOpts)
2969 1 : if err != nil {
2970 0 : return LSMKeyStatistics{}, err
2971 0 : }
2972 1 : defer iter.close()
2973 1 :
2974 1 : err = scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
2975 1 :
2976 1 : if err != nil {
2977 0 : return LSMKeyStatistics{}, err
2978 0 : }
2979 :
2980 1 : return stats, nil
2981 : }
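// NOTE: an illustrative sketch, not part of the original source, showing how a
// caller might gather statistics for a key span while limiting reads to about
// 1 MB/s. The bounds, the limit, and the printed fields are placeholders.
//
//	stats, err := db.ScanStatistics(ctx, []byte("a"), []byte("z"),
//		pebble.ScanStatisticsOptions{LimitBytesPerSecond: 1 << 20})
//	if err != nil {
//		return err
//	}
//	fmt.Println("sets:", stats.Accumulated.KindsCount[pebble.InternalKeyKindSet])
//	fmt.Println("snapshot-pinned:", stats.Accumulated.SnapshotPinnedKeys)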
2982 :
2983 : // ObjProvider returns the objstorage.Provider for this database. Meant to be
2984 : // used for internal purposes only.
2985 2 : func (d *DB) ObjProvider() objstorage.Provider {
2986 2 : return d.objProvider
2987 2 : }
2988 :
2989 1 : func (d *DB) checkVirtualBounds(m *fileMetadata) {
2990 1 : if !invariants.Enabled {
2991 0 : return
2992 0 : }
2993 :
2994 1 : objMeta, err := d.objProvider.Lookup(fileTypeTable, m.FileBacking.DiskFileNum)
2995 1 : if err != nil {
2996 0 : panic(err)
2997 : }
2998 1 : if objMeta.IsExternal() {
2999 0 : // Nothing to do; bounds are expected to be loose.
3000 0 : return
3001 0 : }
3002 :
3003 1 : iters, err := d.newIters(context.TODO(), m, nil, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
3004 1 : if err != nil {
3005 0 : panic(errors.Wrap(err, "pebble: error creating iterators"))
3006 : }
3007 1 : defer iters.CloseAll()
3008 1 :
3009 1 : if m.HasPointKeys {
3010 1 : pointIter := iters.Point()
3011 1 : rangeDelIter := iters.RangeDeletion()
3012 1 :
3013 1 : // Check that the lower bound is tight.
3014 1 : pointKey, _ := pointIter.First()
3015 1 : rangeDel, err := rangeDelIter.First()
3016 1 : if err != nil {
3017 0 : panic(err)
3018 : }
3019 1 : if (rangeDel == nil || d.cmp(rangeDel.SmallestKey().UserKey, m.SmallestPointKey.UserKey) != 0) &&
3020 1 : (pointKey == nil || d.cmp(pointKey.UserKey, m.SmallestPointKey.UserKey) != 0) {
3021 0 : panic(errors.Newf("pebble: virtual sstable %s lower point key bound is not tight", m.FileNum))
3022 : }
3023 :
3024 : // Check that the upper bound is tight.
3025 1 : pointKey, _ = pointIter.Last()
3026 1 : rangeDel, err = rangeDelIter.Last()
3027 1 : if err != nil {
3028 0 : panic(err)
3029 : }
3030 1 : if (rangeDel == nil || d.cmp(rangeDel.LargestKey().UserKey, m.LargestPointKey.UserKey) != 0) &&
3031 1 : (pointKey == nil || d.cmp(pointKey.UserKey, m.LargestPointKey.UserKey) != 0) {
3032 0 : panic(errors.Newf("pebble: virtual sstable %s upper point key bound is not tight", m.FileNum))
3033 : }
3034 :
3035 : // Check that iterator keys are within bounds.
3036 1 : for key, _ := pointIter.First(); key != nil; key, _ = pointIter.Next() {
3037 1 : if d.cmp(key.UserKey, m.SmallestPointKey.UserKey) < 0 || d.cmp(key.UserKey, m.LargestPointKey.UserKey) > 0 {
3038 0 : panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.UserKey))
3039 : }
3040 : }
3041 1 : s, err := rangeDelIter.First()
3042 1 : for ; s != nil; s, err = rangeDelIter.Next() {
3043 1 : if d.cmp(s.SmallestKey().UserKey, m.SmallestPointKey.UserKey) < 0 {
3044 0 : panic(errors.Newf("pebble: virtual sstable %s range deletion %s is not within bounds", m.FileNum, s.SmallestKey().UserKey))
3045 : }
3046 1 : if d.cmp(s.LargestKey().UserKey, m.LargestPointKey.UserKey) > 0 {
3047 0 : panic(errors.Newf("pebble: virtual sstable %s range deletion %s is not within bounds", m.FileNum, s.LargestKey().UserKey))
3048 : }
3049 : }
3050 1 : if err != nil {
3051 0 : panic(err)
3052 : }
3053 : }
3054 :
3055 1 : if !m.HasRangeKeys {
3056 1 : return
3057 1 : }
3058 0 : rangeKeyIter := iters.RangeKey()
3059 0 :
3060 0 : // Check that the lower bound is tight.
3061 0 : if s, err := rangeKeyIter.First(); err != nil {
3062 0 : panic(err)
3063 0 : } else if d.cmp(s.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) != 0 {
3064 0 : panic(errors.Newf("pebble: virtual sstable %s lower range key bound is not tight", m.FileNum))
3065 : }
3066 :
3067 : // Check that upper bound is tight.
3068 0 : if s, err := rangeKeyIter.Last(); err != nil {
3069 0 : panic(err)
3070 0 : } else if d.cmp(s.LargestKey().UserKey, m.LargestRangeKey.UserKey) != 0 {
3071 0 : panic(errors.Newf("pebble: virtual sstable %s upper range key bound is not tight", m.FileNum))
3072 : }
3073 :
3074 0 : s, err := rangeKeyIter.First()
3075 0 : for ; s != nil; s, err = rangeKeyIter.Next() {
3076 0 : if d.cmp(s.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) < 0 {
3077 0 : panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.FileNum, s.SmallestKey().UserKey))
3078 : }
3079 0 : if d.cmp(s.LargestKey().UserKey, m.LargestRangeKey.UserKey) > 0 {
3080 0 : panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.FileNum, s.LargestKey().UserKey))
3081 : }
3082 : }
3083 0 : if err != nil {
3084 0 : panic(err)
3085 : }
3086 : }
3087 :
3088 : // DebugString returns a debugging string describing the LSM.
3089 0 : func (d *DB) DebugString() string {
3090 0 : d.mu.Lock()
3091 0 : defer d.mu.Unlock()
3092 0 : return d.mu.versions.currentVersion().DebugString()
3093 0 : }