Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package pebble provides an ordered key/value store.
6 : package pebble // import "github.com/cockroachdb/pebble"
7 :
8 : import (
9 : "context"
10 : "fmt"
11 : "io"
12 : "os"
13 : "strconv"
14 : "sync"
15 : "sync/atomic"
16 : "time"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/arenaskl"
20 : "github.com/cockroachdb/pebble/internal/base"
21 : "github.com/cockroachdb/pebble/internal/invalidating"
22 : "github.com/cockroachdb/pebble/internal/invariants"
23 : "github.com/cockroachdb/pebble/internal/keyspan"
24 : "github.com/cockroachdb/pebble/internal/manifest"
25 : "github.com/cockroachdb/pebble/internal/manual"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : "github.com/cockroachdb/pebble/objstorage/remote"
28 : "github.com/cockroachdb/pebble/rangekey"
29 : "github.com/cockroachdb/pebble/record"
30 : "github.com/cockroachdb/pebble/sstable"
31 : "github.com/cockroachdb/pebble/vfs"
32 : "github.com/cockroachdb/pebble/vfs/atomicfs"
33 : "github.com/cockroachdb/tokenbucket"
34 : "github.com/prometheus/client_golang/prometheus"
35 : )
36 :
37 : const (
38 : // minTableCacheSize is the minimum size of the table cache, for a single db.
39 : minTableCacheSize = 64
40 :
41 : // numNonTableCacheFiles is an approximation for the number of files
42 : // that we don't use for table caches, for a given db.
43 : numNonTableCacheFiles = 10
44 : )
45 :
46 : var (
47 : // ErrNotFound is returned when a get operation does not find the requested
48 : // key.
49 : ErrNotFound = base.ErrNotFound
50 : // ErrClosed is panicked when an operation is performed on a closed snapshot or
51 : // DB. Use errors.Is(err, ErrClosed) to check for this error.
52 : ErrClosed = errors.New("pebble: closed")
53 : // ErrReadOnly is returned when a write operation is performed on a read-only
54 : // database.
55 : ErrReadOnly = errors.New("pebble: read-only")
56 : // errNoSplit indicates that the user is trying to perform a range key
57 : // operation but the configured Comparer does not provide a Split
58 : // implementation.
59 : errNoSplit = errors.New("pebble: Comparer.Split required for range key operations")
60 : )
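
// A minimal sketch of the errors.Is check suggested above for ErrClosed (err
// is assumed to come from an operation on an already-closed DB or snapshot):
//
//	if errors.Is(err, ErrClosed) {
//		// The DB or snapshot had already been closed.
//	}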
61 :
62 : // Reader is a readable key/value store.
63 : //
64 : // It is safe to call Get and NewIter from concurrent goroutines.
65 : type Reader interface {
66 : // Get gets the value for the given key. It returns ErrNotFound if the DB
67 : // does not contain the key.
68 : //
69 : // The caller should not modify the contents of the returned slice, but it is
70 : // safe to modify the contents of the argument after Get returns. The
71 : // returned slice will remain valid until the returned Closer is closed. On
72 : // success, the caller MUST call closer.Close() or a memory leak will occur.
73 : Get(key []byte) (value []byte, closer io.Closer, err error)
74 :
75 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
76 : // return false). The iterator can be positioned via a call to SeekGE,
77 : // SeekLT, First or Last.
78 : NewIter(o *IterOptions) (*Iterator, error)
79 :
80 : // NewIterWithContext is like NewIter, and additionally accepts a context
81 : // for tracing.
82 : NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error)
83 :
84 : // Close closes the Reader. It may or may not close any underlying io.Reader
85 : // or io.Writer, depending on how the DB was created.
86 : //
87 : // It is not safe to close a DB until all outstanding iterators are closed.
88 : // It is valid to call Close multiple times. Other methods should not be
89 : // called after the DB has been closed.
90 : Close() error
91 : }
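
// A minimal sketch of the Get contract above (r is assumed to be any Reader;
// on success the returned closer MUST be closed):
//
//	value, closer, err := r.Get(key)
//	if err != nil {
//		if errors.Is(err, ErrNotFound) {
//			// The key is absent.
//		}
//		return err
//	}
//	v := append([]byte(nil), value...) // copy if needed after closer.Close()
//	if err := closer.Close(); err != nil {
//		return err
//	}
//	_ = v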
92 :
93 : // Writer is a writable key/value store.
94 : //
95 : // Goroutine safety is dependent on the specific implementation.
96 : type Writer interface {
97 : // Apply the operations contained in the batch to the DB.
98 : //
99 : // It is safe to modify the contents of the arguments after Apply returns.
100 : Apply(batch *Batch, o *WriteOptions) error
101 :
102 : // Delete deletes the value for the given key. Deletes are blind and will
103 : // succeed even if the given key does not exist.
104 : //
105 : // It is safe to modify the contents of the arguments after Delete returns.
106 : Delete(key []byte, o *WriteOptions) error
107 :
108 : // DeleteSized behaves identically to Delete, but takes an additional
109 : // argument indicating the size of the value being deleted. DeleteSized
110 : // should be preferred when the caller has the expectation that there exists
111 : // a single internal KV pair for the key (eg, the key has not been
112 : // overwritten recently), and the caller knows the size of its value.
113 : //
114 : // DeleteSized will record the value size within the tombstone and use it to
115 : // inform compaction-picking heuristics which strive to reduce space
116 : // amplification in the LSM. This "calling your shot" mechanic allows the
117 : // storage engine to more accurately estimate and reduce space
118 : // amplification.
119 : //
120 : // It is safe to modify the contents of the arguments after DeleteSized
121 : // returns.
122 : DeleteSized(key []byte, valueSize uint32, _ *WriteOptions) error
123 :
124 : // SingleDelete is similar to Delete in that it deletes the value for the given key. Like Delete,
125 : // it is a blind operation that will succeed even if the given key does not exist.
126 : //
127 : // WARNING: Undefined (non-deterministic) behavior will result if a key is overwritten and
128 : // then deleted using SingleDelete. The record may appear deleted immediately, but be
129 : // resurrected at a later time after compactions have been performed. Or the record may
130 : // be deleted permanently. A Delete operation lays down a "tombstone" which shadows all
131 : // previous versions of a key. The SingleDelete operation is akin to "anti-matter" and will
132 : // only delete the most recently written version for a key. These different semantics allow
133 : // the DB to avoid propagating a SingleDelete operation during a compaction as soon as the
134 : // corresponding Set operation is encountered. These semantics require extreme care to handle
135 : // properly. Only use if you have a workload where the performance gain is critical and you
136 : // can guarantee that a record is written once and then deleted once.
137 : //
138 : // SingleDelete is internally transformed into a Delete if the most recent record for a key is either
139 : // a Merge or Delete record.
140 : //
141 : // It is safe to modify the contents of the arguments after SingleDelete returns.
142 : SingleDelete(key []byte, o *WriteOptions) error
143 :
144 : // DeleteRange deletes all of the point keys (and values) in the range
145 : // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
146 : // delete overlapping range keys (eg, keys set via RangeKeySet).
147 : //
148 : // It is safe to modify the contents of the arguments after DeleteRange
149 : // returns.
150 : DeleteRange(start, end []byte, o *WriteOptions) error
151 :
152 : // LogData adds the specified data to the batch. The data will be written to the
153 : // WAL, but not added to memtables or sstables. Log data is never indexed,
154 : // which makes it useful for testing WAL performance.
155 : //
156 : // It is safe to modify the contents of the argument after LogData returns.
157 : LogData(data []byte, opts *WriteOptions) error
158 :
159 : // Merge merges the value for the given key. The details of the merge are
160 : // dependent upon the configured merge operation.
161 : //
162 : // It is safe to modify the contents of the arguments after Merge returns.
163 : Merge(key, value []byte, o *WriteOptions) error
164 :
165 : // Set sets the value for the given key. It overwrites any previous value
166 : // for that key; a DB is not a multi-map.
167 : //
168 : // It is safe to modify the contents of the arguments after Set returns.
169 : Set(key, value []byte, o *WriteOptions) error
170 :
171 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
172 : // timestamp suffix to value. The suffix is optional. If any portion of the key
173 : // range [start, end) is already set by a range key with the same suffix value,
174 : // RangeKeySet overrides it.
175 : //
176 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
177 : RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error
178 :
179 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
180 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
181 : // range key. RangeKeyUnset only removes portions of range keys that fall within
182 : // the [start, end) key span, and only range keys with suffixes that exactly
183 : // match the unset suffix.
184 : //
185 : // It is safe to modify the contents of the arguments after RangeKeyUnset
186 : // returns.
187 : RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error
188 :
189 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
190 : // (inclusive on start, exclusive on end). It does not delete point keys (for
191 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
192 : // bounds, including those with or without suffixes.
193 : //
194 : // It is safe to modify the contents of the arguments after RangeKeyDelete
195 : // returns.
196 : RangeKeyDelete(start, end []byte, opts *WriteOptions) error
197 : }
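
// A hedged sketch of the write-once/delete-once pattern that keeps
// SingleDelete well-defined, plus a DeleteSized "called shot" (w is assumed to
// be any Writer, token an assumed []byte value, and 1024 the known size of the
// value being deleted; errors elided):
//
//	_ = w.Set([]byte("session/42"), token, nil)   // written exactly once
//	// ... later, once the record is no longer needed:
//	_ = w.SingleDelete([]byte("session/42"), nil) // deleted exactly once
//
//	// When the old value's size is known, DeleteSized lets compaction
//	// heuristics account for the space that will be reclaimed.
//	_ = w.DeleteSized([]byte("blob/7"), 1024, nil)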
198 :
199 : // CPUWorkHandle represents a handle used by the CPUWorkPermissionGranter API.
200 : type CPUWorkHandle interface {
201 : // Permitted indicates whether Pebble can use additional CPU resources.
202 : Permitted() bool
203 : }
204 :
205 : // CPUWorkPermissionGranter is used to request permission to opportunistically
206 : // use additional CPUs to speed up internal background work.
207 : type CPUWorkPermissionGranter interface {
208 : // GetPermission returns a handle regardless of whether permission is granted
209 : // or not. In the latter case, the handle is only useful for recording
210 : // the CPU time actually spent on this calling goroutine.
211 : GetPermission(time.Duration) CPUWorkHandle
212 : // CPUWorkDone must be called regardless of whether CPUWorkHandle.Permitted
213 : // returns true or false.
214 : CPUWorkDone(CPUWorkHandle)
215 : }
216 :
217 : // Use a default implementation for the CPU work granter to avoid excessive nil
218 : // checks in the code.
219 : type defaultCPUWorkHandle struct{}
220 :
221 1 : func (d defaultCPUWorkHandle) Permitted() bool {
222 1 : return false
223 1 : }
224 :
225 : type defaultCPUWorkGranter struct{}
226 :
227 1 : func (d defaultCPUWorkGranter) GetPermission(_ time.Duration) CPUWorkHandle {
228 1 : return defaultCPUWorkHandle{}
229 1 : }
230 :
231 1 : func (d defaultCPUWorkGranter) CPUWorkDone(_ CPUWorkHandle) {}
232 :
233 : // DB provides a concurrent, persistent ordered key/value store.
234 : //
235 : // A DB's basic operations (Get, Set, Delete) should be self-explanatory. Get
236 : // will return ErrNotFound if the requested key is not in the store; callers are
237 : // free to ignore this error. Delete is blind and succeeds even for absent keys.
238 : //
239 : // A DB also allows for iterating over the key/value pairs in key order. If d
240 : // is a DB, the code below prints all key/value pairs whose keys are 'greater
241 : // than or equal to' k:
242 : //
243 : // iter := d.NewIter(readOptions)
244 : // for iter.SeekGE(k); iter.Valid(); iter.Next() {
245 : // fmt.Printf("key=%q value=%q\n", iter.Key(), iter.Value())
246 : // }
247 : // return iter.Close()
248 : //
249 : // The Options struct holds the optional parameters for the DB, including a
250 : // Comparer to define a 'less than' relationship over keys. It is always valid
251 : // to pass a nil *Options, which means to use the default parameter values. Any
252 : // zero field of a non-nil *Options also means to use the default value for
253 : // that parameter. Thus, the code below uses a custom Comparer, but the default
254 : // values for every other parameter:
255 : //
256 : // db := pebble.Open(&Options{
257 : // Comparer: myComparer,
258 : // })
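//
// A fuller sketch with error handling (the directory name "demo" is
// illustrative):
//
//	db, err := pebble.Open("demo", &Options{})
//	if err != nil {
//		return err
//	}
//	defer db.Close()
//	if err := db.Set([]byte("hello"), []byte("world"), Sync); err != nil {
//		return err
//	}
//	value, closer, err := db.Get([]byte("hello"))
//	if err != nil {
//		return err
//	}
//	fmt.Printf("value=%q\n", value)
//	if err := closer.Close(); err != nil {
//		return err
//	}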
259 : type DB struct {
260 : // The count and size of referenced memtables. This includes memtables
261 : // present in DB.mu.mem.queue, memtables that have been flushed but are still
262 : // referenced by an in-use readState, and up to one memTable waiting to be
263 : // reused and stored in d.memTableRecycle.
264 : memTableCount atomic.Int64
265 : memTableReserved atomic.Int64 // number of bytes reserved in the cache for memtables
266 : // memTableRecycle holds a pointer to an obsolete memtable. The next
267 : // memtable allocation will reuse this memtable if it has not already been
268 : // recycled.
269 : memTableRecycle atomic.Pointer[memTable]
270 :
271 : // The size of the current log file (i.e. db.mu.log.queue[len(queue)-1]).
272 : logSize atomic.Uint64
273 :
274 : // The number of bytes available on disk.
275 : diskAvailBytes atomic.Uint64
276 :
277 : cacheID uint64
278 : dirname string
279 : walDirname string
280 : opts *Options
281 : cmp Compare
282 : equal Equal
283 : merge Merge
284 : split Split
285 : abbreviatedKey AbbreviatedKey
286 : // The threshold for determining when a batch is "large" and will skip being
287 : // inserted into a memtable.
288 : largeBatchThreshold uint64
289 : // The current OPTIONS file number.
290 : optionsFileNum base.DiskFileNum
291 : // The on-disk size of the current OPTIONS file.
292 : optionsFileSize uint64
293 :
294 : // objProvider is used to access and manage SSTs.
295 : objProvider objstorage.Provider
296 :
297 : fileLock *Lock
298 : dataDir vfs.File
299 : walDir vfs.File
300 :
301 : tableCache *tableCacheContainer
302 : newIters tableNewIters
303 : tableNewRangeKeyIter keyspan.TableNewSpanIter
304 :
305 : commit *commitPipeline
306 :
307 : // readState provides access to the state needed for reading without needing
308 : // to acquire DB.mu.
309 : readState struct {
310 : sync.RWMutex
311 : val *readState
312 : }
313 : // logRecycler holds a set of log file numbers that are available for
314 : // reuse. Writing to a recycled log file is faster than to a new log file on
315 : // some common filesystems (xfs and ext3/4) due to avoiding metadata
316 : // updates.
317 : logRecycler logRecycler
318 :
319 : closed *atomic.Value
320 : closedCh chan struct{}
321 :
322 : cleanupManager *cleanupManager
323 :
324 : // During an iterator close, we may asynchronously schedule read compactions.
325 : // We want to wait for those goroutines to finish before closing the DB.
326 : // compactionSchedulers.Wait() should not be called while DB.mu is held.
327 : compactionSchedulers sync.WaitGroup
328 :
329 : // The main mutex protecting internal DB state. This mutex encompasses many
330 : // fields because those fields need to be accessed and updated atomically. In
331 : // particular, the current version, log.*, mem.*, and snapshot list need to
332 : // be accessed and updated atomically during compaction.
333 : //
334 : // Care is taken to avoid holding DB.mu during IO operations. Accomplishing
335 : // this sometimes requires releasing DB.mu in a method that was called with
336 : // it held. See versionSet.logAndApply() and DB.makeRoomForWrite() for
337 : // examples. This is a common pattern, so be careful about expectations that
338 : // DB.mu will be held continuously across a set of calls.
339 : mu struct {
340 : sync.Mutex
341 :
342 : formatVers struct {
343 : // vers is the database's current format major version.
344 : // Backwards-incompatible features are gated behind new
345 : // format major versions and not enabled until a database's
346 : // version is ratcheted upwards.
347 : //
348 : // Although this is under the `mu` prefix, readers may read vers
349 : // atomically without holding d.mu. Writers must only write to this
350 : // value through finalizeFormatVersUpgrade, which requires that d.mu is
351 : // held.
352 : vers atomic.Uint64
353 : // marker is the atomic marker for the format major version.
354 : // When a database's version is ratcheted upwards, the
355 : // marker is moved in order to atomically record the new
356 : // version.
357 : marker *atomicfs.Marker
358 : // ratcheting when set to true indicates that the database is
359 : // currently in the process of ratcheting the format major version
360 : // to vers + 1. As a part of ratcheting the format major version,
361 : // migrations may drop and re-acquire the mutex.
362 : ratcheting bool
363 : }
364 :
365 : // The ID of the next job. Job IDs are passed to event listener
366 : // notifications and act as a mechanism for tying together the events and
367 : // log messages for a single job such as a flush, compaction, or file
368 : // ingestion. Job IDs are not serialized to disk or used for correctness.
369 : nextJobID int
370 :
371 : // The collection of immutable versions and state about the log and visible
372 : // sequence numbers. Use the pointer here to ensure the atomic fields in
373 : // version set are aligned properly.
374 : versions *versionSet
375 :
376 : log struct {
377 : // The queue of logs, containing both flushed and unflushed logs. The
378 : // flushed logs will be a prefix, the unflushed logs a suffix. The
379 : // delimiter between flushed and unflushed logs is
380 : // versionSet.minUnflushedLogNum.
381 : queue []fileInfo
382 : // The number of input bytes to the log. This is the raw size of the
383 : // batches written to the WAL, without the overhead of the record
384 : // envelopes.
385 : bytesIn uint64
386 : // The LogWriter is protected by commitPipeline.mu. This allows log
387 : // writes to be performed without holding DB.mu, but requires both
388 : // commitPipeline.mu and DB.mu to be held when rotating the WAL/memtable
389 : // (i.e. makeRoomForWrite).
390 : *record.LogWriter
391 : // Can be nil.
392 : metrics struct {
393 : fsyncLatency prometheus.Histogram
394 : record.LogWriterMetrics
395 : }
396 : registerLogWriterForTesting func(w *record.LogWriter)
397 : }
398 :
399 : mem struct {
400 : // The current mutable memTable.
401 : mutable *memTable
402 : // Queue of flushables (the mutable memtable is at end). Elements are
403 : // added to the end of the slice and removed from the beginning. Once an
404 : // index is set it is never modified, making a fixed slice immutable and
405 : // safe for concurrent reads.
406 : queue flushableList
407 : // nextSize is the size of the next memtable. The memtable size starts at
408 : // min(256KB,Options.MemTableSize) and doubles each time a new memtable
409 : // is allocated up to Options.MemTableSize. This reduces the memory
410 : // footprint of memtables when lots of DB instances are used concurrently
411 : // in test environments.
412 : nextSize uint64
413 : }
414 :
415 : compact struct {
416 : // Condition variable used to signal when a flush or compaction has
417 : // completed. Used by the write-stall mechanism to wait for the stall
418 : // condition to clear. See DB.makeRoomForWrite().
419 : cond sync.Cond
420 : // True when a flush is in progress.
421 : flushing bool
422 : // The number of ongoing compactions.
423 : compactingCount int
424 : // The list of deletion hints, suggesting ranges for delete-only
425 : // compactions.
426 : deletionHints []deleteCompactionHint
427 : // The list of manual compactions. The next manual compaction to perform
428 : // is at the start of the list. New entries are added to the end.
429 : manual []*manualCompaction
430 : // downloads is the list of suggested download tasks. The next download to
431 : // perform is at the start of the list. New entries are added to the end.
432 : downloads []*downloadSpan
433 : // inProgress is the set of in-progress flushes and compactions.
434 : // It's used in the calculation of some metrics and to initialize L0
435 : // sublevels' state. Some of the compactions contained within this
436 : // map may have already committed an edit to the version but are
437 : // lingering performing cleanup, like deleting obsolete files.
438 : inProgress map[*compaction]struct{}
439 :
440 : // rescheduleReadCompaction indicates to an iterator that a read compaction
441 : // should be scheduled.
442 : rescheduleReadCompaction bool
443 :
444 : // readCompactions is a readCompactionQueue which keeps track of the
445 : // compactions which we might have to perform.
446 : readCompactions readCompactionQueue
447 :
448 : // The cumulative duration of all completed compactions since Open.
449 : // Does not include flushes.
450 : duration time.Duration
451 : // Flush throughput metric.
452 : flushWriteThroughput ThroughputMetric
453 : // The idle start time for the flush "loop", i.e., when the flushing
454 : // bool above transitions to false.
455 : noOngoingFlushStartTime time.Time
456 : }
457 :
458 : // Non-zero when file cleaning is disabled. The disabled count acts as a
459 : // reference count to prohibit file cleaning. See
460 : // DB.{disable,Enable}FileDeletions().
461 : disableFileDeletions int
462 :
463 : snapshots struct {
464 : // The list of active snapshots.
465 : snapshotList
466 :
467 : // The cumulative count and size of snapshot-pinned keys written to
468 : // sstables.
469 : cumulativePinnedCount uint64
470 : cumulativePinnedSize uint64
471 : }
472 :
473 : tableStats struct {
474 : // Condition variable used to signal the completion of a
475 : // job to collect table stats.
476 : cond sync.Cond
477 : // True when a stat collection operation is in progress.
478 : loading bool
479 : // True if stat collection has loaded statistics for all tables
480 : // other than those listed explicitly in pending. This flag starts
481 : // as false when a database is opened and flips to true once stat
482 : // collection has caught up.
483 : loadedInitial bool
484 : // A slice of files for which stats have not been computed.
485 : // Compactions, ingests, flushes append files to be processed. An
486 : // active stat collection goroutine clears the list and processes
487 : // them.
488 : pending []manifest.NewFileEntry
489 : }
490 :
491 : tableValidation struct {
492 : // cond is a condition variable used to signal the completion of a
493 : // job to validate one or more sstables.
494 : cond sync.Cond
495 : // pending is a slice of metadata for sstables waiting to be
496 : // validated. Only physical sstables should be added to the pending
497 : // queue.
498 : pending []newFileEntry
499 : // validating is set to true when validation is running.
500 : validating bool
501 : }
502 : }
503 :
504 : // Normally equal to time.Now() but may be overridden in tests.
505 : timeNow func() time.Time
506 : // the time at database Open; may be used to compute metrics like effective
507 : // compaction concurrency
508 : openedAt time.Time
509 : }
510 :
511 : var _ Reader = (*DB)(nil)
512 : var _ Writer = (*DB)(nil)
513 :
514 : // TestOnlyWaitForCleaning MUST only be used in tests.
515 0 : func (d *DB) TestOnlyWaitForCleaning() {
516 0 : d.cleanupManager.Wait()
517 0 : }
518 :
519 : // Get gets the value for the given key. It returns ErrNotFound if the DB does
520 : // not contain the key.
521 : //
522 : // The caller should not modify the contents of the returned slice, but it is
523 : // safe to modify the contents of the argument after Get returns. The returned
524 : // slice will remain valid until the returned Closer is closed. On success, the
525 : // caller MUST call closer.Close() or a memory leak will occur.
526 1 : func (d *DB) Get(key []byte) ([]byte, io.Closer, error) {
527 1 : return d.getInternal(key, nil /* batch */, nil /* snapshot */)
528 1 : }
529 :
530 : type getIterAlloc struct {
531 : dbi Iterator
532 : keyBuf []byte
533 : get getIter
534 : }
535 :
536 : var getIterAllocPool = sync.Pool{
537 1 : New: func() interface{} {
538 1 : return &getIterAlloc{}
539 1 : },
540 : }
541 :
542 1 : func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer, error) {
543 1 : if err := d.closed.Load(); err != nil {
544 0 : panic(err)
545 : }
546 :
547 : // Grab and reference the current readState. This prevents the underlying
548 : // files in the associated version from being deleted if there is a current
549 : // compaction. The readState is unref'd by Iterator.Close().
550 1 : readState := d.loadReadState()
551 1 :
552 1 : // Determine the seqnum to read at after grabbing the read state (current and
553 1 : // memtables) above.
554 1 : var seqNum uint64
555 1 : if s != nil {
556 1 : seqNum = s.seqNum
557 1 : } else {
558 1 : seqNum = d.mu.versions.visibleSeqNum.Load()
559 1 : }
560 :
561 1 : buf := getIterAllocPool.Get().(*getIterAlloc)
562 1 :
563 1 : get := &buf.get
564 1 : *get = getIter{
565 1 : logger: d.opts.Logger,
566 1 : comparer: d.opts.Comparer,
567 1 : newIters: d.newIters,
568 1 : snapshot: seqNum,
569 1 : key: key,
570 1 : batch: b,
571 1 : mem: readState.memtables,
572 1 : l0: readState.current.L0SublevelFiles,
573 1 : version: readState.current,
574 1 : }
575 1 :
576 1 : // Strip off memtables which cannot possibly contain the seqNum being read
577 1 : // at.
578 1 : for len(get.mem) > 0 {
579 1 : n := len(get.mem)
580 1 : if logSeqNum := get.mem[n-1].logSeqNum; logSeqNum < seqNum {
581 1 : break
582 : }
583 1 : get.mem = get.mem[:n-1]
584 : }
585 :
586 1 : i := &buf.dbi
587 1 : pointIter := get
588 1 : *i = Iterator{
589 1 : ctx: context.Background(),
590 1 : getIterAlloc: buf,
591 1 : iter: pointIter,
592 1 : pointIter: pointIter,
593 1 : merge: d.merge,
594 1 : comparer: *d.opts.Comparer,
595 1 : readState: readState,
596 1 : keyBuf: buf.keyBuf,
597 1 : }
598 1 :
599 1 : if !i.First() {
600 1 : err := i.Close()
601 1 : if err != nil {
602 0 : return nil, nil, err
603 0 : }
604 1 : return nil, nil, ErrNotFound
605 : }
606 1 : return i.Value(), i, nil
607 : }
608 :
609 : // Set sets the value for the given key. It overwrites any previous value
610 : // for that key; a DB is not a multi-map.
611 : //
612 : // It is safe to modify the contents of the arguments after Set returns.
613 1 : func (d *DB) Set(key, value []byte, opts *WriteOptions) error {
614 1 : b := newBatch(d)
615 1 : _ = b.Set(key, value, opts)
616 1 : if err := d.Apply(b, opts); err != nil {
617 0 : return err
618 0 : }
619 : // Only release the batch on success.
620 1 : b.release()
621 1 : return nil
622 : }
623 :
624 : // Delete deletes the value for the given key. Deletes are blind and will
625 : // succeed even if the given key does not exist.
626 : //
627 : // It is safe to modify the contents of the arguments after Delete returns.
628 1 : func (d *DB) Delete(key []byte, opts *WriteOptions) error {
629 1 : b := newBatch(d)
630 1 : _ = b.Delete(key, opts)
631 1 : if err := d.Apply(b, opts); err != nil {
632 0 : return err
633 0 : }
634 : // Only release the batch on success.
635 1 : b.release()
636 1 : return nil
637 : }
638 :
639 : // DeleteSized behaves identically to Delete, but takes an additional
640 : // argument indicating the size of the value being deleted. DeleteSized
641 : // should be preferred when the caller has the expectation that there exists
642 : // a single internal KV pair for the key (eg, the key has not been
643 : // overwritten recently), and the caller knows the size of its value.
644 : //
645 : // DeleteSized will record the value size within the tombstone and use it to
646 : // inform compaction-picking heuristics which strive to reduce space
647 : // amplification in the LSM. This "calling your shot" mechanic allows the
648 : // storage engine to more accurately estimate and reduce space amplification.
649 : //
650 : // It is safe to modify the contents of the arguments after DeleteSized
651 : // returns.
652 1 : func (d *DB) DeleteSized(key []byte, valueSize uint32, opts *WriteOptions) error {
653 1 : b := newBatch(d)
654 1 : _ = b.DeleteSized(key, valueSize, opts)
655 1 : if err := d.Apply(b, opts); err != nil {
656 0 : return err
657 0 : }
658 : // Only release the batch on success.
659 1 : b.release()
660 1 : return nil
661 : }
662 :
663 : // SingleDelete adds an action to the batch that single deletes the entry for key.
664 : // See Writer.SingleDelete for more details on the semantics of SingleDelete.
665 : //
666 : // It is safe to modify the contents of the arguments after SingleDelete returns.
667 1 : func (d *DB) SingleDelete(key []byte, opts *WriteOptions) error {
668 1 : b := newBatch(d)
669 1 : _ = b.SingleDelete(key, opts)
670 1 : if err := d.Apply(b, opts); err != nil {
671 0 : return err
672 0 : }
673 : // Only release the batch on success.
674 1 : b.release()
675 1 : return nil
676 : }
677 :
678 : // DeleteRange deletes all of the point keys (and values) in the range [start,end)
679 : // (inclusive on start, exclusive on end). It does not delete overlapping range keys.
680 : //
681 : // It is safe to modify the contents of the arguments after DeleteRange
682 : // returns.
683 1 : func (d *DB) DeleteRange(start, end []byte, opts *WriteOptions) error {
684 1 : b := newBatch(d)
685 1 : _ = b.DeleteRange(start, end, opts)
686 1 : if err := d.Apply(b, opts); err != nil {
687 0 : return err
688 0 : }
689 : // Only release the batch on success.
690 1 : b.release()
691 1 : return nil
692 : }
693 :
694 : // Merge adds an action to the DB that merges the value at key with the new
695 : // value. The details of the merge are dependent upon the configured merge
696 : // operator.
697 : //
698 : // It is safe to modify the contents of the arguments after Merge returns.
699 1 : func (d *DB) Merge(key, value []byte, opts *WriteOptions) error {
700 1 : b := newBatch(d)
701 1 : _ = b.Merge(key, value, opts)
702 1 : if err := d.Apply(b, opts); err != nil {
703 0 : return err
704 0 : }
705 : // Only release the batch on success.
706 1 : b.release()
707 1 : return nil
708 : }
709 :
710 : // LogData adds the specified data to the batch. The data will be written to the
711 : // WAL, but not added to memtables or sstables. Log data is never indexed,
712 : // which makes it useful for testing WAL performance.
713 : //
714 : // It is safe to modify the contents of the argument after LogData returns.
715 0 : func (d *DB) LogData(data []byte, opts *WriteOptions) error {
716 0 : b := newBatch(d)
717 0 : _ = b.LogData(data, opts)
718 0 : if err := d.Apply(b, opts); err != nil {
719 0 : return err
720 0 : }
721 : // Only release the batch on success.
722 0 : b.release()
723 0 : return nil
724 : }
725 :
726 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
727 : // timestamp suffix to value. The suffix is optional. If any portion of the key
728 : // range [start, end) is already set by a range key with the same suffix value,
729 : // RangeKeySet overrides it.
730 : //
731 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
732 1 : func (d *DB) RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error {
733 1 : b := newBatch(d)
734 1 : _ = b.RangeKeySet(start, end, suffix, value, opts)
735 1 : if err := d.Apply(b, opts); err != nil {
736 0 : return err
737 0 : }
738 : // Only release the batch on success.
739 1 : b.release()
740 1 : return nil
741 : }
742 :
743 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
744 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
745 : // range key. RangeKeyUnset only removes portions of range keys that fall within
746 : // the [start, end) key span, and only range keys with suffixes that exactly
747 : // match the unset suffix.
748 : //
749 : // It is safe to modify the contents of the arguments after RangeKeyUnset
750 : // returns.
751 1 : func (d *DB) RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error {
752 1 : b := newBatch(d)
753 1 : _ = b.RangeKeyUnset(start, end, suffix, opts)
754 1 : if err := d.Apply(b, opts); err != nil {
755 0 : return err
756 0 : }
757 : // Only release the batch on success.
758 1 : b.release()
759 1 : return nil
760 : }
761 :
762 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
763 : // (inclusive on start, exclusive on end). It does not delete point keys (for
764 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
765 : // bounds, including those with or without suffixes.
766 : //
767 : // It is safe to modify the contents of the arguments after RangeKeyDelete
768 : // returns.
769 1 : func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
770 1 : b := newBatch(d)
771 1 : _ = b.RangeKeyDelete(start, end, opts)
772 1 : if err := d.Apply(b, opts); err != nil {
773 0 : return err
774 0 : }
775 : // Only release the batch on success.
776 1 : b.release()
777 1 : return nil
778 : }
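
// A hedged sketch composing the range-key write methods above (d is an open
// *DB; the "@5" suffix assumes a Comparer whose Split understands such
// suffixes; errors elided):
//
//	// Set [a, z) at suffix @5, then unset [m, z) at the same suffix,
//	// leaving only [a, m) set.
//	_ = d.RangeKeySet([]byte("a"), []byte("z"), []byte("@5"), nil, Sync)
//	_ = d.RangeKeyUnset([]byte("m"), []byte("z"), []byte("@5"), Sync)
//	// Remove any remaining range keys in [a, z), regardless of suffix.
//	_ = d.RangeKeyDelete([]byte("a"), []byte("z"), Sync)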
779 :
780 : // Apply the operations contained in the batch to the DB. If the batch is large
781 : // the contents of the batch may be retained by the database. If that occurs
782 : // the batch contents will be cleared preventing the caller from attempting to
783 : // reuse them.
784 : //
785 : // It is safe to modify the contents of the arguments after Apply returns.
786 1 : func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
787 1 : return d.applyInternal(batch, opts, false)
788 1 : }
789 :
790 : // ApplyNoSyncWait must only be used when opts.Sync is true and the caller
791 : // does not want to wait for the WAL fsync to happen. The method will return
792 : // once the mutation is applied to the memtable and is visible (note that a
793 : // mutation is visible before the WAL sync even in the wait case, so we have
794 : // not weakened the durability semantics). The caller must call Batch.SyncWait
795 : // to wait for the WAL fsync. The caller must not Close the batch without
796 : // first calling Batch.SyncWait.
797 : //
798 : // RECOMMENDATION: Prefer using Apply unless you really understand why you
799 : // need ApplyNoSyncWait.
800 : // EXPERIMENTAL: API/feature subject to change. Do not yet use outside
801 : // CockroachDB.
802 1 : func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
803 1 : if !opts.Sync {
804 0 : return errors.Errorf("cannot request asynchronous apply when WriteOptions.Sync is false")
805 0 : }
806 1 : return d.applyInternal(batch, opts, true)
807 : }
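
// A hedged usage sketch for ApplyNoSyncWait (d is an open *DB; the write is
// visible once ApplyNoSyncWait returns, but SyncWait must be called before the
// batch is closed):
//
//	b := d.NewBatch()
//	_ = b.Set([]byte("k"), []byte("v"), nil)
//	if err := d.ApplyNoSyncWait(b, Sync); err != nil {
//		return err
//	}
//	// ... overlap other work with the WAL fsync ...
//	if err := b.SyncWait(); err != nil {
//		return err
//	}
//	return b.Close()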
808 :
809 : // REQUIRES: noSyncWait => opts.Sync
810 1 : func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
811 1 : if err := d.closed.Load(); err != nil {
812 0 : panic(err)
813 : }
814 1 : if batch.committing {
815 0 : panic("pebble: batch already committing")
816 : }
817 1 : if batch.applied.Load() {
818 0 : panic("pebble: batch already applied")
819 : }
820 1 : if d.opts.ReadOnly {
821 0 : return ErrReadOnly
822 0 : }
823 1 : if batch.db != nil && batch.db != d {
824 0 : panic(fmt.Sprintf("pebble: batch db mismatch: %p != %p", batch.db, d))
825 : }
826 :
827 1 : sync := opts.GetSync()
828 1 : if sync && d.opts.DisableWAL {
829 0 : return errors.New("pebble: WAL disabled")
830 0 : }
831 :
832 1 : if fmv := d.FormatMajorVersion(); fmv < batch.minimumFormatMajorVersion {
833 0 : panic(fmt.Sprintf(
834 0 : "pebble: batch requires at least format major version %d (current: %d)",
835 0 : batch.minimumFormatMajorVersion, fmv,
836 0 : ))
837 : }
838 :
839 1 : if batch.countRangeKeys > 0 {
840 1 : if d.split == nil {
841 0 : return errNoSplit
842 0 : }
843 : }
844 1 : batch.committing = true
845 1 :
846 1 : if batch.db == nil {
847 0 : if err := batch.refreshMemTableSize(); err != nil {
848 0 : return err
849 0 : }
850 : }
851 1 : if batch.memTableSize >= d.largeBatchThreshold {
852 1 : var err error
853 1 : batch.flushable, err = newFlushableBatch(batch, d.opts.Comparer)
854 1 : if err != nil {
855 0 : return err
856 0 : }
857 : }
858 1 : if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
859 0 : // There isn't much we can do on an error here. The commit pipeline will be
860 0 : // horked at this point.
861 0 : d.opts.Logger.Fatalf("pebble: fatal commit error: %v", err)
862 0 : }
863 : // If this is a large batch, we need to clear the batch contents as the
864 : // flushable batch may still be present in the flushables queue.
865 : //
866 : // TODO(peter): Currently large batches are written to the WAL. We could
867 : // skip the WAL write and instead wait for the large batch to be flushed to
868 : // an sstable. For a 100 MB batch, this might actually be faster. For a 1
869 : // GB batch this is almost certainly faster.
870 1 : if batch.flushable != nil {
871 1 : batch.data = nil
872 1 : }
873 1 : return nil
874 : }
875 :
876 1 : func (d *DB) commitApply(b *Batch, mem *memTable) error {
877 1 : if b.flushable != nil {
878 1 : // This is a large batch which was already added to the immutable queue.
879 1 : return nil
880 1 : }
881 1 : err := mem.apply(b, b.SeqNum())
882 1 : if err != nil {
883 0 : return err
884 0 : }
885 :
886 : // If the batch contains range tombstones and the database is configured
887 : // to flush range deletions, schedule a delayed flush so that disk space
888 : // may be reclaimed without additional writes or an explicit flush.
889 1 : if b.countRangeDels > 0 && d.opts.FlushDelayDeleteRange > 0 {
890 1 : d.mu.Lock()
891 1 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayDeleteRange)
892 1 : d.mu.Unlock()
893 1 : }
894 :
895 : // If the batch contains range keys and the database is configured to flush
896 : // range keys, schedule a delayed flush so that the range keys are cleared
897 : // from the memtable.
898 1 : if b.countRangeKeys > 0 && d.opts.FlushDelayRangeKey > 0 {
899 1 : d.mu.Lock()
900 1 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayRangeKey)
901 1 : d.mu.Unlock()
902 1 : }
903 :
904 1 : if mem.writerUnref() {
905 1 : d.mu.Lock()
906 1 : d.maybeScheduleFlush()
907 1 : d.mu.Unlock()
908 1 : }
909 1 : return nil
910 : }
911 :
912 1 : func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
913 1 : var size int64
914 1 : repr := b.Repr()
915 1 :
916 1 : if b.flushable != nil {
917 1 : // We have a large batch. Such batches are special in that they don't get
918 1 : // added to the memtable, and are instead inserted into the queue of
919 1 : // memtables. The call to makeRoomForWrite with this batch will force the
920 1 : // current memtable to be flushed. We want the large batch to be part of
921 1 : // the same log, so we add it to the WAL here, rather than after the call
922 1 : // to makeRoomForWrite().
923 1 : //
924 1 : // Set the sequence number since it was not set to the correct value earlier
925 1 : // (see comment in newFlushableBatch()).
926 1 : b.flushable.setSeqNum(b.SeqNum())
927 1 : if !d.opts.DisableWAL {
928 1 : var err error
929 1 : size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr)
930 1 : if err != nil {
931 0 : panic(err)
932 : }
933 : }
934 : }
935 :
936 1 : d.mu.Lock()
937 1 :
938 1 : var err error
939 1 : if !b.ingestedSSTBatch {
940 1 : // Batches which contain keys of kind InternalKeyKindIngestSST will
941 1 : // never be applied to the memtable, so we don't need to make room for
942 1 : // write. For the other cases, switch out the memtable if there was not
943 1 : // enough room to store the batch.
944 1 : err = d.makeRoomForWrite(b)
945 1 : }
946 :
947 1 : if err == nil && !d.opts.DisableWAL {
948 1 : d.mu.log.bytesIn += uint64(len(repr))
949 1 : }
950 :
951 : // Grab a reference to the memtable while holding DB.mu. Note that for
952 : // non-flushable batches (b.flushable == nil) makeRoomForWrite() added a
953 : // reference to the memtable which will prevent it from being flushed until
954 : // we unreference it. This reference is dropped in DB.commitApply().
955 1 : mem := d.mu.mem.mutable
956 1 :
957 1 : d.mu.Unlock()
958 1 : if err != nil {
959 0 : return nil, err
960 0 : }
961 :
962 1 : if d.opts.DisableWAL {
963 1 : return mem, nil
964 1 : }
965 :
966 1 : if b.flushable == nil {
967 1 : size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr)
968 1 : if err != nil {
969 0 : panic(err)
970 : }
971 : }
972 :
973 1 : d.logSize.Store(uint64(size))
974 1 : return mem, err
975 : }
976 :
977 : type iterAlloc struct {
978 : dbi Iterator
979 : keyBuf []byte
980 : boundsBuf [2][]byte
981 : prefixOrFullSeekKey []byte
982 : merging mergingIter
983 : mlevels [3 + numLevels]mergingIterLevel
984 : levels [3 + numLevels]levelIter
985 : levelsPositioned [3 + numLevels]bool
986 : }
987 :
988 : var iterAllocPool = sync.Pool{
989 1 : New: func() interface{} {
990 1 : return &iterAlloc{}
991 1 : },
992 : }
993 :
994 : // snapshotIterOpts denotes snapshot-related iterator options when calling
995 : // newIter. These are the possible cases for a snapshotIterOpts:
996 : // - No snapshot: All fields are zero values.
997 : // - Classic snapshot: Only `seqNum` is set. The latest readState will be used
998 : // and the specified seqNum will be used as the snapshot seqNum.
999 : // - EventuallyFileOnlySnapshot (EFOS) behaving as a classic snapshot. Only
1000 : // the `seqNum` is set. The latest readState will be used
1001 : // and the specified seqNum will be used as the snapshot seqNum.
1002 : // - EFOS in file-only state: Only `seqNum` and `vers` are set. All the
1003 : // relevant SSTs are referenced by the *version.
1004 : // - EFOS that has been excised but is in alwaysCreateIters mode (tests only).
1005 : // Only `seqNum` and `readState` are set.
1006 : type snapshotIterOpts struct {
1007 : seqNum uint64
1008 : vers *version
1009 : readState *readState
1010 : }
1011 :
1012 : type batchIterOpts struct {
1013 : batchOnly bool
1014 : }
1015 : type newIterOpts struct {
1016 : snapshot snapshotIterOpts
1017 : batch batchIterOpts
1018 : }
1019 :
1020 : // newIter constructs a new iterator, merging in batch iterators as an extra
1021 : // level.
1022 : func (d *DB) newIter(
1023 : ctx context.Context, batch *Batch, internalOpts newIterOpts, o *IterOptions,
1024 1 : ) *Iterator {
1025 1 : if internalOpts.batch.batchOnly {
1026 0 : if batch == nil {
1027 0 : panic("batchOnly is true, but batch is nil")
1028 : }
1029 0 : if internalOpts.snapshot.vers != nil {
1030 0 : panic("batchOnly is true, but snapshotIterOpts is initialized")
1031 : }
1032 : }
1033 1 : if err := d.closed.Load(); err != nil {
1034 0 : panic(err)
1035 : }
1036 1 : seqNum := internalOpts.snapshot.seqNum
1037 1 : if o != nil && o.RangeKeyMasking.Suffix != nil && o.KeyTypes != IterKeyTypePointsAndRanges {
1038 0 : panic("pebble: range key masking requires IterKeyTypePointsAndRanges")
1039 : }
1040 1 : if (batch != nil || seqNum != 0) && (o != nil && o.OnlyReadGuaranteedDurable) {
1041 0 : // We could add support for OnlyReadGuaranteedDurable on snapshots if
1042 0 : // there was a need: this would require checking that the sequence number
1043 0 : // of the snapshot has been flushed, by comparing with
1044 0 : // DB.mem.queue[0].logSeqNum.
1045 0 : panic("OnlyReadGuaranteedDurable is not supported for batches or snapshots")
1046 : }
1047 1 : var readState *readState
1048 1 : var newIters tableNewIters
1049 1 : var newIterRangeKey keyspan.TableNewSpanIter
1050 1 : if !internalOpts.batch.batchOnly {
1051 1 : // Grab and reference the current readState. This prevents the underlying
1052 1 : // files in the associated version from being deleted if there is a current
1053 1 : // compaction. The readState is unref'd by Iterator.Close().
1054 1 : if internalOpts.snapshot.vers == nil {
1055 1 : if internalOpts.snapshot.readState != nil {
1056 1 : readState = internalOpts.snapshot.readState
1057 1 : readState.ref()
1058 1 : } else {
1059 1 : // NB: loadReadState() calls readState.ref().
1060 1 : readState = d.loadReadState()
1061 1 : }
1062 1 : } else {
1063 1 : // vers != nil
1064 1 : internalOpts.snapshot.vers.Ref()
1065 1 : }
1066 :
1067 : // Determine the seqnum to read at after grabbing the read state (current and
1068 : // memtables) above.
1069 1 : if seqNum == 0 {
1070 1 : seqNum = d.mu.versions.visibleSeqNum.Load()
1071 1 : }
1072 1 : newIters = d.newIters
1073 1 : newIterRangeKey = d.tableNewRangeKeyIter
1074 : }
1075 :
1076 : // Bundle various structures under a single umbrella in order to allocate
1077 : // them together.
1078 1 : buf := iterAllocPool.Get().(*iterAlloc)
1079 1 : dbi := &buf.dbi
1080 1 : *dbi = Iterator{
1081 1 : ctx: ctx,
1082 1 : alloc: buf,
1083 1 : merge: d.merge,
1084 1 : comparer: *d.opts.Comparer,
1085 1 : readState: readState,
1086 1 : version: internalOpts.snapshot.vers,
1087 1 : keyBuf: buf.keyBuf,
1088 1 : prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
1089 1 : boundsBuf: buf.boundsBuf,
1090 1 : batch: batch,
1091 1 : newIters: newIters,
1092 1 : newIterRangeKey: newIterRangeKey,
1093 1 : seqNum: seqNum,
1094 1 : batchOnlyIter: internalOpts.batch.batchOnly,
1095 1 : }
1096 1 : if o != nil {
1097 1 : dbi.opts = *o
1098 1 : dbi.processBounds(o.LowerBound, o.UpperBound)
1099 1 : }
1100 1 : dbi.opts.logger = d.opts.Logger
1101 1 : if d.opts.private.disableLazyCombinedIteration {
1102 1 : dbi.opts.disableLazyCombinedIteration = true
1103 1 : }
1104 1 : if batch != nil {
1105 1 : dbi.batchSeqNum = dbi.batch.nextSeqNum()
1106 1 : }
1107 1 : return finishInitializingIter(ctx, buf)
1108 : }
1109 :
1110 : // finishInitializingIter is a helper for doing the non-trivial initialization
1111 : // of an Iterator. It's invoked to perform the initial initialization of an
1112 : // Iterator during NewIter or Clone, and to perform reinitialization due to a
1113 : // change in IterOptions by a call to Iterator.SetOptions.
1114 1 : func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
1115 1 : // Short-hand.
1116 1 : dbi := &buf.dbi
1117 1 : var memtables flushableList
1118 1 : if dbi.readState != nil {
1119 1 : memtables = dbi.readState.memtables
1120 1 : }
1121 1 : if dbi.opts.OnlyReadGuaranteedDurable {
1122 0 : memtables = nil
1123 1 : } else {
1124 1 : // We only need to read from memtables which contain sequence numbers older
1125 1 : // than seqNum. Trim off newer memtables.
1126 1 : for i := len(memtables) - 1; i >= 0; i-- {
1127 1 : if logSeqNum := memtables[i].logSeqNum; logSeqNum < dbi.seqNum {
1128 1 : break
1129 : }
1130 1 : memtables = memtables[:i]
1131 : }
1132 : }
1133 :
1134 1 : if dbi.opts.pointKeys() {
1135 1 : // Construct the point iterator, initializing dbi.pointIter to point to
1136 1 : // dbi.merging. If this is called during a SetOptions call and this
1137 1 : // Iterator has already initialized dbi.merging, constructPointIter is a
1138 1 : // noop and an initialized pointIter already exists in dbi.pointIter.
1139 1 : dbi.constructPointIter(ctx, memtables, buf)
1140 1 : dbi.iter = dbi.pointIter
1141 1 : } else {
1142 1 : dbi.iter = emptyIter
1143 1 : }
1144 :
1145 1 : if dbi.opts.rangeKeys() {
1146 1 : dbi.rangeKeyMasking.init(dbi, dbi.comparer.Compare, dbi.comparer.Split)
1147 1 :
1148 1 : // When iterating over both point and range keys, don't create the
1149 1 : // range-key iterator stack immediately if we can avoid it. This
1150 1 : // optimization takes advantage of the expected sparseness of range
1151 1 : // keys, and configures the point-key iterator to dynamically switch to
1152 1 : // combined iteration when it observes a file containing range keys.
1153 1 : //
1154 1 : // Lazy combined iteration is not possible if a batch or a memtable
1155 1 : // contains any range keys.
1156 1 : useLazyCombinedIteration := dbi.rangeKey == nil &&
1157 1 : dbi.opts.KeyTypes == IterKeyTypePointsAndRanges &&
1158 1 : (dbi.batch == nil || dbi.batch.countRangeKeys == 0) &&
1159 1 : !dbi.opts.disableLazyCombinedIteration
1160 1 : if useLazyCombinedIteration {
1161 1 : // The user requested combined iteration, and there's no indexed
1162 1 : // batch currently containing range keys that would prevent lazy
1163 1 : // combined iteration. Check the memtables to see if they contain
1164 1 : // any range keys.
1165 1 : for i := range memtables {
1166 1 : if memtables[i].containsRangeKeys() {
1167 1 : useLazyCombinedIteration = false
1168 1 : break
1169 : }
1170 : }
1171 : }
1172 :
1173 1 : if useLazyCombinedIteration {
1174 1 : dbi.lazyCombinedIter = lazyCombinedIter{
1175 1 : parent: dbi,
1176 1 : pointIter: dbi.pointIter,
1177 1 : combinedIterState: combinedIterState{
1178 1 : initialized: false,
1179 1 : },
1180 1 : }
1181 1 : dbi.iter = &dbi.lazyCombinedIter
1182 1 : dbi.iter = invalidating.MaybeWrapIfInvariants(dbi.iter)
1183 1 : } else {
1184 1 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{
1185 1 : initialized: true,
1186 1 : }
1187 1 : if dbi.rangeKey == nil {
1188 1 : dbi.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1189 1 : dbi.rangeKey.init(dbi.comparer.Compare, dbi.comparer.Split, &dbi.opts)
1190 1 : dbi.constructRangeKeyIter()
1191 1 : } else {
1192 1 : dbi.rangeKey.iterConfig.SetBounds(dbi.opts.LowerBound, dbi.opts.UpperBound)
1193 1 : }
1194 :
1195 : // Wrap the point iterator (currently dbi.iter) with an interleaving
1196 : // iterator that interleaves range keys pulled from
1197 : // dbi.rangeKey.rangeKeyIter.
1198 : //
1199 : // NB: The interleaving iterator is always reinitialized, even if
1200 : // dbi already had an initialized range key iterator, in case the point
1201 : // iterator changed or the range key masking suffix changed.
1202 1 : dbi.rangeKey.iiter.Init(&dbi.comparer, dbi.iter, dbi.rangeKey.rangeKeyIter,
1203 1 : keyspan.InterleavingIterOpts{
1204 1 : Mask: &dbi.rangeKeyMasking,
1205 1 : LowerBound: dbi.opts.LowerBound,
1206 1 : UpperBound: dbi.opts.UpperBound,
1207 1 : })
1208 1 : dbi.iter = &dbi.rangeKey.iiter
1209 : }
1210 1 : } else {
1211 1 : // !dbi.opts.rangeKeys()
1212 1 : //
1213 1 : // Reset the combined iterator state. The initialized=true ensures the
1214 1 : // iterator doesn't unnecessarily try to switch to combined iteration.
1215 1 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{initialized: true}
1216 1 : }
1217 1 : return dbi
1218 : }
1219 :
1220 : // ScanInternal scans all internal keys within the specified bounds, truncating
1221 : // any rangedels and rangekeys to those bounds if they span past them. For use
1222 : // when an external user needs to be aware of all internal keys that make up a
1223 : // key range.
1224 : //
1225 : // Keys deleted by range deletions must not be returned or exposed by this
1226 : // method, while the range deletion deleting that key must be exposed using
1227 : // visitRangeDel. Keys that would be masked by range key masking (if an
1228 : // appropriate prefix were set) should be exposed, alongside the range key
1229 : // that would have masked it. This method also collapses all point keys into
1230 : // one InternalKey; so only one internal key at most per user key is returned
1231 : // to visitPointKey.
1232 : //
1233 : // If visitSharedFile is not nil, ScanInternal iterates in skip-shared iteration
1234 : // mode. In this iteration mode, sstables in levels L5 and L6 are skipped, and
1235 : // their metadatas truncated to [lower, upper) and passed into visitSharedFile.
1236 : // ErrInvalidSkipSharedIteration is returned if visitSharedFile is not nil and an
1237 : // sstable in L5 or L6 is found that is not in shared storage according to
1238 : // provider.IsShared, or an sstable in those levels contains a newer key than the
1239 : // snapshot sequence number (only applicable for snapshot.ScanInternal). Examples
1240 : // of when this could happen could be if Pebble started writing sstables before a
1241 : // creator ID was set (as creator IDs are necessary to enable shared storage)
1242 : // resulting in some lower level SSTs being on non-shared storage. Skip-shared
1243 : // iteration is invalid in those cases.
1244 : func (d *DB) ScanInternal(
1245 : ctx context.Context,
1246 : categoryAndQoS sstable.CategoryAndQoS,
1247 : lower, upper []byte,
1248 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
1249 : visitRangeDel func(start, end []byte, seqNum uint64) error,
1250 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
1251 : visitSharedFile func(sst *SharedSSTMeta) error,
1252 1 : ) error {
1253 1 : scanInternalOpts := &scanInternalOptions{
1254 1 : CategoryAndQoS: categoryAndQoS,
1255 1 : visitPointKey: visitPointKey,
1256 1 : visitRangeDel: visitRangeDel,
1257 1 : visitRangeKey: visitRangeKey,
1258 1 : visitSharedFile: visitSharedFile,
1259 1 : skipSharedLevels: visitSharedFile != nil,
1260 1 : IterOptions: IterOptions{
1261 1 : KeyTypes: IterKeyTypePointsAndRanges,
1262 1 : LowerBound: lower,
1263 1 : UpperBound: upper,
1264 1 : },
1265 1 : }
1266 1 : iter, err := d.newInternalIter(ctx, snapshotIterOpts{} /* snapshot */, scanInternalOpts)
1267 1 : if err != nil {
1268 0 : return err
1269 0 : }
1270 1 : defer iter.close()
1271 1 : return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
1272 : }
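
// A hedged sketch of driving ScanInternal with simple visitors (d is an open
// *DB; ctx, lower, and upper are assumed to be in scope; passing a nil
// visitSharedFile disables skip-shared iteration):
//
//	err := d.ScanInternal(ctx, sstable.CategoryAndQoS{}, lower, upper,
//		func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
//			// At most one internal key per user key is surfaced here.
//			return nil
//		},
//		func(start, end []byte, seqNum uint64) error { return nil }, // range dels
//		func(start, end []byte, keys []rangekey.Key) error { return nil }, // range keys
//		nil, // visitSharedFile
//	)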
1273 :
1274 : // newInternalIter constructs and returns a new scanInternalIterator on this db.
1275 : // If o.skipSharedLevels is true, levels below sharedLevelsStart are *not* added
1276 : // to the internal iterator.
1277 : //
1278 : // TODO(bilal): This method has a lot of similarities with db.newIter as well as
1279 : // finishInitializingIter. Both pairs of methods should be refactored to reduce
1280 : // this duplication.
1281 : func (d *DB) newInternalIter(
1282 : ctx context.Context, sOpts snapshotIterOpts, o *scanInternalOptions,
1283 1 : ) (*scanInternalIterator, error) {
1284 1 : if err := d.closed.Load(); err != nil {
1285 0 : panic(err)
1286 : }
1287 : // Grab and reference the current readState. This prevents the underlying
1288 : // files in the associated version from being deleted if there is a current
1289 : // compaction. The readState is unref'd by Iterator.Close().
1290 1 : var readState *readState
1291 1 : if sOpts.vers == nil {
1292 1 : if sOpts.readState != nil {
1293 0 : readState = sOpts.readState
1294 0 : readState.ref()
1295 1 : } else {
1296 1 : readState = d.loadReadState()
1297 1 : }
1298 : }
1299 1 : if sOpts.vers != nil {
1300 0 : sOpts.vers.Ref()
1301 0 : }
1302 :
1303 : // Determine the seqnum to read at after grabbing the read state (current and
1304 : // memtables) above.
1305 1 : seqNum := sOpts.seqNum
1306 1 : if seqNum == 0 {
1307 1 : seqNum = d.mu.versions.visibleSeqNum.Load()
1308 1 : }
1309 :
1310 : // Bundle various structures under a single umbrella in order to allocate
1311 : // them together.
1312 1 : buf := iterAllocPool.Get().(*iterAlloc)
1313 1 : dbi := &scanInternalIterator{
1314 1 : ctx: ctx,
1315 1 : db: d,
1316 1 : comparer: d.opts.Comparer,
1317 1 : merge: d.opts.Merger.Merge,
1318 1 : readState: readState,
1319 1 : version: sOpts.vers,
1320 1 : alloc: buf,
1321 1 : newIters: d.newIters,
1322 1 : newIterRangeKey: d.tableNewRangeKeyIter,
1323 1 : seqNum: seqNum,
1324 1 : mergingIter: &buf.merging,
1325 1 : }
1326 1 : dbi.opts = *o
1327 1 : dbi.opts.logger = d.opts.Logger
1328 1 : if d.opts.private.disableLazyCombinedIteration {
1329 1 : dbi.opts.disableLazyCombinedIteration = true
1330 1 : }
1331 1 : return finishInitializingInternalIter(buf, dbi)
1332 : }
1333 :
1334 : func finishInitializingInternalIter(
1335 : buf *iterAlloc, i *scanInternalIterator,
1336 1 : ) (*scanInternalIterator, error) {
1337 1 : // Short-hand.
1338 1 : var memtables flushableList
1339 1 : if i.readState != nil {
1340 1 : memtables = i.readState.memtables
1341 1 : }
1342 : // We only need to read from memtables which contain sequence numbers older
1343 : // than seqNum. Trim off newer memtables.
1344 1 : for j := len(memtables) - 1; j >= 0; j-- {
1345 1 : if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
1346 1 : break
1347 : }
1348 1 : memtables = memtables[:j]
1349 : }
1350 1 : i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)
1351 1 :
1352 1 : i.constructPointIter(i.opts.CategoryAndQoS, memtables, buf)
1353 1 :
1354 1 : // For internal iterators, we skip the lazy combined iteration optimization
1355 1 : // entirely, and create the range key iterator stack directly.
1356 1 : i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1357 1 : i.rangeKey.init(i.comparer.Compare, i.comparer.Split, &i.opts.IterOptions)
1358 1 : if err := i.constructRangeKeyIter(); err != nil {
1359 0 : return nil, err
1360 0 : }
1361 :
1362 : // Wrap the point iterator (currently i.iter) with an interleaving
1363 : // iterator that interleaves range keys pulled from
1364 : // i.rangeKey.rangeKeyIter.
1365 1 : i.rangeKey.iiter.Init(i.comparer, i.iter, i.rangeKey.rangeKeyIter,
1366 1 : keyspan.InterleavingIterOpts{
1367 1 : LowerBound: i.opts.LowerBound,
1368 1 : UpperBound: i.opts.UpperBound,
1369 1 : })
1370 1 : i.iter = &i.rangeKey.iiter
1371 1 :
1372 1 : return i, nil
1373 : }
1374 :
1375 : func (i *Iterator) constructPointIter(
1376 : ctx context.Context, memtables flushableList, buf *iterAlloc,
1377 1 : ) {
1378 1 : if i.pointIter != nil {
1379 1 : // Already have one.
1380 1 : return
1381 1 : }
1382 1 : internalOpts := internalIterOpts{stats: &i.stats.InternalStats}
1383 1 : if i.opts.RangeKeyMasking.Filter != nil {
1384 1 : internalOpts.boundLimitedFilter = &i.rangeKeyMasking
1385 1 : }
1386 :
1387 : // Merging levels and levels from iterAlloc.
1388 1 : mlevels := buf.mlevels[:0]
1389 1 : levels := buf.levels[:0]
1390 1 :
1391 1 : // We compute the number of levels needed ahead of time and reallocate a slice if
1392 1 : // the array from the iterAlloc isn't large enough. Doing this allocation once
1393 1 : // should improve the performance.
1394 1 : numMergingLevels := 0
1395 1 : numLevelIters := 0
1396 1 : if i.batch != nil {
1397 1 : numMergingLevels++
1398 1 : }
1399 :
1400 1 : var current *version
1401 1 : if !i.batchOnlyIter {
1402 1 : numMergingLevels += len(memtables)
1403 1 :
1404 1 : current = i.version
1405 1 : if current == nil {
1406 1 : current = i.readState.current
1407 1 : }
1408 1 : numMergingLevels += len(current.L0SublevelFiles)
1409 1 : numLevelIters += len(current.L0SublevelFiles)
1410 1 : for level := 1; level < len(current.Levels); level++ {
1411 1 : if current.Levels[level].Empty() {
1412 1 : continue
1413 : }
1414 1 : numMergingLevels++
1415 1 : numLevelIters++
1416 : }
1417 : }
1418 :
1419 1 : if numMergingLevels > cap(mlevels) {
1420 1 : mlevels = make([]mergingIterLevel, 0, numMergingLevels)
1421 1 : }
1422 1 : if numLevelIters > cap(levels) {
1423 1 : levels = make([]levelIter, 0, numLevelIters)
1424 1 : }
1425 :
1426 : // Top-level is the batch, if any.
1427 1 : if i.batch != nil {
1428 1 : if i.batch.index == nil {
1429 0 : // This isn't an indexed batch. We shouldn't have gotten this far.
1430 0 : panic(errors.AssertionFailedf("creating an iterator over an unindexed batch"))
1431 1 : } else {
1432 1 : i.batch.initInternalIter(&i.opts, &i.batchPointIter)
1433 1 : i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, i.batchSeqNum)
1434 1 : // Only include the batch's rangedel iterator if it's non-empty.
1435 1 : // This requires some subtle logic in the case a rangedel is later
1436 1 : // written to the batch and the view of the batch is refreshed
1437 1 : // during a call to SetOptions—in this case, we need to reconstruct
1438 1 : // the point iterator to add the batch rangedel iterator.
1439 1 : var rangeDelIter keyspan.FragmentIterator
1440 1 : if i.batchRangeDelIter.Count() > 0 {
1441 1 : rangeDelIter = &i.batchRangeDelIter
1442 1 : }
1443 1 : mlevels = append(mlevels, mergingIterLevel{
1444 1 : iter: &i.batchPointIter,
1445 1 : rangeDelIter: rangeDelIter,
1446 1 : })
1447 : }
1448 : }
1449 :
1450 1 : if !i.batchOnlyIter {
1451 1 : // Next are the memtables.
1452 1 : for j := len(memtables) - 1; j >= 0; j-- {
1453 1 : mem := memtables[j]
1454 1 : mlevels = append(mlevels, mergingIterLevel{
1455 1 : iter: mem.newIter(&i.opts),
1456 1 : rangeDelIter: mem.newRangeDelIter(&i.opts),
1457 1 : })
1458 1 : }
1459 :
1460 : // Next are the file levels: L0 sub-levels followed by lower levels.
1461 1 : mlevelsIndex := len(mlevels)
1462 1 : levelsIndex := len(levels)
1463 1 : mlevels = mlevels[:numMergingLevels]
1464 1 : levels = levels[:numLevelIters]
1465 1 : i.opts.snapshotForHideObsoletePoints = buf.dbi.seqNum
1466 1 : addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) {
1467 1 : li := &levels[levelsIndex]
1468 1 :
1469 1 : li.init(ctx, i.opts, &i.comparer, i.newIters, files, level, internalOpts)
1470 1 : li.initRangeDel(&mlevels[mlevelsIndex].rangeDelIter)
1471 1 : li.initBoundaryContext(&mlevels[mlevelsIndex].levelIterBoundaryContext)
1472 1 : li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
1473 1 : mlevels[mlevelsIndex].levelIter = li
1474 1 : mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)
1475 1 :
1476 1 : levelsIndex++
1477 1 : mlevelsIndex++
1478 1 : }
1479 :
1480 : // Add level iterators for the L0 sublevels, iterating from newest to
1481 : // oldest.
1482 1 : for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
1483 1 : addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
1484 1 : }
1485 :
1486 : // Add level iterators for the non-empty non-L0 levels.
1487 1 : for level := 1; level < len(current.Levels); level++ {
1488 1 : if current.Levels[level].Empty() {
1489 1 : continue
1490 : }
1491 1 : addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
1492 : }
1493 : }
1494 1 : buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...)
1495 1 : if len(mlevels) <= cap(buf.levelsPositioned) {
1496 1 : buf.merging.levelsPositioned = buf.levelsPositioned[:len(mlevels)]
1497 1 : }
1498 1 : buf.merging.snapshot = i.seqNum
1499 1 : buf.merging.batchSnapshot = i.batchSeqNum
1500 1 : buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
1501 1 : i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging)
1502 1 : i.merging = &buf.merging
1503 : }
1504 :
1505 : // NewBatch returns a new empty write-only batch. Any reads on the batch will
1506 : // return an error. If the batch is committed it will be applied to the DB.
1507 1 : func (d *DB) NewBatch() *Batch {
1508 1 : return newBatch(d)
1509 1 : }
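// A minimal usage sketch for write-only batches, assuming an open *DB named
// db in a client package that imports pebble; error handling is elided:
//
//	b := db.NewBatch()
//	_ = b.Set([]byte("k1"), []byte("v1"), nil)
//	_ = b.Delete([]byte("k2"), nil)
//	_ = b.Commit(pebble.Sync) // or pebble.NoSync
//	_ = b.Close()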
1510 :
1511 : // NewBatchWithSize is mostly identical to NewBatch, but it will allocate the
1512 : // specified memory space for the internal slice in advance.
1513 0 : func (d *DB) NewBatchWithSize(size int) *Batch {
1514 0 : return newBatchWithSize(d, size)
1515 0 : }
1516 :
1517 : // NewIndexedBatch returns a new empty read-write batch. Any reads on the batch
1518 : // will read from both the batch and the DB. If the batch is committed it will
1519 : // be applied to the DB. An indexed batch is slower than a non-indexed batch
1520 : // for insert operations. If you do not need to perform reads on the batch, use
1521 : // NewBatch instead.
1522 1 : func (d *DB) NewIndexedBatch() *Batch {
1523 1 : return newIndexedBatch(d, d.opts.Comparer)
1524 1 : }
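// A minimal usage sketch for indexed batches, assuming an open *DB named db;
// the Get observes the batch's own uncommitted write:
//
//	b := db.NewIndexedBatch()
//	_ = b.Set([]byte("k"), []byte("v"), nil)
//	if v, closer, err := b.Get([]byte("k")); err == nil {
//		_ = v
//		_ = closer.Close()
//	}
//	_ = b.Commit(pebble.Sync)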
1525 :
1526 : // NewIndexedBatchWithSize is mostly identical to NewIndexedBatch, but it will
1527 : // allocate the specified memory space for the internal slice in advance.
1528 0 : func (d *DB) NewIndexedBatchWithSize(size int) *Batch {
1529 0 : return newIndexedBatchWithSize(d, d.opts.Comparer, size)
1530 0 : }
1531 :
1532 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
1533 : // return false). The iterator can be positioned via a call to SeekGE, SeekLT,
1534 : // First or Last. The iterator provides a point-in-time view of the current DB
1535 : // state. This view is maintained by preventing file deletions and preventing
1536 : // memtables referenced by the iterator from being deleted. Using an iterator
1537 : // to maintain a long-lived point-in-time view of the DB state can lead to an
1538 : // apparent memory and disk usage leak. Use snapshots (see NewSnapshot) for
1539 : // point-in-time snapshots, which avoid these problems.
1540 1 : func (d *DB) NewIter(o *IterOptions) (*Iterator, error) {
1541 1 : return d.NewIterWithContext(context.Background(), o)
1542 1 : }
1543 :
1544 : // NewIterWithContext is like NewIter, and additionally accepts a context for
1545 : // tracing.
1546 1 : func (d *DB) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
1547 1 : return d.newIter(ctx, nil /* batch */, newIterOpts{}, o), nil
1548 1 : }
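// A minimal usage sketch for a bounded forward scan, assuming an open *DB
// named db; the iterator must be closed to release the referenced readState:
//
//	iter, _ := db.NewIter(&pebble.IterOptions{
//		LowerBound: []byte("a"),
//		UpperBound: []byte("z"), // exclusive
//	})
//	for iter.First(); iter.Valid(); iter.Next() {
//		fmt.Printf("%s => %s\n", iter.Key(), iter.Value())
//	}
//	_ = iter.Close()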
1549 :
1550 : // NewSnapshot returns a point-in-time view of the current DB state. Iterators
1551 : // created with this handle will all observe a stable snapshot of the current
1552 : // DB state. The caller must call Snapshot.Close() when the snapshot is no
1553 : // longer needed. Snapshots are not persisted across DB restarts (close ->
1554 : // open). Unlike the implicit snapshot maintained by an iterator, a snapshot
1555 : // will not prevent memtables from being released or sstables from being
1556 : // deleted. Instead, a snapshot prevents deletion of sequence numbers
1557 : // referenced by the snapshot.
1558 1 : func (d *DB) NewSnapshot() *Snapshot {
1559 1 : if err := d.closed.Load(); err != nil {
1560 0 : panic(err)
1561 : }
1562 :
1563 1 : d.mu.Lock()
1564 1 : s := &Snapshot{
1565 1 : db: d,
1566 1 : seqNum: d.mu.versions.visibleSeqNum.Load(),
1567 1 : }
1568 1 : d.mu.snapshots.pushBack(s)
1569 1 : d.mu.Unlock()
1570 1 : return s
1571 : }
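// A minimal usage sketch, assuming an open *DB named db; reads through the
// snapshot ignore writes committed after NewSnapshot returns:
//
//	snap := db.NewSnapshot()
//	defer snap.Close() // leaked snapshots are reported by DB.Close
//	if v, closer, err := snap.Get([]byte("k")); err == nil {
//		_ = v
//		_ = closer.Close()
//	}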
1572 :
1573 : // NewEventuallyFileOnlySnapshot returns a point-in-time view of the current DB
1574 : // state, similar to NewSnapshot, but with consistency constrained to the
1575 : // provided set of key ranges. See the comment at EventuallyFileOnlySnapshot for
1576 : // its semantics.
1577 1 : func (d *DB) NewEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
1578 1 : if err := d.closed.Load(); err != nil {
1579 0 : panic(err)
1580 : }
1581 :
1582 1 : internalKeyRanges := make([]internalKeyRange, len(keyRanges))
1583 1 : for i := range keyRanges {
1584 1 : if i > 0 && d.cmp(keyRanges[i-1].End, keyRanges[i].Start) > 0 {
1585 0 : panic("pebble: key ranges for eventually-file-only-snapshot not in order")
1586 : }
1587 1 : internalKeyRanges[i] = internalKeyRange{
1588 1 : smallest: base.MakeInternalKey(keyRanges[i].Start, InternalKeySeqNumMax, InternalKeyKindMax),
1589 1 : largest: base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, keyRanges[i].End),
1590 1 : }
1591 : }
1592 :
1593 1 : return d.makeEventuallyFileOnlySnapshot(keyRanges, internalKeyRanges)
1594 : }
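// A minimal usage sketch, assuming an open *DB named db and key ranges that
// are ordered and non-overlapping (otherwise the check above panics):
//
//	efos := db.NewEventuallyFileOnlySnapshot([]pebble.KeyRange{
//		{Start: []byte("a"), End: []byte("m")},
//	})
//	defer efos.Close()
//	iter, _ := efos.NewIter(nil)
//	defer iter.Close()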
1595 :
1596 : // Close closes the DB.
1597 : //
1598 : // It is not safe to close a DB until all outstanding iterators are closed
1599 : // or to call Close concurrently with any other DB method. It is not valid
1600 : // to call any of a DB's methods after the DB has been closed.
1601 1 : func (d *DB) Close() error {
1602 1 : // Lock the commit pipeline for the duration of Close. This prevents a race
1603 1 : // with makeRoomForWrite. Rotating the WAL in makeRoomForWrite requires
1604 1 : // dropping d.mu several times for I/O. If Close only holds d.mu, an
1605 1 : // in-progress WAL rotation may re-acquire d.mu only once the database is
1606 1 : // closed.
1607 1 : //
1608 1 : // Additionally, locking the commit pipeline makes it more likely that
1609 1 : // (illegal) concurrent writes will observe d.closed.Load() != nil, creating
1610 1 : // more understandable panics if the database is improperly used concurrently
1611 1 : // during Close.
1612 1 : d.commit.mu.Lock()
1613 1 : defer d.commit.mu.Unlock()
1614 1 : d.mu.Lock()
1615 1 : defer d.mu.Unlock()
1616 1 : if err := d.closed.Load(); err != nil {
1617 0 : panic(err)
1618 : }
1619 :
1620 : // Clear the finalizer that is used to check that an unreferenced DB has been
1621 : // closed. We're closing the DB here, so the check performed by that
1622 : // finalizer isn't necessary.
1623 : //
1624 : // Note: this is a no-op if invariants are disabled or race is enabled.
1625 1 : invariants.SetFinalizer(d.closed, nil)
1626 1 :
1627 1 : d.closed.Store(errors.WithStack(ErrClosed))
1628 1 : close(d.closedCh)
1629 1 :
1630 1 : defer d.opts.Cache.Unref()
1631 1 :
1632 1 : for d.mu.compact.compactingCount > 0 || d.mu.compact.flushing {
1633 1 : d.mu.compact.cond.Wait()
1634 1 : }
1635 1 : for d.mu.tableStats.loading {
1636 1 : d.mu.tableStats.cond.Wait()
1637 1 : }
1638 1 : for d.mu.tableValidation.validating {
1639 1 : d.mu.tableValidation.cond.Wait()
1640 1 : }
1641 :
1642 1 : var err error
1643 1 : if n := len(d.mu.compact.inProgress); n > 0 {
1644 0 : err = errors.Errorf("pebble: %d unexpected in-progress compactions", errors.Safe(n))
1645 0 : }
1646 1 : err = firstError(err, d.mu.formatVers.marker.Close())
1647 1 : err = firstError(err, d.tableCache.close())
1648 1 : if !d.opts.ReadOnly {
1649 1 : err = firstError(err, d.mu.log.Close())
1650 1 : } else if d.mu.log.LogWriter != nil {
1651 0 : panic("pebble: log-writer should be nil in read-only mode")
1652 : }
1653 1 : err = firstError(err, d.fileLock.Close())
1654 1 :
1655 1 : // Note that versionSet.close() only closes the MANIFEST. The versions list
1656 1 : // is still valid for the checks below.
1657 1 : err = firstError(err, d.mu.versions.close())
1658 1 :
1659 1 : err = firstError(err, d.dataDir.Close())
1660 1 : if d.dataDir != d.walDir {
1661 1 : err = firstError(err, d.walDir.Close())
1662 1 : }
1663 :
1664 1 : d.readState.val.unrefLocked()
1665 1 :
1666 1 : current := d.mu.versions.currentVersion()
1667 1 : for v := d.mu.versions.versions.Front(); true; v = v.Next() {
1668 1 : refs := v.Refs()
1669 1 : if v == current {
1670 1 : if refs != 1 {
1671 0 : err = firstError(err, errors.Errorf("leaked iterators: current\n%s", v))
1672 0 : }
1673 1 : break
1674 : }
1675 0 : if refs != 0 {
1676 0 : err = firstError(err, errors.Errorf("leaked iterators:\n%s", v))
1677 0 : }
1678 : }
1679 :
1680 1 : for _, mem := range d.mu.mem.queue {
1681 1 : // Usually, we'd want to delete the files returned by readerUnref. But
1682 1 : // in this case, even if we're unreferencing the flushables, the
1683 1 : // flushables aren't obsolete. They will be reconstructed during WAL
1684 1 : // replay.
1685 1 : mem.readerUnrefLocked(false)
1686 1 : }
1687 : // If there's an unused, recycled memtable, we need to release its memory.
1688 1 : if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
1689 1 : d.freeMemTable(obsoleteMemTable)
1690 1 : }
1691 1 : if reserved := d.memTableReserved.Load(); reserved != 0 {
1692 0 : err = firstError(err, errors.Errorf("leaked memtable reservation: %d", errors.Safe(reserved)))
1693 0 : }
1694 :
1695 : // Since we called d.readState.val.unrefLocked() above, we are expected to
1696 : // manually schedule deletion of obsolete files.
1697 1 : if len(d.mu.versions.obsoleteTables) > 0 {
1698 1 : d.deleteObsoleteFiles(d.mu.nextJobID)
1699 1 : }
1700 :
1701 1 : d.mu.Unlock()
1702 1 : d.compactionSchedulers.Wait()
1703 1 :
1704 1 : // Wait for all cleaning jobs to finish.
1705 1 : d.cleanupManager.Close()
1706 1 :
1707 1 : // Sanity check metrics.
1708 1 : if invariants.Enabled {
1709 1 : m := d.Metrics()
1710 1 : if m.Compact.NumInProgress > 0 || m.Compact.InProgressBytes > 0 {
1711 0 : d.mu.Lock()
1712 0 : panic(fmt.Sprintf("invalid metrics on close:\n%s", m))
1713 : }
1714 : }
1715 :
1716 1 : d.mu.Lock()
1717 1 :
1718 1 : // As a sanity check, ensure that there are no zombie tables. A non-zero count
1719 1 : // hints at a reference count leak.
1720 1 : if ztbls := len(d.mu.versions.zombieTables); ztbls > 0 {
1721 0 : err = firstError(err, errors.Errorf("non-zero zombie file count: %d", ztbls))
1722 0 : }
1723 :
1724 1 : err = firstError(err, d.objProvider.Close())
1725 1 :
1726 1 : // If the options include a closer to 'close' the filesystem, close it.
1727 1 : if d.opts.private.fsCloser != nil {
1728 1 : d.opts.private.fsCloser.Close()
1729 1 : }
1730 :
1731 : // Return an error if the user failed to close all open snapshots.
1732 1 : if v := d.mu.snapshots.count(); v > 0 {
1733 0 : err = firstError(err, errors.Errorf("leaked snapshots: %d open snapshots on DB %p", v, d))
1734 0 : }
1735 :
1736 1 : return err
1737 : }
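// A minimal shutdown sketch, assuming iter, snap, and db from the examples
// above; all iterators and snapshots must be closed first, or Close reports
// them as leaked:
//
//	_ = iter.Close()
//	_ = snap.Close()
//	if err := db.Close(); err != nil {
//		log.Fatal(err)
//	}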
1738 :
1739 : // Compact the specified range of keys in the database.
1740 1 : func (d *DB) Compact(start, end []byte, parallelize bool) error {
1741 1 : if err := d.closed.Load(); err != nil {
1742 0 : panic(err)
1743 : }
1744 1 : if d.opts.ReadOnly {
1745 0 : return ErrReadOnly
1746 0 : }
1747 1 : if d.cmp(start, end) >= 0 {
1748 1 : return errors.Errorf("Compact start %s is not less than end %s",
1749 1 : d.opts.Comparer.FormatKey(start), d.opts.Comparer.FormatKey(end))
1750 1 : }
1751 1 : iStart := base.MakeInternalKey(start, InternalKeySeqNumMax, InternalKeyKindMax)
1752 1 : iEnd := base.MakeInternalKey(end, 0, 0)
1753 1 : m := (&fileMetadata{}).ExtendPointKeyBounds(d.cmp, iStart, iEnd)
1754 1 : meta := []*fileMetadata{m}
1755 1 :
1756 1 : d.mu.Lock()
1757 1 : maxLevelWithFiles := 1
1758 1 : cur := d.mu.versions.currentVersion()
1759 1 : for level := 0; level < numLevels; level++ {
1760 1 : overlaps := cur.Overlaps(level, d.cmp, start, end, iEnd.IsExclusiveSentinel())
1761 1 : if !overlaps.Empty() {
1762 1 : maxLevelWithFiles = level + 1
1763 1 : }
1764 : }
1765 :
1766 1 : keyRanges := make([]internalKeyRange, len(meta))
1767 1 : for i := range meta {
1768 1 : keyRanges[i] = internalKeyRange{smallest: m.Smallest, largest: m.Largest}
1769 1 : }
1770 : // Determine if any memtable overlaps with the compaction range. We wait for
1771 : // any such overlap to flush (initiating a flush if necessary).
1772 1 : mem, err := func() (*flushableEntry, error) {
1773 1 : // Check to see if any files overlap with any of the memtables. The queue
1774 1 : // is ordered from oldest to newest with the mutable memtable being the
1775 1 : // last element in the slice. We want to wait for the newest table that
1776 1 : // overlaps.
1777 1 : for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1778 1 : mem := d.mu.mem.queue[i]
1779 1 : if ingestMemtableOverlaps(d.cmp, mem, keyRanges) {
1780 1 : var err error
1781 1 : if mem.flushable == d.mu.mem.mutable {
1782 1 : // We have to hold both commitPipeline.mu and DB.mu when calling
1783 1 : // makeRoomForWrite(). Lock order requirements elsewhere force us to
1784 1 : // unlock DB.mu in order to grab commitPipeline.mu first.
1785 1 : d.mu.Unlock()
1786 1 : d.commit.mu.Lock()
1787 1 : d.mu.Lock()
1788 1 : defer d.commit.mu.Unlock()
1789 1 : if mem.flushable == d.mu.mem.mutable {
1790 1 : // Only flush if the active memtable is unchanged.
1791 1 : err = d.makeRoomForWrite(nil)
1792 1 : }
1793 : }
1794 1 : mem.flushForced = true
1795 1 : d.maybeScheduleFlush()
1796 1 : return mem, err
1797 : }
1798 : }
1799 1 : return nil, nil
1800 : }()
1801 :
1802 1 : d.mu.Unlock()
1803 1 :
1804 1 : if err != nil {
1805 0 : return err
1806 0 : }
1807 1 : if mem != nil {
1808 1 : <-mem.flushed
1809 1 : }
1810 :
1811 1 : for level := 0; level < maxLevelWithFiles; {
1812 1 : for {
1813 1 : if err := d.manualCompact(
1814 1 : iStart.UserKey, iEnd.UserKey, level, parallelize); err != nil {
1815 0 : if errors.Is(err, ErrCancelledCompaction) {
1816 0 : continue
1817 : }
1818 0 : return err
1819 : }
1820 1 : break
1821 : }
1822 1 : level++
1823 1 : if level == numLevels-1 {
1824 1 : // A manual compaction of the bottommost level occurred.
1825 1 : // There is no next level to try and compact.
1826 1 : break
1827 : }
1828 : }
1829 1 : return nil
1830 : }
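// A minimal usage sketch, assuming an open *DB named db and a hypothetical
// key encoding in which the bounds below cover the keys of interest:
//
//	start, end := []byte("a"), []byte("z")
//	if err := db.Compact(start, end, true /* parallelize */); err != nil {
//		log.Fatal(err)
//	}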
1831 :
1832 1 : func (d *DB) manualCompact(start, end []byte, level int, parallelize bool) error {
1833 1 : d.mu.Lock()
1834 1 : curr := d.mu.versions.currentVersion()
1835 1 : files := curr.Overlaps(level, d.cmp, start, end, false)
1836 1 : if files.Empty() {
1837 1 : d.mu.Unlock()
1838 1 : return nil
1839 1 : }
1840 :
1841 1 : var compactions []*manualCompaction
1842 1 : if parallelize {
1843 1 : compactions = append(compactions, d.splitManualCompaction(start, end, level)...)
1844 1 : } else {
1845 1 : compactions = append(compactions, &manualCompaction{
1846 1 : level: level,
1847 1 : done: make(chan error, 1),
1848 1 : start: start,
1849 1 : end: end,
1850 1 : })
1851 1 : }
1852 1 : d.mu.compact.manual = append(d.mu.compact.manual, compactions...)
1853 1 : d.maybeScheduleCompaction()
1854 1 : d.mu.Unlock()
1855 1 :
1856 1 : // Each of the channels is guaranteed to be eventually sent to once. After a
1857 1 : // compaction is possibly picked in d.maybeScheduleCompaction(), either the
1858 1 : // compaction is dropped, executed after being scheduled, or retried later.
1859 1 : // Assuming eventual progress when a compaction is retried, all outcomes send
1860 1 : // a value to the done channel. Since the channels are buffered, it is not
1861 1 : // necessary to read from each channel, and so we can exit early in the event
1862 1 : // of an error.
1863 1 : for _, compaction := range compactions {
1864 1 : if err := <-compaction.done; err != nil {
1865 0 : return err
1866 0 : }
1867 : }
1868 1 : return nil
1869 : }
1870 :
1871 : // splitManualCompaction splits a manual compaction over [start,end] on level
1872 : // such that the resulting compactions have no key overlap.
1873 : func (d *DB) splitManualCompaction(
1874 : start, end []byte, level int,
1875 1 : ) (splitCompactions []*manualCompaction) {
1876 1 : curr := d.mu.versions.currentVersion()
1877 1 : endLevel := level + 1
1878 1 : baseLevel := d.mu.versions.picker.getBaseLevel()
1879 1 : if level == 0 {
1880 1 : endLevel = baseLevel
1881 1 : }
1882 1 : keyRanges := calculateInuseKeyRanges(curr, d.cmp, level, endLevel, start, end)
1883 1 : for _, keyRange := range keyRanges {
1884 1 : splitCompactions = append(splitCompactions, &manualCompaction{
1885 1 : level: level,
1886 1 : done: make(chan error, 1),
1887 1 : start: keyRange.Start,
1888 1 : end: keyRange.End,
1889 1 : split: true,
1890 1 : })
1891 1 : }
1892 1 : return splitCompactions
1893 : }
1894 :
1895 : // DownloadSpan is a key range passed to the Download method.
1896 : type DownloadSpan struct {
1897 : StartKey []byte
1898 : // EndKey is exclusive.
1899 : EndKey []byte
1900 : }
1901 :
1902 0 : func (d *DB) downloadSpan(ctx context.Context, span DownloadSpan) error {
1903 0 : dSpan := &downloadSpan{
1904 0 : start: span.StartKey,
1905 0 : end: span.EndKey,
1906 0 : // Protected by d.mu.
1907 0 : doneChans: make([]chan error, 1),
1908 0 : }
1909 0 : dSpan.doneChans[0] = make(chan error, 1)
1910 0 : doneChan := dSpan.doneChans[0]
1911 0 : compactionIdx := 0
1912 0 :
1913 0 : func() {
1914 0 : d.mu.Lock()
1915 0 : defer d.mu.Unlock()
1916 0 :
1917 0 : d.mu.compact.downloads = append(d.mu.compact.downloads, dSpan)
1918 0 : d.maybeScheduleCompaction()
1919 0 : }()
1920 :
1921 : // Requires d.mu to be held.
1922 0 : noExternalFilesInSpan := func() (noExternalFiles bool) {
1923 0 : vers := d.mu.versions.currentVersion()
1924 0 :
1925 0 : for i := 0; i < len(vers.Levels); i++ {
1926 0 : if vers.Levels[i].Empty() {
1927 0 : continue
1928 : }
1929 0 : overlap := vers.Overlaps(i, d.cmp, span.StartKey, span.EndKey, true /* exclusiveEnd */)
1930 0 : foundExternalFile := false
1931 0 : overlap.Each(func(metadata *manifest.FileMetadata) {
1932 0 : objMeta, err := d.objProvider.Lookup(fileTypeTable, metadata.FileBacking.DiskFileNum)
1933 0 : if err != nil {
1934 0 : return
1935 0 : }
1936 0 : if objMeta.IsExternal() {
1937 0 : foundExternalFile = true
1938 0 : }
1939 : })
1940 0 : if foundExternalFile {
1941 0 : return false
1942 0 : }
1943 : }
1944 0 : return true
1945 : }
1946 :
1947 : // Requires d.mu to be held.
1948 0 : removeUsFromList := func() {
1949 0 : // Check where we are in d.mu.compact.downloads. Remove us from the
1950 0 : // list.
1951 0 : for i := range d.mu.compact.downloads {
1952 0 : if d.mu.compact.downloads[i] != dSpan {
1953 0 : continue
1954 : }
1955 0 : copy(d.mu.compact.downloads[i:], d.mu.compact.downloads[i+1:])
1956 0 : d.mu.compact.downloads = d.mu.compact.downloads[:len(d.mu.compact.downloads)-1]
1957 0 : break
1958 : }
1959 : }
1960 :
1961 0 : for {
1962 0 : select {
1963 0 : case <-ctx.Done():
1964 0 : d.mu.Lock()
1965 0 : defer d.mu.Unlock()
1966 0 : removeUsFromList()
1967 0 : return ctx.Err()
1968 0 : case err := <-doneChan:
1969 0 : if err != nil {
1970 0 : d.mu.Lock()
1971 0 : defer d.mu.Unlock()
1972 0 : removeUsFromList()
1973 0 : return err
1974 0 : }
1975 0 : compactionIdx++
1976 0 : // Grab the next doneCh to wait on.
1977 0 : func() {
1978 0 : d.mu.Lock()
1979 0 : defer d.mu.Unlock()
1980 0 : doneChan = dSpan.doneChans[compactionIdx]
1981 0 : }()
1982 0 : default:
1983 0 : doneSpan := func() bool {
1984 0 : d.mu.Lock()
1985 0 : defer d.mu.Unlock()
1986 0 : // It's possible to have downloaded all files without writing to any
1987 0 : // doneChans. This is expected if there are a significant amount
1988 0 : // of overlapping writes that schedule regular, non-download compactions.
1989 0 : if noExternalFilesInSpan() {
1990 0 : removeUsFromList()
1991 0 : return true
1992 0 : }
1993 0 : d.maybeScheduleCompaction()
1994 0 : d.mu.compact.cond.Wait()
1995 0 : return false
1996 : }()
1997 0 : if doneSpan {
1998 0 : return nil
1999 0 : }
2000 : }
2001 : }
2002 : }
2003 :
2004 : // Download ensures that the LSM does not use any external sstables for the
2005 : // given key ranges. It does so by performing appropriate compactions so that
2006 : // all external data becomes available locally.
2007 : //
2008 : // Note that calling this method does not imply that all other compactions stop;
2009 : // it simply informs Pebble of a list of spans for which external data should be
2010 : // downloaded with high priority.
2011 : //
2012 : // The method returns once no external sstables overlap the given spans, the
2013 : // context is canceled, or an error is hit.
2014 : //
2015 : // TODO(radu): consider passing a priority/impact knob to express how important
2016 : // the download is (versus live traffic performance, LSM health).
2017 0 : func (d *DB) Download(ctx context.Context, spans []DownloadSpan) error {
2018 0 : ctx, cancel := context.WithCancel(ctx)
2019 0 : defer cancel()
2020 0 : if err := d.closed.Load(); err != nil {
2021 0 : panic(err)
2022 : }
2023 0 : if d.opts.ReadOnly {
2024 0 : return ErrReadOnly
2025 0 : }
2026 0 : for i := range spans {
2027 0 : if err := ctx.Err(); err != nil {
2028 0 : return err
2029 0 : }
2030 0 : if err := d.downloadSpan(ctx, spans[i]); err != nil {
2031 0 : return err
2032 0 : }
2033 : }
2034 0 : return nil
2035 : }
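// A minimal usage sketch, assuming an open *DB named db whose LSM may contain
// external files; the context bounds how long the caller is willing to wait:
//
//	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
//	defer cancel()
//	err := db.Download(ctx, []pebble.DownloadSpan{
//		{StartKey: []byte("a"), EndKey: []byte("m")},
//	})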
2036 :
2037 : // Flush the memtable to stable storage.
2038 1 : func (d *DB) Flush() error {
2039 1 : flushDone, err := d.AsyncFlush()
2040 1 : if err != nil {
2041 0 : return err
2042 0 : }
2043 1 : <-flushDone
2044 1 : return nil
2045 : }
2046 :
2047 : // AsyncFlush asynchronously flushes the memtable to stable storage.
2048 : //
2049 : // If no error is returned, the caller can receive from the returned channel in
2050 : // order to wait for the flush to complete.
2051 1 : func (d *DB) AsyncFlush() (<-chan struct{}, error) {
2052 1 : if err := d.closed.Load(); err != nil {
2053 0 : panic(err)
2054 : }
2055 1 : if d.opts.ReadOnly {
2056 0 : return nil, ErrReadOnly
2057 0 : }
2058 :
2059 1 : d.commit.mu.Lock()
2060 1 : defer d.commit.mu.Unlock()
2061 1 : d.mu.Lock()
2062 1 : defer d.mu.Unlock()
2063 1 : flushed := d.mu.mem.queue[len(d.mu.mem.queue)-1].flushed
2064 1 : err := d.makeRoomForWrite(nil)
2065 1 : if err != nil {
2066 0 : return nil, err
2067 0 : }
2068 1 : return flushed, nil
2069 : }
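// A minimal usage sketch contrasting the two flush entry points, assuming an
// open *DB named db; error handling is elided:
//
//	_ = db.Flush() // blocks until the memtable reaches stable storage
//
//	done, err := db.AsyncFlush()
//	if err == nil {
//		<-done // wait only when the caller actually needs the flush
//	}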
2070 :
2071 : // Metrics returns metrics about the database.
2072 1 : func (d *DB) Metrics() *Metrics {
2073 1 : metrics := &Metrics{}
2074 1 : recycledLogsCount, recycledLogSize := d.logRecycler.stats()
2075 1 :
2076 1 : d.mu.Lock()
2077 1 : vers := d.mu.versions.currentVersion()
2078 1 : *metrics = d.mu.versions.metrics
2079 1 : metrics.Compact.EstimatedDebt = d.mu.versions.picker.estimatedCompactionDebt(0)
2080 1 : metrics.Compact.InProgressBytes = d.mu.versions.atomicInProgressBytes.Load()
2081 1 : metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount)
2082 1 : metrics.Compact.MarkedFiles = vers.Stats.MarkedForCompaction
2083 1 : metrics.Compact.Duration = d.mu.compact.duration
2084 1 : for c := range d.mu.compact.inProgress {
2085 0 : if c.kind != compactionKindFlush {
2086 0 : metrics.Compact.Duration += d.timeNow().Sub(c.beganAt)
2087 0 : }
2088 : }
2089 :
2090 1 : for _, m := range d.mu.mem.queue {
2091 1 : metrics.MemTable.Size += m.totalBytes()
2092 1 : }
2093 1 : metrics.Snapshots.Count = d.mu.snapshots.count()
2094 1 : if metrics.Snapshots.Count > 0 {
2095 0 : metrics.Snapshots.EarliestSeqNum = d.mu.snapshots.earliest()
2096 0 : }
2097 1 : metrics.Snapshots.PinnedKeys = d.mu.snapshots.cumulativePinnedCount
2098 1 : metrics.Snapshots.PinnedSize = d.mu.snapshots.cumulativePinnedSize
2099 1 : metrics.MemTable.Count = int64(len(d.mu.mem.queue))
2100 1 : metrics.MemTable.ZombieCount = d.memTableCount.Load() - metrics.MemTable.Count
2101 1 : metrics.MemTable.ZombieSize = uint64(d.memTableReserved.Load()) - metrics.MemTable.Size
2102 1 : metrics.WAL.ObsoleteFiles = int64(recycledLogsCount)
2103 1 : metrics.WAL.ObsoletePhysicalSize = recycledLogSize
2104 1 : metrics.WAL.Size = d.logSize.Load()
2105 1 : // The current WAL size (d.atomic.logSize) is the current logical size,
2106 1 : // which may be less than the WAL's physical size if it was recycled.
2107 1 : // The file sizes in d.mu.log.queue are updated to the physical size
2108 1 : // during WAL rotation. Use the larger of the two for the current WAL. All
2109 1 : // the previous WALs' fileSizes in d.mu.log.queue are already updated.
2110 1 : metrics.WAL.PhysicalSize = metrics.WAL.Size
2111 1 : if len(d.mu.log.queue) > 0 && metrics.WAL.PhysicalSize < d.mu.log.queue[len(d.mu.log.queue)-1].fileSize {
2112 0 : metrics.WAL.PhysicalSize = d.mu.log.queue[len(d.mu.log.queue)-1].fileSize
2113 0 : }
2114 1 : for i, n := 0, len(d.mu.log.queue)-1; i < n; i++ {
2115 1 : metrics.WAL.PhysicalSize += d.mu.log.queue[i].fileSize
2116 1 : }
2117 :
2118 1 : metrics.WAL.BytesIn = d.mu.log.bytesIn // protected by d.mu
2119 1 : for i, n := 0, len(d.mu.mem.queue)-1; i < n; i++ {
2120 1 : metrics.WAL.Size += d.mu.mem.queue[i].logSize
2121 1 : }
2122 1 : metrics.WAL.BytesWritten = metrics.Levels[0].BytesIn + metrics.WAL.Size
2123 1 : if p := d.mu.versions.picker; p != nil {
2124 1 : compactions := d.getInProgressCompactionInfoLocked(nil)
2125 1 : for level, score := range p.getScores(compactions) {
2126 1 : metrics.Levels[level].Score = score
2127 1 : }
2128 : }
2129 1 : metrics.Table.ZombieCount = int64(len(d.mu.versions.zombieTables))
2130 1 : for _, size := range d.mu.versions.zombieTables {
2131 0 : metrics.Table.ZombieSize += size
2132 0 : }
2133 1 : metrics.private.optionsFileSize = d.optionsFileSize
2134 1 :
2135 1 : // TODO(jackson): Consider making these metrics optional.
2136 1 : metrics.Keys.RangeKeySetsCount = countRangeKeySetFragments(vers)
2137 1 : metrics.Keys.TombstoneCount = countTombstones(vers)
2138 1 :
2139 1 : d.mu.versions.logLock()
2140 1 : metrics.private.manifestFileSize = uint64(d.mu.versions.manifest.Size())
2141 1 : metrics.Table.BackingTableCount = uint64(len(d.mu.versions.backingState.fileBackingMap))
2142 1 : metrics.Table.BackingTableSize = d.mu.versions.backingState.fileBackingSize
2143 1 : if invariants.Enabled {
2144 1 : var totalSize uint64
2145 1 : for _, backing := range d.mu.versions.backingState.fileBackingMap {
2146 1 : totalSize += backing.Size
2147 1 : }
2148 1 : if totalSize != metrics.Table.BackingTableSize {
2149 0 : panic("pebble: invalid backing table size accounting")
2150 : }
2151 : }
2152 1 : d.mu.versions.logUnlock()
2153 1 :
2154 1 : metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
2155 1 : if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
2156 0 : d.opts.Logger.Errorf("metrics error: %s", err)
2157 0 : }
2158 1 : metrics.Flush.WriteThroughput = d.mu.compact.flushWriteThroughput
2159 1 : if d.mu.compact.flushing {
2160 0 : metrics.Flush.NumInProgress = 1
2161 0 : }
2162 1 : for i := 0; i < numLevels; i++ {
2163 1 : metrics.Levels[i].Additional.ValueBlocksSize = valueBlocksSizeForLevel(vers, i)
2164 1 : }
2165 :
2166 1 : d.mu.Unlock()
2167 1 :
2168 1 : metrics.BlockCache = d.opts.Cache.Metrics()
2169 1 : metrics.TableCache, metrics.Filter = d.tableCache.metrics()
2170 1 : metrics.TableIters = int64(d.tableCache.iterCount())
2171 1 : metrics.CategoryStats = d.tableCache.dbOpts.sstStatsCollector.GetStats()
2172 1 :
2173 1 : metrics.SecondaryCacheMetrics = d.objProvider.Metrics()
2174 1 :
2175 1 : metrics.Uptime = d.timeNow().Sub(d.openedAt)
2176 1 :
2177 1 : return metrics
2178 : }
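// A minimal usage sketch, assuming an open *DB named db; Metrics briefly
// acquires DB.mu, so it is better sampled periodically than on a hot path:
//
//	m := db.Metrics()
//	fmt.Println(m)                       // human-readable summary
//	fmt.Println(m.Compact.EstimatedDebt) // or read individual fields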
2179 :
2180 : // sstablesOptions holds the optional parameters to retrieve TableInfo for all sstables.
2181 : type sstablesOptions struct {
2182 : // set to true will return the sstable properties in TableInfo
2183 : withProperties bool
2184 :
2185 : // if set, return sstables that overlap the key range (end-exclusive)
2186 : start []byte
2187 : end []byte
2188 :
2189 : withApproximateSpanBytes bool
2190 : }
2191 :
2192 : // SSTablesOption set optional parameter used by `DB.SSTables`.
2193 : type SSTablesOption func(*sstablesOptions)
2194 :
2195 : // WithProperties enable return sstable properties in each TableInfo.
2196 : //
2197 : // NOTE: if most of the sstable properties need to be read from disk,
2198 : // this options may make method `SSTables` quite slow.
2199 0 : func WithProperties() SSTablesOption {
2200 0 : return func(opt *sstablesOptions) {
2201 0 : opt.withProperties = true
2202 0 : }
2203 : }
2204 :
2205 : // WithKeyRangeFilter ensures returned sstables overlap start and end (end-exclusive)
2206 : // if start and end are both nil these properties have no effect.
2207 0 : func WithKeyRangeFilter(start, end []byte) SSTablesOption {
2208 0 : return func(opt *sstablesOptions) {
2209 0 : opt.end = end
2210 0 : opt.start = start
2211 0 : }
2212 : }
2213 :
2214 : // WithApproximateSpanBytes enables capturing the approximate number of bytes that
2215 : // overlap the provided key span for each sstable.
2216 : // NOTE: this option can only be used when WithKeyRangeFilter and WithProperties
2217 : // are also provided.
2218 0 : func WithApproximateSpanBytes() SSTablesOption {
2219 0 : return func(opt *sstablesOptions) {
2220 0 : opt.withApproximateSpanBytes = true
2221 0 : }
2222 : }
2223 :
2224 : // BackingType denotes the type of storage backing a given sstable.
2225 : type BackingType int
2226 :
2227 : const (
2228 : // BackingTypeLocal denotes an sstable stored on local disk according to the
2229 : // objprovider. This file is completely owned by us.
2230 : BackingTypeLocal BackingType = iota
2231 : // BackingTypeShared denotes an sstable stored on shared storage, created
2232 : // by this Pebble instance and possibly shared by other Pebble instances.
2233 : // These types of files have lifecycle managed by Pebble.
2234 : BackingTypeShared
2235 : // BackingTypeSharedForeign denotes an sstable stored on shared storage,
2236 : // created by a Pebble instance other than this one. These types of files have
2237 : // lifecycle managed by Pebble.
2238 : BackingTypeSharedForeign
2239 : // BackingTypeExternal denotes an sstable stored on external storage,
2240 : // not owned by any Pebble instance and with no refcounting/cleanup methods
2241 : // or lifecycle management. An example of an external file is a file restored
2242 : // from a backup.
2243 : BackingTypeExternal
2244 : )
2245 :
2246 : // SSTableInfo export manifest.TableInfo with sstable.Properties alongside
2247 : // other file backing info.
2248 : type SSTableInfo struct {
2249 : manifest.TableInfo
2250 : // Virtual indicates whether the sstable is virtual.
2251 : Virtual bool
2252 : // BackingSSTNum is the file number of the backing sstable that backs the
2253 : // sstable described by this SSTableInfo. If Virtual is false, then
2254 : // BackingSSTNum == FileNum.
2255 : BackingSSTNum base.FileNum
2256 : // BackingType is the type of storage backing this sstable.
2257 : BackingType BackingType
2258 : // Locator is the remote.Locator backing this sstable, if the backing type is
2259 : // not BackingTypeLocal.
2260 : Locator remote.Locator
2261 :
2262 : // Properties is the sstable properties of this table. If Virtual is true,
2263 : // then the Properties are associated with the backing sst.
2264 : Properties *sstable.Properties
2265 : }
2266 :
2267 : // SSTables retrieves the current sstables. The returned slice is indexed by
2268 : // level and each level is indexed by the position of the sstable within the
2269 : // level. Note that this information may be out of date due to concurrent
2270 : // flushes and compactions.
2271 0 : func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
2272 0 : opt := &sstablesOptions{}
2273 0 : for _, fn := range opts {
2274 0 : fn(opt)
2275 0 : }
2276 :
2277 0 : if opt.withApproximateSpanBytes && !opt.withProperties {
2278 0 : return nil, errors.Errorf("Cannot use WithApproximateSpanBytes without WithProperties option.")
2279 0 : }
2280 0 : if opt.withApproximateSpanBytes && (opt.start == nil || opt.end == nil) {
2281 0 : return nil, errors.Errorf("Cannot use WithApproximateSpanBytes without WithKeyRangeFilter option.")
2282 0 : }
2283 :
2284 : // Grab and reference the current readState.
2285 0 : readState := d.loadReadState()
2286 0 : defer readState.unref()
2287 0 :
2288 0 : // TODO(peter): This is somewhat expensive, especially on a large
2289 0 : // database. It might be worthwhile to unify TableInfo and FileMetadata and
2290 0 : // then we could simply return current.Files. Note that RocksDB is doing
2291 0 : // something similar to the current code, so perhaps it isn't too bad.
2292 0 : srcLevels := readState.current.Levels
2293 0 : var totalTables int
2294 0 : for i := range srcLevels {
2295 0 : totalTables += srcLevels[i].Len()
2296 0 : }
2297 :
2298 0 : destTables := make([]SSTableInfo, totalTables)
2299 0 : destLevels := make([][]SSTableInfo, len(srcLevels))
2300 0 : for i := range destLevels {
2301 0 : iter := srcLevels[i].Iter()
2302 0 : j := 0
2303 0 : for m := iter.First(); m != nil; m = iter.Next() {
2304 0 : if opt.start != nil && opt.end != nil && !m.Overlaps(d.opts.Comparer.Compare, opt.start, opt.end, true /* exclusive end */) {
2305 0 : continue
2306 : }
2307 0 : destTables[j] = SSTableInfo{TableInfo: m.TableInfo()}
2308 0 : if opt.withProperties {
2309 0 : p, err := d.tableCache.getTableProperties(
2310 0 : m,
2311 0 : )
2312 0 : if err != nil {
2313 0 : return nil, err
2314 0 : }
2315 0 : destTables[j].Properties = p
2316 : }
2317 0 : destTables[j].Virtual = m.Virtual
2318 0 : destTables[j].BackingSSTNum = m.FileBacking.DiskFileNum.FileNum()
2319 0 : objMeta, err := d.objProvider.Lookup(fileTypeTable, m.FileBacking.DiskFileNum)
2320 0 : if err != nil {
2321 0 : return nil, err
2322 0 : }
2323 0 : if objMeta.IsRemote() {
2324 0 : if objMeta.IsShared() {
2325 0 : if d.objProvider.IsSharedForeign(objMeta) {
2326 0 : destTables[j].BackingType = BackingTypeSharedForeign
2327 0 : } else {
2328 0 : destTables[j].BackingType = BackingTypeShared
2329 0 : }
2330 0 : } else {
2331 0 : destTables[j].BackingType = BackingTypeExternal
2332 0 : }
2333 0 : destTables[j].Locator = objMeta.Remote.Locator
2334 0 : } else {
2335 0 : destTables[j].BackingType = BackingTypeLocal
2336 0 : }
2337 :
2338 0 : if opt.withApproximateSpanBytes {
2339 0 : var spanBytes uint64
2340 0 : if m.ContainedWithinSpan(d.opts.Comparer.Compare, opt.start, opt.end) {
2341 0 : spanBytes = m.Size
2342 0 : } else {
2343 0 : size, err := d.tableCache.estimateSize(m, opt.start, opt.end)
2344 0 : if err != nil {
2345 0 : return nil, err
2346 0 : }
2347 0 : spanBytes = size
2348 : }
2349 0 : propertiesCopy := *destTables[j].Properties
2350 0 :
2351 0 : // Deep copy user properties so approximate span bytes can be added.
2352 0 : propertiesCopy.UserProperties = make(map[string]string, len(destTables[j].Properties.UserProperties)+1)
2353 0 : for k, v := range destTables[j].Properties.UserProperties {
2354 0 : propertiesCopy.UserProperties[k] = v
2355 0 : }
2356 0 : propertiesCopy.UserProperties["approximate-span-bytes"] = strconv.FormatUint(spanBytes, 10)
2357 0 : destTables[j].Properties = &propertiesCopy
2358 : }
2359 0 : j++
2360 : }
2361 0 : destLevels[i] = destTables[:j]
2362 0 : destTables = destTables[j:]
2363 : }
2364 :
2365 0 : return destLevels, nil
2366 : }
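// A minimal usage sketch, assuming an open *DB named db; per the option
// comments above, WithApproximateSpanBytes requires both WithProperties and
// WithKeyRangeFilter:
//
//	levels, err := db.SSTables(
//		pebble.WithProperties(),
//		pebble.WithKeyRangeFilter([]byte("a"), []byte("m")),
//		pebble.WithApproximateSpanBytes(),
//	)
//	for lvl, tables := range levels {
//		for _, t := range tables {
//			fmt.Println(lvl, t.FileNum, t.Properties.UserProperties["approximate-span-bytes"])
//		}
//	}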
2367 :
2368 : // EstimateDiskUsage returns the estimated filesystem space used in bytes for
2369 : // storing the range `[start, end]`. The estimation is computed as follows:
2370 : //
2371 : // - For sstables fully contained in the range the whole file size is included.
2372 : // - For sstables partially contained in the range the overlapping data block sizes
2373 : // are included. Even if a data block partially overlaps, or we cannot determine
2374 : // overlap due to abbreviated index keys, the full data block size is included in
2375 : // the estimation. Note that unlike fully contained sstables, none of the
2376 : // meta-block space is counted for partially overlapped files.
2377 : // - For virtual sstables, we use the overlap between start, end and the virtual
2378 : // sstable bounds to determine disk usage.
2379 : // - There may also exist WAL entries for unflushed keys in this range. This
2380 : // estimation currently excludes space used for the range in the WAL.
2381 0 : func (d *DB) EstimateDiskUsage(start, end []byte) (uint64, error) {
2382 0 : bytes, _, _, err := d.EstimateDiskUsageByBackingType(start, end)
2383 0 : return bytes, err
2384 0 : }
2385 :
2386 : // EstimateDiskUsageByBackingType is like EstimateDiskUsage but additionally
2387 : // returns the subsets of that size in remote and external files.
2388 : func (d *DB) EstimateDiskUsageByBackingType(
2389 : start, end []byte,
2390 0 : ) (totalSize, remoteSize, externalSize uint64, _ error) {
2391 0 : if err := d.closed.Load(); err != nil {
2392 0 : panic(err)
2393 : }
2394 0 : if d.opts.Comparer.Compare(start, end) > 0 {
2395 0 : return 0, 0, 0, errors.New("invalid key-range specified (start > end)")
2396 0 : }
2397 :
2398 : // Grab and reference the current readState. This prevents the underlying
2399 : // files in the associated version from being deleted if there is a concurrent
2400 : // compaction.
2401 0 : readState := d.loadReadState()
2402 0 : defer readState.unref()
2403 0 :
2404 0 : for level, files := range readState.current.Levels {
2405 0 : iter := files.Iter()
2406 0 : if level > 0 {
2407 0 : // We can only use `Overlaps` to restrict `files` at L1+ since at L0 it
2408 0 : // expands the range iteratively until it has found a set of files that
2409 0 : // do not overlap any other L0 files outside that set.
2410 0 : overlaps := readState.current.Overlaps(level, d.opts.Comparer.Compare, start, end, false /* exclusiveEnd */)
2411 0 : iter = overlaps.Iter()
2412 0 : }
2413 0 : for file := iter.First(); file != nil; file = iter.Next() {
2414 0 : if d.opts.Comparer.Compare(start, file.Smallest.UserKey) <= 0 &&
2415 0 : d.opts.Comparer.Compare(file.Largest.UserKey, end) <= 0 {
2416 0 : // The range fully contains the file, so skip looking it up in
2417 0 : // table cache/looking at its indexes, and add the full file size.
2418 0 : meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
2419 0 : if err != nil {
2420 0 : return 0, 0, 0, err
2421 0 : }
2422 0 : if meta.IsRemote() {
2423 0 : remoteSize += file.Size
2424 0 : if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
2425 0 : externalSize += file.Size
2426 0 : }
2427 : }
2428 0 : totalSize += file.Size
2429 0 : } else if d.opts.Comparer.Compare(file.Smallest.UserKey, end) <= 0 &&
2430 0 : d.opts.Comparer.Compare(start, file.Largest.UserKey) <= 0 {
2431 0 : var size uint64
2432 0 : var err error
2433 0 : if file.Virtual {
2434 0 : err = d.tableCache.withVirtualReader(
2435 0 : file.VirtualMeta(),
2436 0 : func(r sstable.VirtualReader) (err error) {
2437 0 : size, err = r.EstimateDiskUsage(start, end)
2438 0 : return err
2439 0 : },
2440 : )
2441 0 : } else {
2442 0 : err = d.tableCache.withReader(
2443 0 : file.PhysicalMeta(),
2444 0 : func(r *sstable.Reader) (err error) {
2445 0 : size, err = r.EstimateDiskUsage(start, end)
2446 0 : return err
2447 0 : },
2448 : )
2449 : }
2450 0 : if err != nil {
2451 0 : return 0, 0, 0, err
2452 0 : }
2453 0 : meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
2454 0 : if err != nil {
2455 0 : return 0, 0, 0, err
2456 0 : }
2457 0 : if meta.IsRemote() {
2458 0 : remoteSize += size
2459 0 : if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
2460 0 : externalSize += size
2461 0 : }
2462 : }
2463 0 : totalSize += size
2464 : }
2465 : }
2466 : }
2467 0 : return totalSize, remoteSize, externalSize, nil
2468 : }
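// A minimal usage sketch, assuming an open *DB named db; EstimateDiskUsage
// returns just the first of the three sizes below:
//
//	total, remoteSize, externalSize, err := db.EstimateDiskUsageByBackingType(
//		[]byte("a"), []byte("m"))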
2469 :
2470 1 : func (d *DB) walPreallocateSize() int {
2471 1 : // Set the WAL preallocate size to 110% of the memtable size. Note that there
2472 1 : // is a bit of apples and oranges in units here as the memtable size
2473 1 : // corresponds to the memory usage of the memtable while the WAL size is the
2474 1 : // size of the batches (plus overhead) stored in the WAL.
2475 1 : //
2476 1 : // TODO(peter): 110% of the memtable size is quite hefty for a block
2477 1 : // size. This logic is taken from GetWalPreallocateBlockSize in
2478 1 : // RocksDB. Could a smaller preallocation block size be used?
2479 1 : size := d.opts.MemTableSize
2480 1 : size = (size / 10) + size
2481 1 : return int(size)
2482 1 : }
2483 :
2484 1 : func (d *DB) newMemTable(logNum base.DiskFileNum, logSeqNum uint64) (*memTable, *flushableEntry) {
2485 1 : size := d.mu.mem.nextSize
2486 1 : if d.mu.mem.nextSize < d.opts.MemTableSize {
2487 1 : d.mu.mem.nextSize *= 2
2488 1 : if d.mu.mem.nextSize > d.opts.MemTableSize {
2489 0 : d.mu.mem.nextSize = d.opts.MemTableSize
2490 0 : }
2491 : }
2492 :
2493 1 : memtblOpts := memTableOptions{
2494 1 : Options: d.opts,
2495 1 : logSeqNum: logSeqNum,
2496 1 : }
2497 1 :
2498 1 : // Before attempting to allocate a new memtable, check if there's one
2499 1 : // available for recycling in memTableRecycle. Large contiguous allocations
2500 1 : // can be costly as fragmentation makes it more difficult to find a large
2501 1 : // contiguous free space. We've observed 64MB allocations taking 10ms+.
2502 1 : //
2503 1 : // To reduce these costly allocations, up to 1 obsolete memtable is stashed
2504 1 : // in `d.memTableRecycle` to allow a future memtable rotation to reuse
2505 1 : // existing memory.
2506 1 : var mem *memTable
2507 1 : mem = d.memTableRecycle.Swap(nil)
2508 1 : if mem != nil && uint64(len(mem.arenaBuf)) != size {
2509 1 : d.freeMemTable(mem)
2510 1 : mem = nil
2511 1 : }
2512 1 : if mem != nil {
2513 1 : // Carry through the existing buffer and memory reservation.
2514 1 : memtblOpts.arenaBuf = mem.arenaBuf
2515 1 : memtblOpts.releaseAccountingReservation = mem.releaseAccountingReservation
2516 1 : } else {
2517 1 : mem = new(memTable)
2518 1 : memtblOpts.arenaBuf = manual.New(int(size))
2519 1 : memtblOpts.releaseAccountingReservation = d.opts.Cache.Reserve(int(size))
2520 1 : d.memTableCount.Add(1)
2521 1 : d.memTableReserved.Add(int64(size))
2522 1 :
2523 1 : // Note: this is a no-op if invariants are disabled or race is enabled.
2524 1 : invariants.SetFinalizer(mem, checkMemTable)
2525 1 : }
2526 1 : mem.init(memtblOpts)
2527 1 :
2528 1 : entry := d.newFlushableEntry(mem, logNum, logSeqNum)
2529 1 : entry.releaseMemAccounting = func() {
2530 1 : // If the user leaks iterators, we may be releasing the memtable after
2531 1 : // the DB is already closed. In this case, we want to just release the
2532 1 : // memory because DB.Close won't come along to free it for us.
2533 1 : if err := d.closed.Load(); err != nil {
2534 1 : d.freeMemTable(mem)
2535 1 : return
2536 1 : }
2537 :
2538 : // The next memtable allocation might be able to reuse this memtable.
2539 : // Stash it on d.memTableRecycle.
2540 1 : if unusedMem := d.memTableRecycle.Swap(mem); unusedMem != nil {
2541 1 : // There was already a memtable waiting to be recycled. We're now
2542 1 : // responsible for freeing it.
2543 1 : d.freeMemTable(unusedMem)
2544 1 : }
2545 : }
2546 1 : return mem, entry
2547 : }
2548 :
2549 1 : func (d *DB) freeMemTable(m *memTable) {
2550 1 : d.memTableCount.Add(-1)
2551 1 : d.memTableReserved.Add(-int64(len(m.arenaBuf)))
2552 1 : m.free()
2553 1 : }
2554 :
2555 : func (d *DB) newFlushableEntry(
2556 : f flushable, logNum base.DiskFileNum, logSeqNum uint64,
2557 1 : ) *flushableEntry {
2558 1 : fe := &flushableEntry{
2559 1 : flushable: f,
2560 1 : flushed: make(chan struct{}),
2561 1 : logNum: logNum,
2562 1 : logSeqNum: logSeqNum,
2563 1 : deleteFn: d.mu.versions.addObsolete,
2564 1 : deleteFnLocked: d.mu.versions.addObsoleteLocked,
2565 1 : }
2566 1 : fe.readerRefs.Store(1)
2567 1 : return fe
2568 1 : }
2569 :
2570 : // makeRoomForWrite ensures that the memtable has room to hold the contents of
2571 : // Batch. It reserves the space in the memtable and adds a reference to the
2572 : // memtable. The caller must later ensure that the memtable is unreferenced. If
2573 : // the memtable is full, or a nil Batch is provided, the current memtable is
2574 : // rotated (marked as immutable) and a new mutable memtable is allocated. This
2575 : // memtable rotation also causes a log rotation.
2576 : //
2577 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2578 : // may be released and reacquired.
2579 1 : func (d *DB) makeRoomForWrite(b *Batch) error {
2580 1 : if b != nil && b.ingestedSSTBatch {
2581 0 : panic("pebble: invalid function call")
2582 : }
2583 :
2584 1 : force := b == nil || b.flushable != nil
2585 1 : stalled := false
2586 1 : for {
2587 1 : if b != nil && b.flushable == nil {
2588 1 : err := d.mu.mem.mutable.prepare(b)
2589 1 : if err != arenaskl.ErrArenaFull {
2590 1 : if stalled {
2591 1 : d.opts.EventListener.WriteStallEnd()
2592 1 : }
2593 1 : return err
2594 : }
2595 1 : } else if !force {
2596 1 : if stalled {
2597 1 : d.opts.EventListener.WriteStallEnd()
2598 1 : }
2599 1 : return nil
2600 : }
2601 : // force || err == ErrArenaFull, so we need to rotate the current memtable.
2602 1 : {
2603 1 : var size uint64
2604 1 : for i := range d.mu.mem.queue {
2605 1 : size += d.mu.mem.queue[i].totalBytes()
2606 1 : }
2607 1 : if size >= uint64(d.opts.MemTableStopWritesThreshold)*d.opts.MemTableSize {
2608 1 : // We have filled up the current memtable, but already queued memtables
2609 1 : // are still flushing, so we wait.
2610 1 : if !stalled {
2611 1 : stalled = true
2612 1 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2613 1 : Reason: "memtable count limit reached",
2614 1 : })
2615 1 : }
2616 1 : now := time.Now()
2617 1 : d.mu.compact.cond.Wait()
2618 1 : if b != nil {
2619 1 : b.commitStats.MemTableWriteStallDuration += time.Since(now)
2620 1 : }
2621 1 : continue
2622 : }
2623 : }
2624 1 : l0ReadAmp := d.mu.versions.currentVersion().L0Sublevels.ReadAmplification()
2625 1 : if l0ReadAmp >= d.opts.L0StopWritesThreshold {
2626 1 : // There are too many level-0 files, so we wait.
2627 1 : if !stalled {
2628 1 : stalled = true
2629 1 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2630 1 : Reason: "L0 file count limit exceeded",
2631 1 : })
2632 1 : }
2633 1 : now := time.Now()
2634 1 : d.mu.compact.cond.Wait()
2635 1 : if b != nil {
2636 1 : b.commitStats.L0ReadAmpWriteStallDuration += time.Since(now)
2637 1 : }
2638 1 : continue
2639 : }
2640 :
2641 1 : var newLogNum base.DiskFileNum
2642 1 : var prevLogSize uint64
2643 1 : if !d.opts.DisableWAL {
2644 1 : now := time.Now()
2645 1 : newLogNum, prevLogSize = d.recycleWAL()
2646 1 : if b != nil {
2647 1 : b.commitStats.WALRotationDuration += time.Since(now)
2648 1 : }
2649 : }
2650 :
2651 1 : immMem := d.mu.mem.mutable
2652 1 : imm := d.mu.mem.queue[len(d.mu.mem.queue)-1]
2653 1 : imm.logSize = prevLogSize
2654 1 : imm.flushForced = imm.flushForced || (b == nil)
2655 1 :
2656 1 : // If we are manually flushing and we used less than half of the bytes in
2657 1 : // the memtable, don't increase the size for the next memtable. This
2658 1 : // reduces memtable memory pressure when an application is frequently
2659 1 : // manually flushing.
2660 1 : if (b == nil) && uint64(immMem.availBytes()) > immMem.totalBytes()/2 {
2661 1 : d.mu.mem.nextSize = immMem.totalBytes()
2662 1 : }
2663 :
2664 1 : if b != nil && b.flushable != nil {
2665 1 : // The batch is too large to fit in the memtable so add it directly to
2666 1 : // the immutable queue. The flushable batch is associated with the same
2667 1 : // log as the immutable memtable, but logically occurs after it in
2668 1 : // seqnum space. We ensure while flushing that the flushable batch
2669 1 : // is flushed along with the previous memtable in the flushable
2670 1 : // queue. See the top level comment in DB.flush1 to learn how this
2671 1 : // is ensured.
2672 1 : //
2673 1 : // See DB.commitWrite for the special handling of log writes for large
2674 1 : // batches. In particular, the large batch has already written to
2675 1 : // imm.logNum.
2676 1 : entry := d.newFlushableEntry(b.flushable, imm.logNum, b.SeqNum())
2677 1 : // The large batch is by definition large. Reserve space from the cache
2678 1 : // for it until it is flushed.
2679 1 : entry.releaseMemAccounting = d.opts.Cache.Reserve(int(b.flushable.totalBytes()))
2680 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2681 1 : }
2682 :
2683 1 : var logSeqNum uint64
2684 1 : if b != nil {
2685 1 : logSeqNum = b.SeqNum()
2686 1 : if b.flushable != nil {
2687 1 : logSeqNum += uint64(b.Count())
2688 1 : }
2689 1 : } else {
2690 1 : logSeqNum = d.mu.versions.logSeqNum.Load()
2691 1 : }
2692 1 : d.rotateMemtable(newLogNum, logSeqNum, immMem)
2693 1 : force = false
2694 : }
2695 : }
2696 :
2697 : // Both DB.mu and commitPipeline.mu must be held by the caller.
2698 1 : func (d *DB) rotateMemtable(newLogNum base.DiskFileNum, logSeqNum uint64, prev *memTable) {
2699 1 : // Create a new memtable, scheduling the previous one for flushing. We do
2700 1 : // this even if the previous memtable was empty because the DB.Flush
2701 1 : // mechanism is dependent on being able to wait for the empty memtable to
2702 1 : // flush. We can't just mark the empty memtable as flushed here because we
2703 1 : // also have to wait for all previous immutable tables to
2704 1 : // flush. Additionally, the memtable is tied to a particular WAL file and we
2705 1 : // want to go through the flush path in order to recycle that WAL file.
2706 1 : //
2707 1 : // NB: newLogNum corresponds to the WAL that contains mutations that are
2708 1 : // present in the new memtable. When immutable memtables are flushed to
2709 1 : // disk, a VersionEdit will be created telling the manifest the minimum
2710 1 : // unflushed log number (which will be the next one in d.mu.mem.mutable
2711 1 : // that was not flushed).
2712 1 : //
2713 1 : // NB: prev should be the current mutable memtable.
2714 1 : var entry *flushableEntry
2715 1 : d.mu.mem.mutable, entry = d.newMemTable(newLogNum, logSeqNum)
2716 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2717 1 : d.updateReadStateLocked(nil)
2718 1 : if prev.writerUnref() {
2719 1 : d.maybeScheduleFlush()
2720 1 : }
2721 : }
2722 :
2723 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2724 : // may be released and reacquired.
2725 1 : func (d *DB) recycleWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
2726 1 : if d.opts.DisableWAL {
2727 0 : panic("pebble: invalid function call")
2728 : }
2729 :
2730 1 : jobID := d.mu.nextJobID
2731 1 : d.mu.nextJobID++
2732 1 : newLogNum = d.mu.versions.getNextDiskFileNum()
2733 1 :
2734 1 : prevLogSize = uint64(d.mu.log.Size())
2735 1 :
2736 1 : // The previous log may have grown past its original physical
2737 1 : // size. Update its file size in the queue so we have a proper
2738 1 : // accounting of its file size.
2739 1 : if d.mu.log.queue[len(d.mu.log.queue)-1].fileSize < prevLogSize {
2740 1 : d.mu.log.queue[len(d.mu.log.queue)-1].fileSize = prevLogSize
2741 1 : }
2742 1 : d.mu.Unlock()
2743 1 :
2744 1 : var err error
2745 1 : // Close the previous log first. This writes an EOF trailer
2746 1 : // signifying the end of the file and syncs it to disk. We must
2747 1 : // close the previous log before linking the new log file,
2748 1 : // otherwise a crash could leave both logs with unclean tails, and
2749 1 : // Open will treat the previous log as corrupt.
2750 1 : err = d.mu.log.LogWriter.Close()
2751 1 : metrics := d.mu.log.LogWriter.Metrics()
2752 1 : d.mu.Lock()
2753 1 : if err := d.mu.log.metrics.Merge(metrics); err != nil {
2754 0 : d.opts.Logger.Errorf("metrics error: %s", err)
2755 0 : }
2756 1 : d.mu.Unlock()
2757 1 :
2758 1 : newLogName := base.MakeFilepath(d.opts.FS, d.walDirname, fileTypeLog, newLogNum)
2759 1 :
2760 1 : // Try to use a recycled log file. Recycling log files is an important
2761 1 : // performance optimization as it is faster to sync a file that has
2762 1 : // already been written, than one which is being written for the first
2763 1 : // time. This is due to the need to sync file metadata when a file is
2764 1 : // being written for the first time. Note this is true even if file
2765 1 : // preallocation is performed (e.g. fallocate).
2766 1 : var recycleLog fileInfo
2767 1 : var recycleOK bool
2768 1 : var newLogFile vfs.File
2769 1 : if err == nil {
2770 1 : recycleLog, recycleOK = d.logRecycler.peek()
2771 1 : if recycleOK {
2772 0 : recycleLogName := base.MakeFilepath(d.opts.FS, d.walDirname, fileTypeLog, recycleLog.fileNum)
2773 0 : newLogFile, err = d.opts.FS.ReuseForWrite(recycleLogName, newLogName)
2774 0 : base.MustExist(d.opts.FS, newLogName, d.opts.Logger, err)
2775 1 : } else {
2776 1 : newLogFile, err = d.opts.FS.Create(newLogName)
2777 1 : base.MustExist(d.opts.FS, newLogName, d.opts.Logger, err)
2778 1 : }
2779 : }
2780 :
2781 1 : var newLogSize uint64
2782 1 : if err == nil && recycleOK {
2783 0 : // Figure out the recycled WAL size. This Stat is necessary
2784 0 : // because ReuseForWrite's contract allows for removing the
2785 0 : // old file and creating a new one. We don't know whether the
2786 0 : // WAL was actually recycled.
2787 0 : // TODO(jackson): Adding a boolean to the ReuseForWrite return
2788 0 : // value indicating whether or not the file was actually
2789 0 : // reused would allow us to skip the stat and use
2790 0 : // recycleLog.fileSize.
2791 0 : var finfo os.FileInfo
2792 0 : finfo, err = newLogFile.Stat()
2793 0 : if err == nil {
2794 0 : newLogSize = uint64(finfo.Size())
2795 0 : }
2796 : }
2797 :
2798 1 : if err == nil {
2799 1 : // TODO(peter): RocksDB delays sync of the parent directory until the
2800 1 : // first time the log is synced. Is that worthwhile?
2801 1 : err = d.walDir.Sync()
2802 1 : }
2803 :
2804 1 : if err != nil && newLogFile != nil {
2805 0 : newLogFile.Close()
2806 1 : } else if err == nil {
2807 1 : newLogFile = vfs.NewSyncingFile(newLogFile, vfs.SyncingFileOptions{
2808 1 : NoSyncOnClose: d.opts.NoSyncOnClose,
2809 1 : BytesPerSync: d.opts.WALBytesPerSync,
2810 1 : PreallocateSize: d.walPreallocateSize(),
2811 1 : })
2812 1 : }
2813 :
2814 1 : if recycleOK {
2815 0 : err = firstError(err, d.logRecycler.pop(recycleLog.fileNum.FileNum()))
2816 0 : }
2817 :
2818 1 : d.opts.EventListener.WALCreated(WALCreateInfo{
2819 1 : JobID: jobID,
2820 1 : Path: newLogName,
2821 1 : FileNum: newLogNum,
2822 1 : RecycledFileNum: recycleLog.fileNum.FileNum(),
2823 1 : Err: err,
2824 1 : })
2825 1 :
2826 1 : d.mu.Lock()
2827 1 :
2828 1 : d.mu.versions.metrics.WAL.Files++
2829 1 :
2830 1 : if err != nil {
2831 0 : // TODO(peter): avoid chewing through file numbers in a tight loop if there
2832 0 : // is an error here.
2833 0 : //
2834 0 : // What to do here? Stumbling on doesn't seem worthwhile. If we failed to
2835 0 : // close the previous log it is possible we lost a write.
2836 0 : panic(err)
2837 : }
2838 :
2839 1 : d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum, fileSize: newLogSize})
2840 1 : d.mu.log.LogWriter = record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
2841 1 : WALFsyncLatency: d.mu.log.metrics.fsyncLatency,
2842 1 : WALMinSyncInterval: d.opts.WALMinSyncInterval,
2843 1 : QueueSemChan: d.commit.logSyncQSem,
2844 1 : })
2845 1 : if d.mu.log.registerLogWriterForTesting != nil {
2846 0 : d.mu.log.registerLogWriterForTesting(d.mu.log.LogWriter)
2847 0 : }
2848 :
2849 1 : return
2850 : }
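// A hedged sketch of observing WAL recycling from outside the DB: recycling is
// routed through vfs.FS.ReuseForWrite, so a thin wrapper around the FS passed
// in Options can report when an old WAL file is reused. loggingFS is a
// hypothetical helper (not part of Pebble); fmt, pebble, and vfs are assumed
// to be imported.
//
//	type loggingFS struct {
//		vfs.FS
//	}
//
//	func (fs loggingFS) ReuseForWrite(oldname, newname string) (vfs.File, error) {
//		fmt.Printf("reusing WAL %s as %s\n", oldname, newname)
//		return fs.FS.ReuseForWrite(oldname, newname)
//	}
//
//	// Usage: pebble.Open("demo", &pebble.Options{FS: loggingFS{FS: vfs.Default}})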
2851 :
2852 1 : func (d *DB) getEarliestUnflushedSeqNumLocked() uint64 {
2853 1 : seqNum := InternalKeySeqNumMax
2854 1 : for i := range d.mu.mem.queue {
2855 1 : logSeqNum := d.mu.mem.queue[i].logSeqNum
2856 1 : if seqNum > logSeqNum {
2857 1 : seqNum = logSeqNum
2858 1 : }
2859 : }
2860 1 : return seqNum
2861 : }
2862 :
2863 1 : func (d *DB) getInProgressCompactionInfoLocked(finishing *compaction) (rv []compactionInfo) {
2864 1 : for c := range d.mu.compact.inProgress {
2865 1 : if len(c.flushing) == 0 && (finishing == nil || c != finishing) {
2866 1 : info := compactionInfo{
2867 1 : versionEditApplied: c.versionEditApplied,
2868 1 : inputs: c.inputs,
2869 1 : smallest: c.smallest,
2870 1 : largest: c.largest,
2871 1 : outputLevel: -1,
2872 1 : }
2873 1 : if c.outputLevel != nil {
2874 1 : info.outputLevel = c.outputLevel.level
2875 1 : }
2876 1 : rv = append(rv, info)
2877 : }
2878 : }
2879 1 : return
2880 : }
2881 :
2882 1 : func inProgressL0Compactions(inProgress []compactionInfo) []manifest.L0Compaction {
2883 1 : var compactions []manifest.L0Compaction
2884 1 : for _, info := range inProgress {
2885 1 : // Skip in-progress compactions that have already committed; the L0
2886 1 : // sublevels initialization code requires the set of in-progress
2887 1 : // compactions to be consistent with the current version. Compactions
2888 1 : // with versionEditApplied=true are already applied to the current
2889 1 : // version but are performing cleanup without holding the database mutex.
2890 1 : if info.versionEditApplied {
2891 1 : continue
2892 : }
2893 1 : l0 := false
2894 1 : for _, cl := range info.inputs {
2895 1 : l0 = l0 || cl.level == 0
2896 1 : }
2897 1 : if !l0 {
2898 1 : continue
2899 : }
2900 1 : compactions = append(compactions, manifest.L0Compaction{
2901 1 : Smallest: info.smallest,
2902 1 : Largest: info.largest,
2903 1 : IsIntraL0: info.outputLevel == 0,
2904 1 : })
2905 : }
2906 1 : return compactions
2907 : }
2908 :
2909 : // firstError returns the first non-nil error of err0 and err1, or nil if both
2910 : // are nil.
2911 1 : func firstError(err0, err1 error) error {
2912 1 : if err0 != nil {
2913 1 : return err0
2914 1 : }
2915 1 : return err1
2916 : }
2917 :
2918 : // SetCreatorID sets the CreatorID, which is needed in order to use shared objects.
2919 : // Remote object usage is disabled until this method is called for the first time.
2920 : // Once set, the CreatorID is persisted and cannot change.
2921 : //
2922 : // Does nothing if Experimental.RemoteStorage was not set in the options when the
2923 : // DB was opened or if the DB is in read-only mode.
2924 1 : func (d *DB) SetCreatorID(creatorID uint64) error {
2925 1 : if d.opts.Experimental.RemoteStorage == nil || d.opts.ReadOnly {
2926 0 : return nil
2927 0 : }
2928 1 : return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
2929 : }
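// A hedged usage sketch for SetCreatorID: each node that writes shared objects
// must set a unique, stable CreatorID once, after configuring remote storage.
// The in-memory storage helper and the empty locator key are illustrative
// only; the remote package is this tree's objstorage/remote.
//
//	opts := &pebble.Options{}
//	opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
//		"": remote.NewInMem(),
//	})
//	db, err := pebble.Open("demo", opts)
//	if err != nil {
//		panic(err)
//	}
//	if err := db.SetCreatorID(1); err != nil { // persisted on first call; must never change
//		panic(err)
//	}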
2930 :
2931 : // KeyStatistics keeps track of the number of keys that have been pinned by a
2932 : // snapshot as well as counts of the different key kinds in the lsm.
2933 : //
2934 : // One way of using the accumulated stats, when we only have sets and dels
2935 : // (a worked numeric example follows the struct): say the counts are
2936 : // del_count, set_count, del_latest_count, set_latest_count and snapshot_pinned_count.
2937 : //
2938 : // - del_latest_count + set_latest_count is the set of unique user keys
2939 : // (unique).
2940 : //
2941 : // - set_latest_count is the set of live unique user keys (live_unique).
2942 : //
2943 : // - Garbage is del_count + set_count - live_unique.
2944 : //
2945 : // - If everything were in the LSM, del_count+set_count-snapshot_pinned_count
2946 : // would also be the set of unique user keys (note that
2947 : // snapshot_pinned_count is counting something different -- see comment below).
2948 : // But snapshot_pinned_count only counts keys in the LSM so the excess here
2949 : // must be keys in memtables.
2950 : type KeyStatistics struct {
2951 : // TODO(sumeer): the SnapshotPinned* are incorrect in that these older
2952 : // versions can be in a different level. Either fix the accounting or
2953 : // rename these fields.
2954 :
2955 : // SnapshotPinnedKeys represents obsolete keys that cannot be elided during
2956 : // a compaction, because they are required by an open snapshot.
2957 : SnapshotPinnedKeys int
2958 : // SnapshotPinnedKeysBytes is the total number of bytes of all snapshot
2959 : // pinned keys.
2960 : SnapshotPinnedKeysBytes uint64
2961 : // KindsCount is the count for each kind of key. It includes point keys,
2962 : // range deletes and range keys.
2963 : KindsCount [InternalKeyKindMax + 1]int
2964 : // LatestKindsCount is the count for each kind of key when it is the latest
2965 : // kind for a user key. It is only populated for point keys.
2966 : LatestKindsCount [InternalKeyKindMax + 1]int
2967 : }
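// A worked numeric example of the accounting described in the comment above
// (all counts are illustrative):
//
//	setCount, delCount := 10, 4
//	setLatestCount, delLatestCount := 6, 3
//	unique := setLatestCount + delLatestCount   // 9 unique user keys
//	liveUnique := setLatestCount                // 6 live unique user keys
//	garbage := setCount + delCount - liveUnique // 14 - 6 = 8 garbage internal keys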
2968 :
2969 : // LSMKeyStatistics is used by DB.ScanStatistics.
2970 : type LSMKeyStatistics struct {
2971 : Accumulated KeyStatistics
2972 : // Levels contains statistics only for point keys. Range deletions and range keys will
2973 : // appear in Accumulated but not Levels.
2974 : Levels [numLevels]KeyStatistics
2975 : // BytesRead represents the logical, pre-compression size of keys and values read.
2976 : BytesRead uint64
2977 : }
2978 :
2979 : // ScanStatisticsOptions is used by DB.ScanStatistics.
2980 : type ScanStatisticsOptions struct {
2981 : // LimitBytesPerSecond caps the number of bytes that ScanInternal is allowed
2982 : // to read per second.
2983 : // A value of 0 means there is no limit.
2984 : LimitBytesPerSecond int64
2985 : }
2986 :
2987 : // ScanStatistics returns the count of different key kinds within the LSM for a
2988 : // key span [lower, upper), as well as the number of snapshot-pinned keys.
2989 : func (d *DB) ScanStatistics(
2990 : ctx context.Context, lower, upper []byte, opts ScanStatisticsOptions,
2991 0 : ) (LSMKeyStatistics, error) {
2992 0 : stats := LSMKeyStatistics{}
2993 0 : var prevKey InternalKey
2994 0 : var rateLimitFunc func(key *InternalKey, val LazyValue) error
2995 0 : tb := tokenbucket.TokenBucket{}
2996 0 :
2997 0 : if opts.LimitBytesPerSecond != 0 {
2998 0 : // Each "token" roughly corresponds to a byte that was read.
2999 0 : tb.Init(tokenbucket.TokensPerSecond(opts.LimitBytesPerSecond), tokenbucket.Tokens(1024))
3000 0 : rateLimitFunc = func(key *InternalKey, val LazyValue) error {
3001 0 : return tb.WaitCtx(ctx, tokenbucket.Tokens(key.Size()+val.Len()))
3002 0 : }
3003 : }
3004 :
3005 0 : scanInternalOpts := &scanInternalOptions{
3006 0 : visitPointKey: func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error {
3007 0 : // If the previous key is equal to the current point key, the current key was
3008 0 : // pinned by a snapshot.
3009 0 : size := uint64(key.Size())
3010 0 : kind := key.Kind()
3011 0 : sameKey := d.equal(prevKey.UserKey, key.UserKey)
3012 0 : if iterInfo.Kind == IteratorLevelLSM && sameKey {
3013 0 : stats.Levels[iterInfo.Level].SnapshotPinnedKeys++
3014 0 : stats.Levels[iterInfo.Level].SnapshotPinnedKeysBytes += size
3015 0 : stats.Accumulated.SnapshotPinnedKeys++
3016 0 : stats.Accumulated.SnapshotPinnedKeysBytes += size
3017 0 : }
3018 0 : if iterInfo.Kind == IteratorLevelLSM {
3019 0 : stats.Levels[iterInfo.Level].KindsCount[kind]++
3020 0 : }
3021 0 : if !sameKey {
3022 0 : if iterInfo.Kind == IteratorLevelLSM {
3023 0 : stats.Levels[iterInfo.Level].LatestKindsCount[kind]++
3024 0 : }
3025 0 : stats.Accumulated.LatestKindsCount[kind]++
3026 : }
3027 :
3028 0 : stats.Accumulated.KindsCount[kind]++
3029 0 : prevKey.CopyFrom(*key)
3030 0 : stats.BytesRead += uint64(key.Size() + value.Len())
3031 0 : return nil
3032 : },
3033 0 : visitRangeDel: func(start, end []byte, seqNum uint64) error {
3034 0 : stats.Accumulated.KindsCount[InternalKeyKindRangeDelete]++
3035 0 : stats.BytesRead += uint64(len(start) + len(end))
3036 0 : return nil
3037 0 : },
3038 0 : visitRangeKey: func(start, end []byte, keys []rangekey.Key) error {
3039 0 : stats.BytesRead += uint64(len(start) + len(end))
3040 0 : for _, key := range keys {
3041 0 : stats.Accumulated.KindsCount[key.Kind()]++
3042 0 : stats.BytesRead += uint64(len(key.Value) + len(key.Suffix))
3043 0 : }
3044 0 : return nil
3045 : },
3046 : includeObsoleteKeys: true,
3047 : IterOptions: IterOptions{
3048 : KeyTypes: IterKeyTypePointsAndRanges,
3049 : LowerBound: lower,
3050 : UpperBound: upper,
3051 : },
3052 : rateLimitFunc: rateLimitFunc,
3053 : }
3054 0 : iter, err := d.newInternalIter(ctx, snapshotIterOpts{}, scanInternalOpts)
3055 0 : if err != nil {
3056 0 : return LSMKeyStatistics{}, err
3057 0 : }
3058 0 : defer iter.close()
3059 0 :
3060 0 : err = scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
3061 0 :
3062 0 : if err != nil {
3063 0 : return LSMKeyStatistics{}, err
3064 0 : }
3065 :
3066 0 : return stats, nil
3067 : }
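// A hedged usage sketch of ScanStatistics; the key span and the rate limit
// are illustrative:
//
//	stats, err := db.ScanStatistics(ctx, []byte("a"), []byte("z"),
//		pebble.ScanStatisticsOptions{LimitBytesPerSecond: 16 << 20}) // ~16 MiB/s
//	if err != nil {
//		return err
//	}
//	fmt.Println("live SETs:", stats.Accumulated.LatestKindsCount[pebble.InternalKeyKindSet])
//	fmt.Println("bytes read:", stats.BytesRead)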
3068 :
3069 : // ObjProvider returns the objstorage.Provider for this database. Meant to be
3070 : // used for internal purposes only.
3071 1 : func (d *DB) ObjProvider() objstorage.Provider {
3072 1 : return d.objProvider
3073 1 : }
3074 :
3075 0 : func (d *DB) checkVirtualBounds(m *fileMetadata) {
3076 0 : if !invariants.Enabled {
3077 0 : return
3078 0 : }
3079 :
3080 0 : objMeta, err := d.objProvider.Lookup(fileTypeTable, m.FileBacking.DiskFileNum)
3081 0 : if err != nil {
3082 0 : panic(err)
3083 : }
3084 0 : if objMeta.IsExternal() {
3085 0 : // Nothing to do; bounds are expected to be loose.
3086 0 : return
3087 0 : }
3088 :
3089 0 : if m.HasPointKeys {
3090 0 : pointIter, rangeDelIter, err := d.newIters(context.TODO(), m, nil, internalIterOpts{})
3091 0 : if err != nil {
3092 0 : panic(errors.Wrap(err, "pebble: error creating point iterator"))
3093 : }
3094 :
3095 0 : defer pointIter.Close()
3096 0 : if rangeDelIter != nil {
3097 0 : defer rangeDelIter.Close()
3098 0 : }
3099 :
3100 0 : pointKey, _ := pointIter.First()
3101 0 : var rangeDel *keyspan.Span
3102 0 : if rangeDelIter != nil {
3103 0 : rangeDel = rangeDelIter.First()
3104 0 : }
3105 :
3106 : // Check that the lower bound is tight.
3107 0 : if (rangeDel == nil || d.cmp(rangeDel.SmallestKey().UserKey, m.SmallestPointKey.UserKey) != 0) &&
3108 0 : (pointKey == nil || d.cmp(pointKey.UserKey, m.SmallestPointKey.UserKey) != 0) {
3109 0 : panic(errors.Newf("pebble: virtual sstable %s lower point key bound is not tight", m.FileNum))
3110 : }
3111 :
3112 0 : pointKey, _ = pointIter.Last()
3113 0 : rangeDel = nil
3114 0 : if rangeDelIter != nil {
3115 0 : rangeDel = rangeDelIter.Last()
3116 0 : }
3117 :
3118 : // Check that the upper bound is tight.
3119 0 : if (rangeDel == nil || d.cmp(rangeDel.LargestKey().UserKey, m.LargestPointKey.UserKey) != 0) &&
3120 0 : (pointKey == nil || d.cmp(pointKey.UserKey, m.LargestPointKey.UserKey) != 0) {
3121 0 : panic(errors.Newf("pebble: virtual sstable %s upper point key bound is not tight", m.FileNum))
3122 : }
3123 :
3124 : // Check that iterator keys are within bounds.
3125 0 : for key, _ := pointIter.First(); key != nil; key, _ = pointIter.Next() {
3126 0 : if d.cmp(key.UserKey, m.SmallestPointKey.UserKey) < 0 || d.cmp(key.UserKey, m.LargestPointKey.UserKey) > 0 {
3127 0 : panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.UserKey))
3128 : }
3129 : }
3130 :
3131 0 : if rangeDelIter != nil {
3132 0 : for key := rangeDelIter.First(); key != nil; key = rangeDelIter.Next() {
3133 0 : if d.cmp(key.SmallestKey().UserKey, m.SmallestPointKey.UserKey) < 0 {
3134 0 : panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.SmallestKey().UserKey))
3135 : }
3136 :
3137 0 : if d.cmp(key.LargestKey().UserKey, m.LargestPointKey.UserKey) > 0 {
3138 0 : panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.LargestKey().UserKey))
3139 : }
3140 : }
3141 : }
3142 : }
3143 :
3144 0 : if !m.HasRangeKeys {
3145 0 : return
3146 0 : }
3147 :
3148 0 : rangeKeyIter, err := d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{})
3149 0 : if err != nil {
3150 0 : panic(errors.Wrap(err, "pebble: error creating range key iterator"))
3151 : }
3152 0 : // Defer Close only after the error check; a nil iterator must not be closed.
3153 0 : defer rangeKeyIter.Close()
3154 :
3155 : // Check that the lower bound is tight.
3156 0 : if d.cmp(rangeKeyIter.First().SmallestKey().UserKey, m.SmallestRangeKey.UserKey) != 0 {
3157 0 : panic(errors.Newf("pebble: virtual sstable %s lower range key bound is not tight", m.FileNum))
3158 : }
3159 :
3160 : // Check that the upper bound is tight.
3161 0 : if d.cmp(rangeKeyIter.Last().LargestKey().UserKey, m.LargestRangeKey.UserKey) != 0 {
3162 0 : panic(errors.Newf("pebble: virtual sstable %s upper range key bound is not tight", m.FileNum))
3163 : }
3164 :
3165 0 : for key := rangeKeyIter.First(); key != nil; key = rangeKeyIter.Next() {
3166 0 : if d.cmp(key.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) < 0 {
3167 0 : panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.FileNum, key.SmallestKey().UserKey))
3168 : }
3169 0 : if d.cmp(key.LargestKey().UserKey, m.LargestRangeKey.UserKey) > 0 {
3170 0 : panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.FileNum, key.LargestKey().UserKey))
3171 : }
3172 : }
3173 : }
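// Note: checkVirtualBounds only runs its checks when invariants.Enabled is
// true, which in this tree is controlled by a build tag (e.g. building or
// testing with `-tags invariants`); in normal builds the function returns
// immediately.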