Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package pebble provides an ordered key/value store.
6 : package pebble // import "github.com/cockroachdb/pebble"
7 :
8 : import (
9 : "context"
10 : "fmt"
11 : "io"
12 : "os"
13 : "strconv"
14 : "sync"
15 : "sync/atomic"
16 : "time"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/arenaskl"
20 : "github.com/cockroachdb/pebble/internal/base"
21 : "github.com/cockroachdb/pebble/internal/invalidating"
22 : "github.com/cockroachdb/pebble/internal/invariants"
23 : "github.com/cockroachdb/pebble/internal/keyspan"
24 : "github.com/cockroachdb/pebble/internal/manifest"
25 : "github.com/cockroachdb/pebble/internal/manual"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : "github.com/cockroachdb/pebble/objstorage/remote"
28 : "github.com/cockroachdb/pebble/rangekey"
29 : "github.com/cockroachdb/pebble/record"
30 : "github.com/cockroachdb/pebble/sstable"
31 : "github.com/cockroachdb/pebble/vfs"
32 : "github.com/cockroachdb/pebble/vfs/atomicfs"
33 : "github.com/cockroachdb/tokenbucket"
34 : "github.com/prometheus/client_golang/prometheus"
35 : )
36 :
37 : const (
38 : // minTableCacheSize is the minimum size of the table cache, for a single db.
39 : minTableCacheSize = 64
40 :
41 : // numNonTableCacheFiles is an approximation for the number of files
42 : // that we don't use for table caches, for a given db.
43 : numNonTableCacheFiles = 10
44 : )
45 :
46 : var (
47 : // ErrNotFound is returned when a get operation does not find the requested
48 : // key.
49 : ErrNotFound = base.ErrNotFound
50 : // ErrClosed is panicked when an operation is performed on a closed snapshot or
51 : // DB. Use errors.Is(err, ErrClosed) to check for this error.
52 : ErrClosed = errors.New("pebble: closed")
53 : // ErrReadOnly is returned when a write operation is performed on a read-only
54 : // database.
55 : ErrReadOnly = errors.New("pebble: read-only")
56 : // errNoSplit indicates that the user is trying to perform a range key
57 : // operation but the configured Comparer does not provide a Split
58 : // implementation.
59 : errNoSplit = errors.New("pebble: Comparer.Split required for range key operations")
60 : )
61 :
62 : // Reader is a readable key/value store.
63 : //
64 : // It is safe to call Get and NewIter from concurrent goroutines.
65 : type Reader interface {
66 : // Get gets the value for the given key. It returns ErrNotFound if the DB
67 : // does not contain the key.
68 : //
69 : // The caller should not modify the contents of the returned slice, but it is
70 : // safe to modify the contents of the argument after Get returns. The
71 : // returned slice will remain valid until the returned Closer is closed. On
72 : // success, the caller MUST call closer.Close() or a memory leak will occur.
73 : Get(key []byte) (value []byte, closer io.Closer, err error)
74 :
75 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
76 : // return false). The iterator can be positioned via a call to SeekGE,
77 : // SeekLT, First or Last.
78 : NewIter(o *IterOptions) (*Iterator, error)
79 :
80 : // Close closes the Reader. It may or may not close any underlying io.Reader
81 : // or io.Writer, depending on how the DB was created.
82 : //
83 : // It is not safe to close a DB until all outstanding iterators are closed.
84 : // It is valid to call Close multiple times. Other methods should not be
85 : // called after the DB has been closed.
86 : Close() error
87 : }
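
// An illustrative sketch (not part of the original source) of the Get contract
// above, as seen from a client package holding a Reader r: ErrNotFound is
// checked with errors.Is, and on success the returned Closer must be closed
// once the value is no longer needed.
//
//	value, closer, err := r.Get([]byte("k"))
//	switch {
//	case errors.Is(err, pebble.ErrNotFound):
//		// The key is absent; there is nothing to close.
//	case err != nil:
//		return err
//	default:
//		fmt.Printf("%s\n", value) // valid only until closer.Close()
//		if err := closer.Close(); err != nil {
//			return err
//		}
//	}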
88 :
89 : // Writer is a writable key/value store.
90 : //
91 : // Goroutine safety is dependent on the specific implementation.
92 : type Writer interface {
93 : // Apply the operations contained in the batch to the DB.
94 : //
95 : // It is safe to modify the contents of the arguments after Apply returns.
96 : Apply(batch *Batch, o *WriteOptions) error
97 :
98 : // Delete deletes the value for the given key. Deletes are blind and will
99 : // succeed even if the given key does not exist.
100 : //
101 : // It is safe to modify the contents of the arguments after Delete returns.
102 : Delete(key []byte, o *WriteOptions) error
103 :
104 : // DeleteSized behaves identically to Delete, but takes an additional
105 : // argument indicating the size of the value being deleted. DeleteSized
106 : // should be preferred when the caller has the expectation that there exists
107 : // a single internal KV pair for the key (eg, the key has not been
108 : // overwritten recently), and the caller knows the size of its value.
109 : //
110 : // DeleteSized will record the value size within the tombstone and use it to
111 : // inform compaction-picking heuristics which strive to reduce space
112 : // amplification in the LSM. This "calling your shot" mechanic allows the
113 : // storage engine to more accurately estimate and reduce space
114 : // amplification.
115 : //
116 : // It is safe to modify the contents of the arguments after DeleteSized
117 : // returns.
118 : DeleteSized(key []byte, valueSize uint32, _ *WriteOptions) error
119 :
120 : // SingleDelete is similar to Delete in that it deletes the value for the given key. Like Delete,
121 : // it is a blind operation that will succeed even if the given key does not exist.
122 : //
123 : // WARNING: Undefined (non-deterministic) behavior will result if a key is overwritten and
124 : // then deleted using SingleDelete. The record may appear deleted immediately, but be
125 : // resurrected at a later time after compactions have been performed. Or the record may
126 : // be deleted permanently. A Delete operation lays down a "tombstone" which shadows all
127 : // previous versions of a key. The SingleDelete operation is akin to "anti-matter" and will
128 : // only delete the most recently written version for a key. These different semantics allow
129 : // the DB to avoid propagating a SingleDelete operation during a compaction as soon as the
130 : // corresponding Set operation is encountered. These semantics require extreme care to handle
131 : // properly. Only use if you have a workload where the performance gain is critical and you
132 : // can guarantee that a record is written once and then deleted once.
133 : //
134 : // SingleDelete is internally transformed into a Delete if the most recent record for a key is either
135 : // a Merge or Delete record.
136 : //
137 : // It is safe to modify the contents of the arguments after SingleDelete returns.
138 : SingleDelete(key []byte, o *WriteOptions) error
139 :
140 : // DeleteRange deletes all of the point keys (and values) in the range
141 : // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
142 : // delete overlapping range keys (eg, keys set via RangeKeySet).
143 : //
144 : // It is safe to modify the contents of the arguments after DeleteRange
145 : // returns.
146 : DeleteRange(start, end []byte, o *WriteOptions) error
147 :
148 : // LogData adds the specified data to the batch. The data will be written to the
149 : // WAL, but not added to memtables or sstables. Log data is never indexed,
150 : // which makes it useful for testing WAL performance.
151 : //
152 : // It is safe to modify the contents of the argument after LogData returns.
153 : LogData(data []byte, opts *WriteOptions) error
154 :
155 : // Merge merges the value for the given key. The details of the merge are
156 : // dependent upon the configured merge operation.
157 : //
158 : // It is safe to modify the contents of the arguments after Merge returns.
159 : Merge(key, value []byte, o *WriteOptions) error
160 :
161 : // Set sets the value for the given key. It overwrites any previous value
162 : // for that key; a DB is not a multi-map.
163 : //
164 : // It is safe to modify the contents of the arguments after Set returns.
165 : Set(key, value []byte, o *WriteOptions) error
166 :
167 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
168 : // timestamp suffix to value. The suffix is optional. If any portion of the key
169 : // range [start, end) is already set by a range key with the same suffix value,
170 : // RangeKeySet overrides it.
171 : //
172 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
173 : RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error
174 :
175 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
176 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
177 : // range key. RangeKeyUnset only removes portions of range keys that fall within
178 : // the [start, end) key span, and only range keys with suffixes that exactly
179 : // match the unset suffix.
180 : //
181 : // It is safe to modify the contents of the arguments after RangeKeyUnset
182 : // returns.
183 : RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error
184 :
185 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
186 : // (inclusive on start, exclusive on end). It does not delete point keys (for
187 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
188 : // bounds, including those with or without suffixes.
189 : //
190 : // It is safe to modify the contents of the arguments after RangeKeyDelete
191 : // returns.
192 : RangeKeyDelete(start, end []byte, opts *WriteOptions) error
193 : }
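
// An illustrative sketch (not part of the original source) of the
// write-once/delete-once discipline that SingleDelete above depends on. The
// queueKey helper is hypothetical; the point is that each key is derived from
// a unique ID, so it is Set at most once before it is SingleDeleted.
//
//	// Producer: the unique ID guarantees the key is never overwritten.
//	if err := w.Set(queueKey(id), payload, pebble.Sync); err != nil {
//		return err
//	}
//	// Consumer: remove the single version written above. SingleDelete lets
//	// the tombstone be dropped as soon as compaction sees the matching Set,
//	// instead of propagating a Delete tombstone down the LSM.
//	if err := w.SingleDelete(queueKey(id), pebble.Sync); err != nil {
//		return err
//	}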
194 :
195 : // CPUWorkHandle represents a handle used by the CPUWorkPermissionGranter API.
196 : type CPUWorkHandle interface {
197 : // Permitted indicates whether Pebble can use additional CPU resources.
198 : Permitted() bool
199 : }
200 :
201 : // CPUWorkPermissionGranter is used to request permission to opportunistically
202 : // use additional CPUs to speed up internal background work.
203 : type CPUWorkPermissionGranter interface {
204 : // GetPermission returns a handle regardless of whether permission is granted
205 : // or not. In the latter case, the handle is only useful for recording
206 : // the CPU time actually spent on this calling goroutine.
207 : GetPermission(time.Duration) CPUWorkHandle
208 : // CPUWorkDone must be called regardless of whether CPUWorkHandle.Permitted
209 : // returns true or false.
210 : CPUWorkDone(CPUWorkHandle)
211 : }
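
// An illustrative sketch (not part of the original source) of the calling
// convention described above: every handle returned by GetPermission is paired
// with a CPUWorkDone call, whether or not permission was granted. The
// doConcurrently and doSerially helpers are hypothetical.
//
//	handle := granter.GetPermission(10 * time.Millisecond)
//	defer granter.CPUWorkDone(handle)
//	if handle.Permitted() {
//		doConcurrently()
//	} else {
//		doSerially()
//	}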
212 :
213 : // Use a default implementation for the CPU work granter to avoid excessive nil
214 : // checks in the code.
215 : type defaultCPUWorkHandle struct{}
216 :
217 1 : func (d defaultCPUWorkHandle) Permitted() bool {
218 1 : return false
219 1 : }
220 :
221 : type defaultCPUWorkGranter struct{}
222 :
223 2 : func (d defaultCPUWorkGranter) GetPermission(_ time.Duration) CPUWorkHandle {
224 2 : return defaultCPUWorkHandle{}
225 2 : }
226 :
227 2 : func (d defaultCPUWorkGranter) CPUWorkDone(_ CPUWorkHandle) {}
228 :
229 : // DB provides a concurrent, persistent ordered key/value store.
230 : //
231 : // A DB's basic operations (Get, Set, Delete) should be self-explanatory. Get
232 : // and Delete will return ErrNotFound if the requested key is not in the store.
233 : // Callers are free to ignore this error.
234 : //
235 : // A DB also allows for iterating over the key/value pairs in key order. If d
236 : // is a DB, the code below prints all key/value pairs whose keys are 'greater
237 : // than or equal to' k:
238 : //
239 : // iter, _ := d.NewIter(readOptions)
240 : // for iter.SeekGE(k); iter.Valid(); iter.Next() {
241 : // fmt.Printf("key=%q value=%q\n", iter.Key(), iter.Value())
242 : // }
243 : // return iter.Close()
244 : //
245 : // The Options struct holds the optional parameters for the DB, including a
246 : // Comparer to define a 'less than' relationship over keys. It is always valid
247 : // to pass a nil *Options, which means to use the default parameter values. Any
248 : // zero field of a non-nil *Options also means to use the default value for
249 : // that parameter. Thus, the code below uses a custom Comparer, but the default
250 : // values for every other parameter:
251 : //
252 : // db, err := pebble.Open(dirname, &Options{
253 : // Comparer: myComparer,
254 : // })
255 : type DB struct {
256 : // The count and size of referenced memtables. This includes memtables
257 : // present in DB.mu.mem.queue, as well as memtables that have been flushed
258 : // but are still referenced by an inuse readState, as well as up to one
259 : // memTable waiting to be reused and stored in d.memTableRecycle.
260 : memTableCount atomic.Int64
261 : memTableReserved atomic.Int64 // number of bytes reserved in the cache for memtables
262 : // memTableRecycle holds a pointer to an obsolete memtable. The next
263 : // memtable allocation will reuse this memtable if it has not already been
264 : // recycled.
265 : memTableRecycle atomic.Pointer[memTable]
266 :
267 : // The size of the current log file (i.e. db.mu.log.queue[len(queue)-1]).
268 : logSize atomic.Uint64
269 :
270 : // The number of bytes available on disk.
271 : diskAvailBytes atomic.Uint64
272 :
273 : cacheID uint64
274 : dirname string
275 : walDirname string
276 : opts *Options
277 : cmp Compare
278 : equal Equal
279 : merge Merge
280 : split Split
281 : abbreviatedKey AbbreviatedKey
282 : // The threshold for determining when a batch is "large" and will skip being
283 : // inserted into a memtable.
284 : largeBatchThreshold uint64
285 : // The current OPTIONS file number.
286 : optionsFileNum base.DiskFileNum
287 : // The on-disk size of the current OPTIONS file.
288 : optionsFileSize uint64
289 :
290 : // objProvider is used to access and manage SSTs.
291 : objProvider objstorage.Provider
292 :
293 : fileLock *Lock
294 : dataDir vfs.File
295 : walDir vfs.File
296 :
297 : tableCache *tableCacheContainer
298 : newIters tableNewIters
299 : tableNewRangeKeyIter keyspan.TableNewSpanIter
300 :
301 : commit *commitPipeline
302 :
303 : // readState provides access to the state needed for reading without needing
304 : // to acquire DB.mu.
305 : readState struct {
306 : sync.RWMutex
307 : val *readState
308 : }
309 : // logRecycler holds a set of log file numbers that are available for
310 : // reuse. Writing to a recycled log file is faster than to a new log file on
311 : // some common filesystems (xfs, and ext3/4) due to avoiding metadata
312 : // updates.
313 : logRecycler logRecycler
314 :
315 : closed *atomic.Value
316 : closedCh chan struct{}
317 :
318 : cleanupManager *cleanupManager
319 : // testingAlwaysWaitForCleanup is set by some tests to force waiting for
320 : // obsolete file deletion (to make events deterministic).
321 : testingAlwaysWaitForCleanup bool
322 :
323 : // During an iterator close, we may asynchronously schedule read compactions.
324 : // We want to wait for those goroutines to finish, before closing the DB.
325 : // compactionSchedulers.Wait() should not be called while the DB.mu is held.
326 : compactionSchedulers sync.WaitGroup
327 :
328 : // The main mutex protecting internal DB state. This mutex encompasses many
329 : // fields because those fields need to be accessed and updated atomically. In
330 : // particular, the current version, log.*, mem.*, and snapshot list need to
331 : // be accessed and updated atomically during compaction.
332 : //
333 : // Care is taken to avoid holding DB.mu during IO operations. Accomplishing
334 : // this sometimes requires releasing DB.mu in a method that was called with
335 : // it held. See versionSet.logAndApply() and DB.makeRoomForWrite() for
336 : // examples. This is a common pattern, so be careful about expectations that
337 : // DB.mu will be held continuously across a set of calls.
338 : mu struct {
339 : sync.Mutex
340 :
341 : formatVers struct {
342 : // vers is the database's current format major version.
343 : // Backwards-incompatible features are gated behind new
344 : // format major versions and not enabled until a database's
345 : // version is ratcheted upwards.
346 : //
347 : // Although this is under the `mu` prefix, readers may read vers
348 : // atomically without holding d.mu. Writers must only write to this
349 : // value through finalizeFormatVersUpgrade which requires d.mu is
350 : // held.
351 : vers atomic.Uint64
352 : // marker is the atomic marker for the format major version.
353 : // When a database's version is ratcheted upwards, the
354 : // marker is moved in order to atomically record the new
355 : // version.
356 : marker *atomicfs.Marker
357 : // ratcheting when set to true indicates that the database is
358 : // currently in the process of ratcheting the format major version
359 : // to vers + 1. As a part of ratcheting the format major version,
360 : // migrations may drop and re-acquire the mutex.
361 : ratcheting bool
362 : }
363 :
364 : // The ID of the next job. Job IDs are passed to event listener
365 : // notifications and act as a mechanism for tying together the events and
366 : // log messages for a single job such as a flush, compaction, or file
367 : // ingestion. Job IDs are not serialized to disk or used for correctness.
368 : nextJobID int
369 :
370 : // The collection of immutable versions and state about the log and visible
371 : // sequence numbers. Use the pointer here to ensure the atomic fields in
372 : // version set are aligned properly.
373 : versions *versionSet
374 :
375 : log struct {
376 : // The queue of logs, containing both flushed and unflushed logs. The
377 : // flushed logs will be a prefix, the unflushed logs a suffix. The
378 : // delimiter between flushed and unflushed logs is
379 : // versionSet.minUnflushedLogNum.
380 : queue []fileInfo
381 : // The number of input bytes to the log. This is the raw size of the
382 : // batches written to the WAL, without the overhead of the record
383 : // envelopes.
384 : bytesIn uint64
385 : // The LogWriter is protected by commitPipeline.mu. This allows log
386 : // writes to be performed without holding DB.mu, but requires both
387 : // commitPipeline.mu and DB.mu to be held when rotating the WAL/memtable
388 : // (i.e. makeRoomForWrite).
389 : *record.LogWriter
390 : // Can be nil.
391 : metrics struct {
392 : fsyncLatency prometheus.Histogram
393 : record.LogWriterMetrics
394 : }
395 : registerLogWriterForTesting func(w *record.LogWriter)
396 : }
397 :
398 : mem struct {
399 : // The current mutable memTable.
400 : mutable *memTable
401 : // Queue of flushables (the mutable memtable is at end). Elements are
402 : // added to the end of the slice and removed from the beginning. Once an
403 : // index is set it is never modified, making a fixed slice immutable and
404 : // safe for concurrent reads.
405 : queue flushableList
406 : // nextSize is the size of the next memtable. The memtable size starts at
407 : // min(256KB,Options.MemTableSize) and doubles each time a new memtable
408 : // is allocated up to Options.MemTableSize. This reduces the memory
409 : // footprint of memtables when lots of DB instances are used concurrently
410 : // in test environments.
411 : nextSize uint64
412 : }
413 :
414 : compact struct {
415 : // Condition variable used to signal when a flush or compaction has
416 : // completed. Used by the write-stall mechanism to wait for the stall
417 : // condition to clear. See DB.makeRoomForWrite().
418 : cond sync.Cond
419 : // True when a flush is in progress.
420 : flushing bool
421 : // The number of ongoing compactions.
422 : compactingCount int
423 : // The list of deletion hints, suggesting ranges for delete-only
424 : // compactions.
425 : deletionHints []deleteCompactionHint
426 : // The list of manual compactions. The next manual compaction to perform
427 : // is at the start of the list. New entries are added to the end.
428 : manual []*manualCompaction
429 : // inProgress is the set of in-progress flushes and compactions.
430 : // It's used in the calculation of some metrics and to initialize L0
431 : // sublevels' state. Some of the compactions contained within this
432 : // map may have already committed an edit to the version but are
433 : // lingering performing cleanup, like deleting obsolete files.
434 : inProgress map[*compaction]struct{}
435 :
436 : // rescheduleReadCompaction indicates to an iterator that a read compaction
437 : // should be scheduled.
438 : rescheduleReadCompaction bool
439 :
440 : // readCompactions is a readCompactionQueue which keeps track of the
441 : // compactions which we might have to perform.
442 : readCompactions readCompactionQueue
443 :
444 : // The cumulative duration of all completed compactions since Open.
445 : // Does not include flushes.
446 : duration time.Duration
447 : // Flush throughput metric.
448 : flushWriteThroughput ThroughputMetric
449 : // The idle start time for the flush "loop", i.e., when the flushing
450 : // bool above transitions to false.
451 : noOngoingFlushStartTime time.Time
452 : }
453 :
454 : // Non-zero when file cleaning is disabled. The disabled count acts as a
455 : // reference count to prohibit file cleaning. See
456 : // DB.{disable,Enable}FileDeletions().
457 : disableFileDeletions int
458 :
459 : snapshots struct {
460 : // The list of active snapshots.
461 : snapshotList
462 :
463 : // The cumulative count and size of snapshot-pinned keys written to
464 : // sstables.
465 : cumulativePinnedCount uint64
466 : cumulativePinnedSize uint64
467 : }
468 :
469 : tableStats struct {
470 : // Condition variable used to signal the completion of a
471 : // job to collect table stats.
472 : cond sync.Cond
473 : // True when a stat collection operation is in progress.
474 : loading bool
475 : // True if stat collection has loaded statistics for all tables
476 : // other than those listed explicitly in pending. This flag starts
477 : // as false when a database is opened and flips to true once stat
478 : // collection has caught up.
479 : loadedInitial bool
480 : // A slice of files for which stats have not been computed.
481 : // Compactions, ingests, flushes append files to be processed. An
482 : // active stat collection goroutine clears the list and processes
483 : // them.
484 : pending []manifest.NewFileEntry
485 : }
486 :
487 : tableValidation struct {
488 : // cond is a condition variable used to signal the completion of a
489 : // job to validate one or more sstables.
490 : cond sync.Cond
491 : // pending is a slice of metadata for sstables waiting to be
492 : // validated. Only physical sstables should be added to the pending
493 : // queue.
494 : pending []newFileEntry
495 : // validating is set to true when validation is running.
496 : validating bool
497 : }
498 : }
499 :
500 : // Normally equal to time.Now() but may be overridden in tests.
501 : timeNow func() time.Time
502 : // the time at database Open; may be used to compute metrics like effective
503 : // compaction concurrency
504 : openedAt time.Time
505 : }
506 :
507 : var _ Reader = (*DB)(nil)
508 : var _ Writer = (*DB)(nil)
509 :
510 : // TestOnlyWaitForCleaning MUST only be used in tests.
511 1 : func (d *DB) TestOnlyWaitForCleaning() {
512 1 : d.cleanupManager.Wait()
513 1 : }
514 :
515 : // Get gets the value for the given key. It returns ErrNotFound if the DB does
516 : // not contain the key.
517 : //
518 : // The caller should not modify the contents of the returned slice, but it is
519 : // safe to modify the contents of the argument after Get returns. The returned
520 : // slice will remain valid until the returned Closer is closed. On success, the
521 : // caller MUST call closer.Close() or a memory leak will occur.
522 2 : func (d *DB) Get(key []byte) ([]byte, io.Closer, error) {
523 2 : return d.getInternal(key, nil /* batch */, nil /* snapshot */)
524 2 : }
525 :
526 : type getIterAlloc struct {
527 : dbi Iterator
528 : keyBuf []byte
529 : get getIter
530 : }
531 :
532 : var getIterAllocPool = sync.Pool{
533 2 : New: func() interface{} {
534 2 : return &getIterAlloc{}
535 2 : },
536 : }
537 :
538 2 : func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer, error) {
539 2 : if err := d.closed.Load(); err != nil {
540 1 : panic(err)
541 : }
542 :
543 : // Grab and reference the current readState. This prevents the underlying
544 : // files in the associated version from being deleted if there is a current
545 : // compaction. The readState is unref'd by Iterator.Close().
546 2 : readState := d.loadReadState()
547 2 :
548 2 : // Determine the seqnum to read at after grabbing the read state (current and
549 2 : // memtables) above.
550 2 : var seqNum uint64
551 2 : if s != nil {
552 2 : seqNum = s.seqNum
553 2 : } else {
554 2 : seqNum = d.mu.versions.visibleSeqNum.Load()
555 2 : }
556 :
557 2 : buf := getIterAllocPool.Get().(*getIterAlloc)
558 2 :
559 2 : get := &buf.get
560 2 : *get = getIter{
561 2 : logger: d.opts.Logger,
562 2 : comparer: d.opts.Comparer,
563 2 : newIters: d.newIters,
564 2 : snapshot: seqNum,
565 2 : key: key,
566 2 : batch: b,
567 2 : mem: readState.memtables,
568 2 : l0: readState.current.L0SublevelFiles,
569 2 : version: readState.current,
570 2 : }
571 2 :
572 2 : // Strip off memtables which cannot possibly contain the seqNum being read
573 2 : // at.
574 2 : for len(get.mem) > 0 {
575 2 : n := len(get.mem)
576 2 : if logSeqNum := get.mem[n-1].logSeqNum; logSeqNum < seqNum {
577 2 : break
578 : }
579 2 : get.mem = get.mem[:n-1]
580 : }
581 :
582 2 : i := &buf.dbi
583 2 : pointIter := get
584 2 : *i = Iterator{
585 2 : ctx: context.Background(),
586 2 : getIterAlloc: buf,
587 2 : iter: pointIter,
588 2 : pointIter: pointIter,
589 2 : merge: d.merge,
590 2 : comparer: *d.opts.Comparer,
591 2 : readState: readState,
592 2 : keyBuf: buf.keyBuf,
593 2 : }
594 2 :
595 2 : if !i.First() {
596 2 : err := i.Close()
597 2 : if err != nil {
598 1 : return nil, nil, err
599 1 : }
600 2 : return nil, nil, ErrNotFound
601 : }
602 2 : return i.Value(), i, nil
603 : }
604 :
605 : // Set sets the value for the given key. It overwrites any previous value
606 : // for that key; a DB is not a multi-map.
607 : //
608 : // It is safe to modify the contents of the arguments after Set returns.
609 2 : func (d *DB) Set(key, value []byte, opts *WriteOptions) error {
610 2 : b := newBatch(d)
611 2 : _ = b.Set(key, value, opts)
612 2 : if err := d.Apply(b, opts); err != nil {
613 1 : return err
614 1 : }
615 : // Only release the batch on success.
616 2 : b.release()
617 2 : return nil
618 : }
619 :
620 : // Delete deletes the value for the given key. Deletes are blind and will
621 : // succeed even if the given key does not exist.
622 : //
623 : // It is safe to modify the contents of the arguments after Delete returns.
624 2 : func (d *DB) Delete(key []byte, opts *WriteOptions) error {
625 2 : b := newBatch(d)
626 2 : _ = b.Delete(key, opts)
627 2 : if err := d.Apply(b, opts); err != nil {
628 1 : return err
629 1 : }
630 : // Only release the batch on success.
631 2 : b.release()
632 2 : return nil
633 : }
634 :
635 : // DeleteSized behaves identically to Delete, but takes an additional
636 : // argument indicating the size of the value being deleted. DeleteSized
637 : // should be preferred when the caller has the expectation that there exists
638 : // a single internal KV pair for the key (eg, the key has not been
639 : // overwritten recently), and the caller knows the size of its value.
640 : //
641 : // DeleteSized will record the value size within the tombstone and use it to
642 : // inform compaction-picking heuristics which strive to reduce space
643 : // amplification in the LSM. This "calling your shot" mechanic allows the
644 : // storage engine to more accurately estimate and reduce space amplification.
645 : //
646 : // It is safe to modify the contents of the arguments after DeleteSized
647 : // returns.
648 1 : func (d *DB) DeleteSized(key []byte, valueSize uint32, opts *WriteOptions) error {
649 1 : b := newBatch(d)
650 1 : _ = b.DeleteSized(key, valueSize, opts)
651 1 : if err := d.Apply(b, opts); err != nil {
652 0 : return err
653 0 : }
654 : // Only release the batch on success.
655 1 : b.release()
656 1 : return nil
657 : }
658 :
659 : // SingleDelete adds an action to the batch that single deletes the entry for key.
660 : // See Writer.SingleDelete for more details on the semantics of SingleDelete.
661 : //
662 : // It is safe to modify the contents of the arguments after SingleDelete returns.
663 2 : func (d *DB) SingleDelete(key []byte, opts *WriteOptions) error {
664 2 : b := newBatch(d)
665 2 : _ = b.SingleDelete(key, opts)
666 2 : if err := d.Apply(b, opts); err != nil {
667 0 : return err
668 0 : }
669 : // Only release the batch on success.
670 2 : b.release()
671 2 : return nil
672 : }
673 :
674 : // DeleteRange deletes all of the keys (and values) in the range [start,end)
675 : // (inclusive on start, exclusive on end).
676 : //
677 : // It is safe to modify the contents of the arguments after DeleteRange
678 : // returns.
679 2 : func (d *DB) DeleteRange(start, end []byte, opts *WriteOptions) error {
680 2 : b := newBatch(d)
681 2 : _ = b.DeleteRange(start, end, opts)
682 2 : if err := d.Apply(b, opts); err != nil {
683 1 : return err
684 1 : }
685 : // Only release the batch on success.
686 2 : b.release()
687 2 : return nil
688 : }
689 :
690 : // Merge adds an action to the DB that merges the value at key with the new
691 : // value. The details of the merge are dependent upon the configured merge
692 : // operator.
693 : //
694 : // It is safe to modify the contents of the arguments after Merge returns.
695 2 : func (d *DB) Merge(key, value []byte, opts *WriteOptions) error {
696 2 : b := newBatch(d)
697 2 : _ = b.Merge(key, value, opts)
698 2 : if err := d.Apply(b, opts); err != nil {
699 1 : return err
700 1 : }
701 : // Only release the batch on success.
702 2 : b.release()
703 2 : return nil
704 : }
705 :
706 : // LogData adds the specified data to the batch. The data will be written to the
707 : // WAL, but not added to memtables or sstables. Log data is never indexed,
708 : // which makes it useful for testing WAL performance.
709 : //
710 : // It is safe to modify the contents of the argument after LogData returns.
711 1 : func (d *DB) LogData(data []byte, opts *WriteOptions) error {
712 1 : b := newBatch(d)
713 1 : _ = b.LogData(data, opts)
714 1 : if err := d.Apply(b, opts); err != nil {
715 1 : return err
716 1 : }
717 : // Only release the batch on success.
718 1 : b.release()
719 1 : return nil
720 : }
721 :
722 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
723 : // timestamp suffix to value. The suffix is optional. If any portion of the key
724 : // range [start, end) is already set by a range key with the same suffix value,
725 : // RangeKeySet overrides it.
726 : //
727 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
728 2 : func (d *DB) RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error {
729 2 : b := newBatch(d)
730 2 : _ = b.RangeKeySet(start, end, suffix, value, opts)
731 2 : if err := d.Apply(b, opts); err != nil {
732 0 : return err
733 0 : }
734 : // Only release the batch on success.
735 2 : b.release()
736 2 : return nil
737 : }
738 :
739 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
740 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
741 : // range key. RangeKeyUnset only removes portions of range keys that fall within
742 : // the [start, end) key span, and only range keys with suffixes that exactly
743 : // match the unset suffix.
744 : //
745 : // It is safe to modify the contents of the arguments after RangeKeyUnset
746 : // returns.
747 2 : func (d *DB) RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error {
748 2 : b := newBatch(d)
749 2 : _ = b.RangeKeyUnset(start, end, suffix, opts)
750 2 : if err := d.Apply(b, opts); err != nil {
751 0 : return err
752 0 : }
753 : // Only release the batch on success.
754 2 : b.release()
755 2 : return nil
756 : }
757 :
758 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
759 : // (inclusive on start, exclusive on end). It does not delete point keys (for
760 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
761 : // bounds, including those with or without suffixes.
762 : //
763 : // It is safe to modify the contents of the arguments after RangeKeyDelete
764 : // returns.
765 2 : func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
766 2 : b := newBatch(d)
767 2 : _ = b.RangeKeyDelete(start, end, opts)
768 2 : if err := d.Apply(b, opts); err != nil {
769 0 : return err
770 0 : }
771 : // Only release the batch on success.
772 2 : b.release()
773 2 : return nil
774 : }
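
// An illustrative sketch (not part of the original source): setting a range
// key over [a, d) at an MVCC timestamp suffix and later unsetting that same
// span. The "@5" suffix encoding is a placeholder; real suffixes must be
// understood by the configured Comparer's Split.
//
//	if err := db.RangeKeySet([]byte("a"), []byte("d"), []byte("@5"), nil, pebble.Sync); err != nil {
//		return err
//	}
//	// Only range keys whose suffix exactly matches "@5" are removed.
//	if err := db.RangeKeyUnset([]byte("a"), []byte("d"), []byte("@5"), pebble.Sync); err != nil {
//		return err
//	}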
775 :
776 : // Apply the operations contained in the batch to the DB. If the batch is large
777 : // the contents of the batch may be retained by the database. If that occurs
778 : // the batch contents will be cleared preventing the caller from attempting to
779 : // reuse them.
780 : //
781 : // It is safe to modify the contents of the arguments after Apply returns.
782 2 : func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
783 2 : return d.applyInternal(batch, opts, false)
784 2 : }
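
// An illustrative sketch (not part of the original source): staging several
// writes in a Batch (NewBatch and Batch.Close come from elsewhere in this
// package) and applying them atomically from a client package.
//
//	b := db.NewBatch()
//	defer b.Close()
//	_ = b.Set([]byte("a"), []byte("1"), nil)
//	_ = b.Delete([]byte("b"), nil)
//	_ = b.DeleteRange([]byte("c"), []byte("d"), nil)
//	if err := db.Apply(b, pebble.Sync); err != nil {
//		return err
//	}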
785 :
786 : // ApplyNoSyncWait must only be used when opts.Sync is true and the caller
787 : // does not want to wait for the WAL fsync to happen. The method will return
788 : // once the mutation is applied to the memtable and is visible (note that a
789 : // mutation is visible before the WAL sync even in the wait case, so we have
790 : // not weakened the durability semantics). The caller must call Batch.SyncWait
791 : // to wait for the WAL fsync. The caller must not Close the batch without
792 : // first calling Batch.SyncWait.
793 : //
794 : // RECOMMENDATION: Prefer using Apply unless you really understand why you
795 : // need ApplyNoSyncWait.
796 : // EXPERIMENTAL: API/feature subject to change. Do not yet use outside
797 : // CockroachDB.
798 2 : func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
799 2 : if !opts.Sync {
800 0 : return errors.Errorf("cannot request asynchonous apply when WriteOptions.Sync is false")
801 0 : }
802 2 : return d.applyInternal(batch, opts, true)
803 : }
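
// An illustrative sketch (not part of the original source) of the sequence the
// comment above prescribes: commit with Sync set, overlap other work with the
// WAL fsync, then block on Batch.SyncWait before closing the batch. SyncWait is
// assumed here to return an error, and doOtherWork is a hypothetical stand-in.
//
//	if err := db.ApplyNoSyncWait(b, pebble.Sync); err != nil {
//		return err
//	}
//	doOtherWork() // the mutation is already visible to readers
//	if err := b.SyncWait(); err != nil {
//		return err
//	}
//	return b.Close()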
804 :
805 : // REQUIRES: noSyncWait => opts.Sync
806 2 : func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
807 2 : if err := d.closed.Load(); err != nil {
808 1 : panic(err)
809 : }
810 2 : if batch.applied.Load() {
811 0 : panic("pebble: batch already applied")
812 : }
813 2 : if d.opts.ReadOnly {
814 1 : return ErrReadOnly
815 1 : }
816 2 : if batch.db != nil && batch.db != d {
817 1 : panic(fmt.Sprintf("pebble: batch db mismatch: %p != %p", batch.db, d))
818 : }
819 :
820 2 : sync := opts.GetSync()
821 2 : if sync && d.opts.DisableWAL {
822 0 : return errors.New("pebble: WAL disabled")
823 0 : }
824 :
825 2 : if batch.minimumFormatMajorVersion != FormatMostCompatible {
826 2 : if fmv := d.FormatMajorVersion(); fmv < batch.minimumFormatMajorVersion {
827 1 : panic(fmt.Sprintf(
828 1 : "pebble: batch requires at least format major version %d (current: %d)",
829 1 : batch.minimumFormatMajorVersion, fmv,
830 1 : ))
831 : }
832 : }
833 :
834 2 : if batch.countRangeKeys > 0 {
835 2 : if d.split == nil {
836 0 : return errNoSplit
837 0 : }
838 : // TODO(jackson): Assert that all range key operands are suffixless.
839 : }
840 :
841 2 : if batch.db == nil {
842 1 : batch.refreshMemTableSize()
843 1 : }
844 2 : if batch.memTableSize >= d.largeBatchThreshold {
845 2 : batch.flushable = newFlushableBatch(batch, d.opts.Comparer)
846 2 : }
847 2 : if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
848 0 : // There isn't much we can do on an error here. The commit pipeline will be
849 0 : // horked at this point.
850 0 : d.opts.Logger.Fatalf("pebble: fatal commit error: %v", err)
851 0 : }
852 : // If this is a large batch, we need to clear the batch contents as the
853 : // flushable batch may still be present in the flushables queue.
854 : //
855 : // TODO(peter): Currently large batches are written to the WAL. We could
856 : // skip the WAL write and instead wait for the large batch to be flushed to
857 : // an sstable. For a 100 MB batch, this might actually be faster. For a 1
858 : // GB batch this is almost certainly faster.
859 2 : if batch.flushable != nil {
860 2 : batch.data = nil
861 2 : }
862 2 : return nil
863 : }
864 :
865 2 : func (d *DB) commitApply(b *Batch, mem *memTable) error {
866 2 : if b.flushable != nil {
867 2 : // This is a large batch which was already added to the immutable queue.
868 2 : return nil
869 2 : }
870 2 : err := mem.apply(b, b.SeqNum())
871 2 : if err != nil {
872 0 : return err
873 0 : }
874 :
875 : // If the batch contains range tombstones and the database is configured
876 : // to flush range deletions, schedule a delayed flush so that disk space
877 : // may be reclaimed without additional writes or an explicit flush.
878 2 : if b.countRangeDels > 0 && d.opts.FlushDelayDeleteRange > 0 {
879 2 : d.mu.Lock()
880 2 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayDeleteRange)
881 2 : d.mu.Unlock()
882 2 : }
883 :
884 : // If the batch contains range keys and the database is configured to flush
885 : // range keys, schedule a delayed flush so that the range keys are cleared
886 : // from the memtable.
887 2 : if b.countRangeKeys > 0 && d.opts.FlushDelayRangeKey > 0 {
888 2 : d.mu.Lock()
889 2 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayRangeKey)
890 2 : d.mu.Unlock()
891 2 : }
892 :
893 2 : if mem.writerUnref() {
894 2 : d.mu.Lock()
895 2 : d.maybeScheduleFlush()
896 2 : d.mu.Unlock()
897 2 : }
898 2 : return nil
899 : }
900 :
901 2 : func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
902 2 : var size int64
903 2 : repr := b.Repr()
904 2 :
905 2 : if b.flushable != nil {
906 2 : // We have a large batch. Such batches are special in that they don't get
907 2 : // added to the memtable, and are instead inserted into the queue of
908 2 : // memtables. The call to makeRoomForWrite with this batch will force the
909 2 : // current memtable to be flushed. We want the large batch to be part of
910 2 : // the same log, so we add it to the WAL here, rather than after the call
911 2 : // to makeRoomForWrite().
912 2 : //
913 2 : // Set the sequence number since it was not set to the correct value earlier
914 2 : // (see comment in newFlushableBatch()).
915 2 : b.flushable.setSeqNum(b.SeqNum())
916 2 : if !d.opts.DisableWAL {
917 2 : var err error
918 2 : size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr)
919 2 : if err != nil {
920 0 : panic(err)
921 : }
922 : }
923 : }
924 :
925 2 : d.mu.Lock()
926 2 :
927 2 : var err error
928 2 : if !b.ingestedSSTBatch {
929 2 : // Batches which contain keys of kind InternalKeyKindIngestSST will
930 2 : // never be applied to the memtable, so we don't need to make room for
931 2 : // write. For the other cases, switch out the memtable if there was not
932 2 : // enough room to store the batch.
933 2 : err = d.makeRoomForWrite(b)
934 2 : }
935 :
936 2 : if err == nil && !d.opts.DisableWAL {
937 2 : d.mu.log.bytesIn += uint64(len(repr))
938 2 : }
939 :
940 : // Grab a reference to the memtable while holding DB.mu. Note that for
941 : // non-flushable batches (b.flushable == nil) makeRoomForWrite() added a
942 : // reference to the memtable which will prevent it from being flushed until
943 : // we unreference it. This reference is dropped in DB.commitApply().
944 2 : mem := d.mu.mem.mutable
945 2 :
946 2 : d.mu.Unlock()
947 2 : if err != nil {
948 0 : return nil, err
949 0 : }
950 :
951 2 : if d.opts.DisableWAL {
952 2 : return mem, nil
953 2 : }
954 :
955 2 : if b.flushable == nil {
956 2 : size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr)
957 2 : if err != nil {
958 0 : panic(err)
959 : }
960 : }
961 :
962 2 : d.logSize.Store(uint64(size))
963 2 : return mem, err
964 : }
965 :
966 : type iterAlloc struct {
967 : dbi Iterator
968 : keyBuf []byte
969 : boundsBuf [2][]byte
970 : prefixOrFullSeekKey []byte
971 : merging mergingIter
972 : mlevels [3 + numLevels]mergingIterLevel
973 : levels [3 + numLevels]levelIter
974 : levelsPositioned [3 + numLevels]bool
975 : }
976 :
977 : var iterAllocPool = sync.Pool{
978 2 : New: func() interface{} {
979 2 : return &iterAlloc{}
980 2 : },
981 : }
982 :
983 : // snapshotIterOpts denotes snapshot-related iterator options when calling
984 : // newIter. These are the possible cases for a snapshotIterOpts:
985 : // - No snapshot: All fields are zero values.
986 : // - Classic snapshot: Only `seqNum` is set. The latest readState will be used
987 : // and the specified seqNum will be used as the snapshot seqNum.
988 : // - EventuallyFileOnlySnapshot (EFOS) behaving as a classic snapshot. Only
989 : // the `seqNum` is set. The latest readState will be used
990 : // and the specified seqNum will be used as the snapshot seqNum.
991 : // - EFOS in file-only state: Only `seqNum` and `vers` are set. All the
992 : // relevant SSTs are referenced by the *version.
993 : type snapshotIterOpts struct {
994 : seqNum uint64
995 : vers *version
996 : }
997 :
998 : // newIter constructs a new iterator, merging in batch iterators as an extra
999 : // level.
1000 : func (d *DB) newIter(
1001 : ctx context.Context, batch *Batch, sOpts snapshotIterOpts, o *IterOptions,
1002 2 : ) *Iterator {
1003 2 : if err := d.closed.Load(); err != nil {
1004 1 : panic(err)
1005 : }
1006 2 : seqNum := sOpts.seqNum
1007 2 : if o.rangeKeys() {
1008 2 : if d.FormatMajorVersion() < FormatRangeKeys {
1009 1 : panic(fmt.Sprintf(
1010 1 : "pebble: range keys require at least format major version %d (current: %d)",
1011 1 : FormatRangeKeys, d.FormatMajorVersion(),
1012 1 : ))
1013 : }
1014 : }
1015 2 : if o != nil && o.RangeKeyMasking.Suffix != nil && o.KeyTypes != IterKeyTypePointsAndRanges {
1016 0 : panic("pebble: range key masking requires IterKeyTypePointsAndRanges")
1017 : }
1018 2 : if (batch != nil || seqNum != 0) && (o != nil && o.OnlyReadGuaranteedDurable) {
1019 1 : // We could add support for OnlyReadGuaranteedDurable on snapshots if
1020 1 : // there was a need: this would require checking that the sequence number
1021 1 : // of the snapshot has been flushed, by comparing with
1022 1 : // DB.mem.queue[0].logSeqNum.
1023 1 : panic("OnlyReadGuaranteedDurable is not supported for batches or snapshots")
1024 : }
1025 : // Grab and reference the current readState. This prevents the underlying
1026 : // files in the associated version from being deleted if there is a current
1027 : // compaction. The readState is unref'd by Iterator.Close().
1028 2 : var readState *readState
1029 2 : if sOpts.vers == nil {
1030 2 : // NB: loadReadState() calls readState.ref().
1031 2 : readState = d.loadReadState()
1032 2 : } else {
1033 2 : // sOpts.vers != nil
1034 2 : sOpts.vers.Ref()
1035 2 : }
1036 :
1037 : // Determine the seqnum to read at after grabbing the read state (current and
1038 : // memtables) above.
1039 2 : if seqNum == 0 {
1040 2 : seqNum = d.mu.versions.visibleSeqNum.Load()
1041 2 : }
1042 :
1043 : // Bundle various structures under a single umbrella in order to allocate
1044 : // them together.
1045 2 : buf := iterAllocPool.Get().(*iterAlloc)
1046 2 : dbi := &buf.dbi
1047 2 : *dbi = Iterator{
1048 2 : ctx: ctx,
1049 2 : alloc: buf,
1050 2 : merge: d.merge,
1051 2 : comparer: *d.opts.Comparer,
1052 2 : readState: readState,
1053 2 : version: sOpts.vers,
1054 2 : keyBuf: buf.keyBuf,
1055 2 : prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
1056 2 : boundsBuf: buf.boundsBuf,
1057 2 : batch: batch,
1058 2 : newIters: d.newIters,
1059 2 : newIterRangeKey: d.tableNewRangeKeyIter,
1060 2 : seqNum: seqNum,
1061 2 : }
1062 2 : if o != nil {
1063 2 : dbi.opts = *o
1064 2 : dbi.processBounds(o.LowerBound, o.UpperBound)
1065 2 : }
1066 2 : dbi.opts.logger = d.opts.Logger
1067 2 : if d.opts.private.disableLazyCombinedIteration {
1068 1 : dbi.opts.disableLazyCombinedIteration = true
1069 1 : }
1070 2 : if batch != nil {
1071 2 : dbi.batchSeqNum = dbi.batch.nextSeqNum()
1072 2 : }
1073 2 : return finishInitializingIter(ctx, buf)
1074 : }
1075 :
1076 : // finishInitializingIter is a helper for doing the non-trivial initialization
1077 : // of an Iterator. It's invoked to perform the initial initialization of an
1078 : // Iterator during NewIter or Clone, and to perform reinitialization due to a
1079 : // change in IterOptions by a call to Iterator.SetOptions.
1080 2 : func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
1081 2 : // Short-hand.
1082 2 : dbi := &buf.dbi
1083 2 : var memtables flushableList
1084 2 : if dbi.readState != nil {
1085 2 : memtables = dbi.readState.memtables
1086 2 : }
1087 2 : if dbi.opts.OnlyReadGuaranteedDurable {
1088 1 : memtables = nil
1089 2 : } else {
1090 2 : // We only need to read from memtables which contain sequence numbers older
1091 2 : // than seqNum. Trim off newer memtables.
1092 2 : for i := len(memtables) - 1; i >= 0; i-- {
1093 2 : if logSeqNum := memtables[i].logSeqNum; logSeqNum < dbi.seqNum {
1094 2 : break
1095 : }
1096 2 : memtables = memtables[:i]
1097 : }
1098 : }
1099 :
1100 2 : if dbi.opts.pointKeys() {
1101 2 : // Construct the point iterator, initializing dbi.pointIter to point to
1102 2 : // dbi.merging. If this is called during a SetOptions call and this
1103 2 : // Iterator has already initialized dbi.merging, constructPointIter is a
1104 2 : // noop and an initialized pointIter already exists in dbi.pointIter.
1105 2 : dbi.constructPointIter(ctx, memtables, buf)
1106 2 : dbi.iter = dbi.pointIter
1107 2 : } else {
1108 2 : dbi.iter = emptyIter
1109 2 : }
1110 :
1111 2 : if dbi.opts.rangeKeys() {
1112 2 : dbi.rangeKeyMasking.init(dbi, dbi.comparer.Compare, dbi.comparer.Split)
1113 2 :
1114 2 : // When iterating over both point and range keys, don't create the
1115 2 : // range-key iterator stack immediately if we can avoid it. This
1116 2 : // optimization takes advantage of the expected sparseness of range
1117 2 : // keys, and configures the point-key iterator to dynamically switch to
1118 2 : // combined iteration when it observes a file containing range keys.
1119 2 : //
1120 2 : // Lazy combined iteration is not possible if a batch or a memtable
1121 2 : // contains any range keys.
1122 2 : useLazyCombinedIteration := dbi.rangeKey == nil &&
1123 2 : dbi.opts.KeyTypes == IterKeyTypePointsAndRanges &&
1124 2 : (dbi.batch == nil || dbi.batch.countRangeKeys == 0) &&
1125 2 : !dbi.opts.disableLazyCombinedIteration
1126 2 : if useLazyCombinedIteration {
1127 2 : // The user requested combined iteration, and there's no indexed
1128 2 : // batch currently containing range keys that would prevent lazy
1129 2 : // combined iteration. Check the memtables to see if they contain
1130 2 : // any range keys.
1131 2 : for i := range memtables {
1132 2 : if memtables[i].containsRangeKeys() {
1133 2 : useLazyCombinedIteration = false
1134 2 : break
1135 : }
1136 : }
1137 : }
1138 :
1139 2 : if useLazyCombinedIteration {
1140 2 : dbi.lazyCombinedIter = lazyCombinedIter{
1141 2 : parent: dbi,
1142 2 : pointIter: dbi.pointIter,
1143 2 : combinedIterState: combinedIterState{
1144 2 : initialized: false,
1145 2 : },
1146 2 : }
1147 2 : dbi.iter = &dbi.lazyCombinedIter
1148 2 : dbi.iter = invalidating.MaybeWrapIfInvariants(dbi.iter)
1149 2 : } else {
1150 2 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{
1151 2 : initialized: true,
1152 2 : }
1153 2 : if dbi.rangeKey == nil {
1154 2 : dbi.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1155 2 : dbi.rangeKey.init(dbi.comparer.Compare, dbi.comparer.Split, &dbi.opts)
1156 2 : dbi.constructRangeKeyIter()
1157 2 : } else {
1158 2 : dbi.rangeKey.iterConfig.SetBounds(dbi.opts.LowerBound, dbi.opts.UpperBound)
1159 2 : }
1160 :
1161 : // Wrap the point iterator (currently dbi.iter) with an interleaving
1162 : // iterator that interleaves range keys pulled from
1163 : // dbi.rangeKey.rangeKeyIter.
1164 : //
1165 : // NB: The interleaving iterator is always reinitialized, even if
1166 : // dbi already had an initialized range key iterator, in case the point
1167 : // iterator changed or the range key masking suffix changed.
1168 2 : dbi.rangeKey.iiter.Init(&dbi.comparer, dbi.iter, dbi.rangeKey.rangeKeyIter,
1169 2 : keyspan.InterleavingIterOpts{
1170 2 : Mask: &dbi.rangeKeyMasking,
1171 2 : LowerBound: dbi.opts.LowerBound,
1172 2 : UpperBound: dbi.opts.UpperBound,
1173 2 : })
1174 2 : dbi.iter = &dbi.rangeKey.iiter
1175 : }
1176 2 : } else {
1177 2 : // !dbi.opts.rangeKeys()
1178 2 : //
1179 2 : // Reset the combined iterator state. The initialized=true ensures the
1180 2 : // iterator doesn't unnecessarily try to switch to combined iteration.
1181 2 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{initialized: true}
1182 2 : }
1183 2 : return dbi
1184 : }
1185 :
1186 : // ScanInternal scans all internal keys within the specified bounds, truncating
1187 : // any rangedels and rangekeys to those bounds if they span past them. For use
1188 : // when an external user needs to be aware of all internal keys that make up a
1189 : // key range.
1190 : //
1191 : // Keys deleted by range deletions must not be returned or exposed by this
1192 : // method, while the range deletion deleting that key must be exposed using
1193 : // visitRangeDel. Keys that would be masked by range key masking (if an
1194 : // appropriate prefix were set) should be exposed, alongside the range key
1195 : // that would have masked it. This method also collapses all point keys into
1196 : // one InternalKey; so only one internal key at most per user key is returned
1197 : // to visitPointKey.
1198 : //
1199 : // If visitSharedFile is not nil, ScanInternal iterates in skip-shared iteration
1200 : // mode. In this iteration mode, sstables in levels L5 and L6 are skipped, and
1201 : // their metadatas truncated to [lower, upper) and passed into visitSharedFile.
1202 : // ErrInvalidSkipSharedIteration is returned if visitSharedFile is not nil and an
1203 : // sstable in L5 or L6 is found that is not in shared storage according to
1204 : // provider.IsShared, or an sstable in those levels contains a newer key than the
1205 : // snapshot sequence number (only applicable for snapshot.ScanInternal). Examples
1206 : // of when this could happen could be if Pebble started writing sstables before a
1207 : // creator ID was set (as creator IDs are necessary to enable shared storage)
1208 : // resulting in some lower level SSTs being on non-shared storage. Skip-shared
1209 : // iteration is invalid in those cases.
1210 : func (d *DB) ScanInternal(
1211 : ctx context.Context,
1212 : lower, upper []byte,
1213 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
1214 : visitRangeDel func(start, end []byte, seqNum uint64) error,
1215 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
1216 : visitSharedFile func(sst *SharedSSTMeta) error,
1217 1 : ) error {
1218 1 : scanInternalOpts := &scanInternalOptions{
1219 1 : visitPointKey: visitPointKey,
1220 1 : visitRangeDel: visitRangeDel,
1221 1 : visitRangeKey: visitRangeKey,
1222 1 : visitSharedFile: visitSharedFile,
1223 1 : skipSharedLevels: visitSharedFile != nil,
1224 1 : IterOptions: IterOptions{
1225 1 : KeyTypes: IterKeyTypePointsAndRanges,
1226 1 : LowerBound: lower,
1227 1 : UpperBound: upper,
1228 1 : },
1229 1 : }
1230 1 : iter := d.newInternalIter(snapshotIterOpts{} /* snapshot */, scanInternalOpts)
1231 1 : defer iter.close()
1232 1 : return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
1233 1 : }
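
// An illustrative sketch (not part of the original source) of driving
// ScanInternal from a client package with visitor callbacks. visitSharedFile
// is nil, so skip-shared iteration is not requested and all levels are
// visited.
//
//	err := db.ScanInternal(ctx, lower, upper,
//		func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
//			fmt.Printf("point: %s\n", key)
//			return nil
//		},
//		func(start, end []byte, seqNum uint64) error {
//			fmt.Printf("rangedel: [%s, %s)#%d\n", start, end, seqNum)
//			return nil
//		},
//		func(start, end []byte, keys []rangekey.Key) error {
//			fmt.Printf("rangekey: [%s, %s) x%d\n", start, end, len(keys))
//			return nil
//		},
//		nil, // visitSharedFile
//	)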
1234 :
1235 : // newInternalIter constructs and returns a new scanInternalIterator on this db.
1236 : // If o.skipSharedLevels is true, levels below sharedLevelsStart are *not* added
1237 : // to the internal iterator.
1238 : //
1239 : // TODO(bilal): This method has a lot of similarities with db.newIter as well as
1240 : // finishInitializingIter. Both pairs of methods should be refactored to reduce
1241 : // this duplication.
1242 1 : func (d *DB) newInternalIter(sOpts snapshotIterOpts, o *scanInternalOptions) *scanInternalIterator {
1243 1 : if err := d.closed.Load(); err != nil {
1244 0 : panic(err)
1245 : }
1246 : // Grab and reference the current readState. This prevents the underlying
1247 : // files in the associated version from being deleted if there is a current
1248 : // compaction. The readState is unref'd by Iterator.Close().
1249 1 : var readState *readState
1250 1 : if sOpts.vers == nil {
1251 1 : readState = d.loadReadState()
1252 1 : }
1253 1 : if sOpts.vers != nil {
1254 1 : sOpts.vers.Ref()
1255 1 : }
1256 :
1257 : // Determine the seqnum to read at after grabbing the read state (current and
1258 : // memtables) above.
1259 1 : seqNum := sOpts.seqNum
1260 1 : if seqNum == 0 {
1261 1 : seqNum = d.mu.versions.visibleSeqNum.Load()
1262 1 : }
1263 :
1264 : // Bundle various structures under a single umbrella in order to allocate
1265 : // them together.
1266 1 : buf := iterAllocPool.Get().(*iterAlloc)
1267 1 : dbi := &scanInternalIterator{
1268 1 : db: d,
1269 1 : comparer: d.opts.Comparer,
1270 1 : merge: d.opts.Merger.Merge,
1271 1 : readState: readState,
1272 1 : version: sOpts.vers,
1273 1 : alloc: buf,
1274 1 : newIters: d.newIters,
1275 1 : newIterRangeKey: d.tableNewRangeKeyIter,
1276 1 : seqNum: seqNum,
1277 1 : mergingIter: &buf.merging,
1278 1 : }
1279 1 : if o != nil {
1280 1 : dbi.opts = *o
1281 1 : }
1282 1 : dbi.opts.logger = d.opts.Logger
1283 1 : if d.opts.private.disableLazyCombinedIteration {
1284 0 : dbi.opts.disableLazyCombinedIteration = true
1285 0 : }
1286 1 : return finishInitializingInternalIter(buf, dbi)
1287 : }
1288 :
1289 1 : func finishInitializingInternalIter(buf *iterAlloc, i *scanInternalIterator) *scanInternalIterator {
1290 1 : // Short-hand.
1291 1 : var memtables flushableList
1292 1 : if i.readState != nil {
1293 1 : memtables = i.readState.memtables
1294 1 : }
1295 : // We only need to read from memtables which contain sequence numbers older
1296 : // than seqNum. Trim off newer memtables.
1297 1 : for j := len(memtables) - 1; j >= 0; j-- {
1298 1 : if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
1299 1 : break
1300 : }
1301 1 : memtables = memtables[:j]
1302 : }
1303 1 : i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)
1304 1 :
1305 1 : i.constructPointIter(memtables, buf)
1306 1 :
1307 1 : // For internal iterators, we skip the lazy combined iteration optimization
1308 1 : // entirely, and create the range key iterator stack directly.
1309 1 : i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1310 1 : i.rangeKey.init(i.comparer.Compare, i.comparer.Split, &i.opts.IterOptions)
1311 1 : i.constructRangeKeyIter()
1312 1 :
1313 1 : // Wrap the point iterator (currently i.iter) with an interleaving
1314 1 : // iterator that interleaves range keys pulled from
1315 1 : // i.rangeKey.rangeKeyIter.
1316 1 : i.rangeKey.iiter.Init(i.comparer, i.iter, i.rangeKey.rangeKeyIter,
1317 1 : keyspan.InterleavingIterOpts{
1318 1 : LowerBound: i.opts.LowerBound,
1319 1 : UpperBound: i.opts.UpperBound,
1320 1 : })
1321 1 : i.iter = &i.rangeKey.iiter
1322 1 :
1323 1 : return i
1324 : }
1325 :
1326 : func (i *Iterator) constructPointIter(
1327 : ctx context.Context, memtables flushableList, buf *iterAlloc,
1328 2 : ) {
1329 2 : if i.pointIter != nil {
1330 2 : // Already have one.
1331 2 : return
1332 2 : }
1333 2 : internalOpts := internalIterOpts{stats: &i.stats.InternalStats}
1334 2 : if i.opts.RangeKeyMasking.Filter != nil {
1335 2 : internalOpts.boundLimitedFilter = &i.rangeKeyMasking
1336 2 : }
1337 :
1338 : // Merging levels and levels from iterAlloc.
1339 2 : mlevels := buf.mlevels[:0]
1340 2 : levels := buf.levels[:0]
1341 2 :
1342 2 : // We compute the number of levels needed ahead of time and reallocate a slice if
1343 2 : // the array from the iterAlloc isn't large enough. Doing this allocation once
1344 2 : // should improve the performance.
1345 2 : numMergingLevels := 0
1346 2 : numLevelIters := 0
1347 2 : if i.batch != nil {
1348 2 : numMergingLevels++
1349 2 : }
1350 2 : numMergingLevels += len(memtables)
1351 2 :
1352 2 : current := i.version
1353 2 : if current == nil {
1354 2 : current = i.readState.current
1355 2 : }
1356 2 : numMergingLevels += len(current.L0SublevelFiles)
1357 2 : numLevelIters += len(current.L0SublevelFiles)
1358 2 : for level := 1; level < len(current.Levels); level++ {
1359 2 : if current.Levels[level].Empty() {
1360 2 : continue
1361 : }
1362 2 : numMergingLevels++
1363 2 : numLevelIters++
1364 : }
1365 :
1366 2 : if numMergingLevels > cap(mlevels) {
1367 1 : mlevels = make([]mergingIterLevel, 0, numMergingLevels)
1368 1 : }
1369 2 : if numLevelIters > cap(levels) {
1370 1 : levels = make([]levelIter, 0, numLevelIters)
1371 1 : }
1372 :
1373 : // Top-level is the batch, if any.
1374 2 : if i.batch != nil {
1375 2 : if i.batch.index == nil {
1376 0 : // This isn't an indexed batch. Include an error iterator so that
1377 0 : // the resulting iterator correctly surfaces ErrIndexed.
1378 0 : mlevels = append(mlevels, mergingIterLevel{
1379 0 : iter: newErrorIter(ErrNotIndexed),
1380 0 : rangeDelIter: newErrorKeyspanIter(ErrNotIndexed),
1381 0 : })
1382 2 : } else {
1383 2 : i.batch.initInternalIter(&i.opts, &i.batchPointIter)
1384 2 : i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, i.batchSeqNum)
1385 2 : // Only include the batch's rangedel iterator if it's non-empty.
1386 2 : // This requires some subtle logic in the case a rangedel is later
1387 2 : // written to the batch and the view of the batch is refreshed
1388 2 : // during a call to SetOptions—in this case, we need to reconstruct
1389 2 : // the point iterator to add the batch rangedel iterator.
1390 2 : var rangeDelIter keyspan.FragmentIterator
1391 2 : if i.batchRangeDelIter.Count() > 0 {
1392 2 : rangeDelIter = &i.batchRangeDelIter
1393 2 : }
1394 2 : mlevels = append(mlevels, mergingIterLevel{
1395 2 : iter: &i.batchPointIter,
1396 2 : rangeDelIter: rangeDelIter,
1397 2 : })
1398 : }
1399 : }
1400 :
1401 : // Next are the memtables.
1402 2 : for j := len(memtables) - 1; j >= 0; j-- {
1403 2 : mem := memtables[j]
1404 2 : mlevels = append(mlevels, mergingIterLevel{
1405 2 : iter: mem.newIter(&i.opts),
1406 2 : rangeDelIter: mem.newRangeDelIter(&i.opts),
1407 2 : })
1408 2 : }
1409 :
1410 : // Next are the file levels: L0 sub-levels followed by lower levels.
1411 2 : mlevelsIndex := len(mlevels)
1412 2 : levelsIndex := len(levels)
1413 2 : mlevels = mlevels[:numMergingLevels]
1414 2 : levels = levels[:numLevelIters]
1415 2 : i.opts.snapshotForHideObsoletePoints = buf.dbi.seqNum
1416 2 : addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) {
1417 2 : li := &levels[levelsIndex]
1418 2 :
1419 2 : li.init(ctx, i.opts, &i.comparer, i.newIters, files, level, internalOpts)
1420 2 : li.initRangeDel(&mlevels[mlevelsIndex].rangeDelIter)
1421 2 : li.initBoundaryContext(&mlevels[mlevelsIndex].levelIterBoundaryContext)
1422 2 : li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
1423 2 : mlevels[mlevelsIndex].levelIter = li
1424 2 : mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)
1425 2 :
1426 2 : levelsIndex++
1427 2 : mlevelsIndex++
1428 2 : }
1429 :
1430 : // Add level iterators for the L0 sublevels, iterating from newest to
1431 : // oldest.
1432 2 : for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
1433 2 : addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
1434 2 : }
1435 :
1436 : // Add level iterators for the non-empty non-L0 levels.
1437 2 : for level := 1; level < len(current.Levels); level++ {
1438 2 : if current.Levels[level].Empty() {
1439 2 : continue
1440 : }
1441 2 : addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
1442 : }
1443 2 : buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...)
1444 2 : if len(mlevels) <= cap(buf.levelsPositioned) {
1445 2 : buf.merging.levelsPositioned = buf.levelsPositioned[:len(mlevels)]
1446 2 : }
1447 2 : buf.merging.snapshot = i.seqNum
1448 2 : buf.merging.batchSnapshot = i.batchSeqNum
1449 2 : buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
1450 2 : i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging)
1451 2 : i.merging = &buf.merging
1452 : }
1453 :
1454 : // NewBatch returns a new empty write-only batch. Any reads on the batch will
1455 : // return an error. If the batch is committed it will be applied to the DB.
1456 2 : func (d *DB) NewBatch() *Batch {
1457 2 : return newBatch(d)
1458 2 : }
1459 :
1460 : // NewBatchWithSize is mostly identical to NewBatch, but it will allocate
1461 : // the specified memory space for the internal slice in advance.
1462 0 : func (d *DB) NewBatchWithSize(size int) *Batch {
1463 0 : return newBatchWithSize(d, size)
1464 0 : }
1465 :
1466 : // NewIndexedBatch returns a new empty read-write batch. Any reads on the batch
1467 : // will read from both the batch and the DB. If the batch is committed it will
1468 : // be applied to the DB. An indexed batch is slower than a non-indexed batch
1469 : // for insert operations. If you do not need to perform reads on the batch, use
1470 : // NewBatch instead.
1471 2 : func (d *DB) NewIndexedBatch() *Batch {
1472 2 : return newIndexedBatch(d, d.opts.Comparer)
1473 2 : }
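// Editor's illustrative sketch (not part of the original source): a minimal
// read-your-writes flow with an indexed batch, as seen from a client package
// that has an open *DB named db. Error handling is abbreviated.
//
//	b := db.NewIndexedBatch()
//	defer b.Close()
//	_ = b.Set([]byte("user/1"), []byte("alice"), nil)
//	// Reads on an indexed batch observe both the batch and the DB.
//	if v, closer, err := b.Get([]byte("user/1")); err == nil {
//		_ = v // "alice"
//		_ = closer.Close()
//	}
//	_ = b.Commit(pebble.Sync)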
1474 :
1475 : // NewIndexedBatchWithSize is mostly identical to NewIndexedBatch, but it will
1476 : // allocate the specified memory space for the internal slice in advance.
1477 0 : func (d *DB) NewIndexedBatchWithSize(size int) *Batch {
1478 0 : return newIndexedBatchWithSize(d, d.opts.Comparer, size)
1479 0 : }
1480 :
1481 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
1482 : // return false). The iterator can be positioned via a call to SeekGE, SeekLT,
1483 : // First or Last. The iterator provides a point-in-time view of the current DB
1484 : // state. This view is maintained by preventing file deletions and preventing
1485 : // memtables referenced by the iterator from being deleted. Using an iterator
1486 : // to maintain a long-lived point-in-time view of the DB state can lead to an
1487 : // apparent memory and disk usage leak. Use snapshots (see NewSnapshot) for
1488 : // point-in-time snapshots, which avoid these problems.
1489 2 : func (d *DB) NewIter(o *IterOptions) (*Iterator, error) {
1490 2 : return d.NewIterWithContext(context.Background(), o)
1491 2 : }
1492 :
1493 : // NewIterWithContext is like NewIter, and additionally accepts a context for
1494 : // tracing.
1495 2 : func (d *DB) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
1496 2 : return d.newIter(ctx, nil /* batch */, snapshotIterOpts{}, o), nil
1497 2 : }
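// Editor's illustrative sketch (not part of the original source): a bounded
// forward scan from a client package, inside a function returning error and
// with an open *DB named db; process is a hypothetical callback.
//
//	iter, err := db.NewIter(&pebble.IterOptions{
//		LowerBound: []byte("a"),
//		UpperBound: []byte("z"), // exclusive
//	})
//	if err != nil {
//		return err
//	}
//	defer iter.Close()
//	for iter.First(); iter.Valid(); iter.Next() {
//		// Key/Value are only stable until the next positioning call;
//		// copy them if they need to outlive the iteration step.
//		process(iter.Key(), iter.Value())
//	}
//	return iter.Error()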
1498 :
1499 : // NewSnapshot returns a point-in-time view of the current DB state. Iterators
1500 : // created with this handle will all observe a stable snapshot of the current
1501 : // DB state. The caller must call Snapshot.Close() when the snapshot is no
1502 : // longer needed. Snapshots are not persisted across DB restarts (close ->
1503 : // open). Unlike the implicit snapshot maintained by an iterator, a snapshot
1504 : // will not prevent memtables from being released or sstables from being
1505 : // deleted. Instead, a snapshot prevents deletion of sequence numbers
1506 : // referenced by the snapshot.
1507 2 : func (d *DB) NewSnapshot() *Snapshot {
1508 2 : if err := d.closed.Load(); err != nil {
1509 1 : panic(err)
1510 : }
1511 :
1512 2 : d.mu.Lock()
1513 2 : s := &Snapshot{
1514 2 : db: d,
1515 2 : seqNum: d.mu.versions.visibleSeqNum.Load(),
1516 2 : }
1517 2 : d.mu.snapshots.pushBack(s)
1518 2 : d.mu.Unlock()
1519 2 : return s
1520 : }
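// Editor's illustrative sketch (not part of the original source): reading at a
// fixed sequence number through a snapshot, assuming an open *DB named db and
// a hypothetical consumer use(). Close the snapshot, or DB.Close will report
// it as leaked (see the check near the end of Close below).
//
//	snap := db.NewSnapshot()
//	defer snap.Close()
//	if v, closer, err := snap.Get([]byte("k")); err == nil {
//		use(v)
//		_ = closer.Close()
//	}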
1521 :
1522 : // NewEventuallyFileOnlySnapshot returns a point-in-time view of the current DB
1523 : // state, similar to NewSnapshot, but with consistency constrained to the
1524 : // provided set of key ranges. See the comment at EventuallyFileOnlySnapshot for
1525 : // its semantics.
1526 2 : func (d *DB) NewEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
1527 2 : if err := d.closed.Load(); err != nil {
1528 0 : panic(err)
1529 : }
1530 :
1531 2 : internalKeyRanges := make([]internalKeyRange, len(keyRanges))
1532 2 : for i := range keyRanges {
1533 2 : if i > 0 && d.cmp(keyRanges[i-1].End, keyRanges[i].Start) > 0 {
1534 0 : panic("pebble: key ranges for eventually-file-only-snapshot not in order")
1535 : }
1536 2 : internalKeyRanges[i] = internalKeyRange{
1537 2 : smallest: base.MakeInternalKey(keyRanges[i].Start, InternalKeySeqNumMax, InternalKeyKindMax),
1538 2 : largest: base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, keyRanges[i].End),
1539 2 : }
1540 : }
1541 :
1542 2 : return d.makeEventuallyFileOnlySnapshot(keyRanges, internalKeyRanges)
1543 : }
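// Editor's illustrative sketch (not part of the original source, API details
// may differ): an eventually-file-only snapshot constrained to one key range,
// assuming an open *DB named db and reads that stay within that range.
//
//	efos := db.NewEventuallyFileOnlySnapshot([]pebble.KeyRange{
//		{Start: []byte("a"), End: []byte("m")},
//	})
//	defer efos.Close()
//	iter, err := efos.NewIter(&pebble.IterOptions{
//		LowerBound: []byte("a"),
//		UpperBound: []byte("m"),
//	})
//	if err != nil {
//		return err
//	}
//	defer iter.Close()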
1544 :
1545 : // Close closes the DB.
1546 : //
1547 : // It is not safe to close a DB until all outstanding iterators are closed
1548 : // or to call Close concurrently with any other DB method. It is not valid
1549 : // to call any of a DB's methods after the DB has been closed.
1550 2 : func (d *DB) Close() error {
1551 2 : // Lock the commit pipeline for the duration of Close. This prevents a race
1552 2 : // with makeRoomForWrite. Rotating the WAL in makeRoomForWrite requires
1553 2 : // dropping d.mu several times for I/O. If Close only holds d.mu, an
1554 2 : // in-progress WAL rotation may re-acquire d.mu only once the database is
1555 2 : // closed.
1556 2 : //
1557 2 : // Additionally, locking the commit pipeline makes it more likely that
1558 2 : // (illegal) concurrent writes will observe d.closed.Load() != nil, creating
1559 2 : // more understandable panics if the database is improperly used concurrently
1560 2 : // during Close.
1561 2 : d.commit.mu.Lock()
1562 2 : defer d.commit.mu.Unlock()
1563 2 : d.mu.Lock()
1564 2 : defer d.mu.Unlock()
1565 2 : if err := d.closed.Load(); err != nil {
1566 1 : panic(err)
1567 : }
1568 :
1569 : // Clear the finalizer that is used to check that an unreferenced DB has been
1570 : // closed. We're closing the DB here, so the check performed by that
1571 : // finalizer isn't necessary.
1572 : //
1573 : // Note: this is a no-op if invariants are disabled or race is enabled.
1574 2 : invariants.SetFinalizer(d.closed, nil)
1575 2 :
1576 2 : d.closed.Store(errors.WithStack(ErrClosed))
1577 2 : close(d.closedCh)
1578 2 :
1579 2 : defer d.opts.Cache.Unref()
1580 2 :
1581 2 : for d.mu.compact.compactingCount > 0 || d.mu.compact.flushing {
1582 2 : d.mu.compact.cond.Wait()
1583 2 : }
1584 2 : for d.mu.tableStats.loading {
1585 2 : d.mu.tableStats.cond.Wait()
1586 2 : }
1587 2 : for d.mu.tableValidation.validating {
1588 2 : d.mu.tableValidation.cond.Wait()
1589 2 : }
1590 :
1591 2 : var err error
1592 2 : if n := len(d.mu.compact.inProgress); n > 0 {
1593 1 : err = errors.Errorf("pebble: %d unexpected in-progress compactions", errors.Safe(n))
1594 1 : }
1595 2 : err = firstError(err, d.mu.formatVers.marker.Close())
1596 2 : err = firstError(err, d.tableCache.close())
1597 2 : if !d.opts.ReadOnly {
1598 2 : err = firstError(err, d.mu.log.Close())
1599 2 : } else if d.mu.log.LogWriter != nil {
1600 0 : panic("pebble: log-writer should be nil in read-only mode")
1601 : }
1602 2 : err = firstError(err, d.fileLock.Close())
1603 2 :
1604 2 : // Note that versionSet.close() only closes the MANIFEST. The versions list
1605 2 : // is still valid for the checks below.
1606 2 : err = firstError(err, d.mu.versions.close())
1607 2 :
1608 2 : err = firstError(err, d.dataDir.Close())
1609 2 : if d.dataDir != d.walDir {
1610 2 : err = firstError(err, d.walDir.Close())
1611 2 : }
1612 :
1613 2 : d.readState.val.unrefLocked()
1614 2 :
1615 2 : current := d.mu.versions.currentVersion()
1616 2 : for v := d.mu.versions.versions.Front(); true; v = v.Next() {
1617 2 : refs := v.Refs()
1618 2 : if v == current {
1619 2 : if refs != 1 {
1620 1 : err = firstError(err, errors.Errorf("leaked iterators: current\n%s", v))
1621 1 : }
1622 2 : break
1623 : }
1624 0 : if refs != 0 {
1625 0 : err = firstError(err, errors.Errorf("leaked iterators:\n%s", v))
1626 0 : }
1627 : }
1628 :
1629 2 : for _, mem := range d.mu.mem.queue {
1630 2 : // Usually, we'd want to delete the files returned by readerUnref. But
1631 2 : // in this case, even if we're unreferencing the flushables, the
1632 2 : // flushables aren't obsolete. They will be reconstructed during WAL
1633 2 : // replay.
1634 2 : mem.readerUnrefLocked(false)
1635 2 : }
1636 : // If there's an unused, recycled memtable, we need to release its memory.
1637 2 : if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
1638 2 : d.freeMemTable(obsoleteMemTable)
1639 2 : }
1640 2 : if reserved := d.memTableReserved.Load(); reserved != 0 {
1641 1 : err = firstError(err, errors.Errorf("leaked memtable reservation: %d", errors.Safe(reserved)))
1642 1 : }
1643 :
1644 : // Since we called d.readState.val.unrefLocked() above, we are expected to
1645 : // manually schedule deletion of obsolete files.
1646 2 : if len(d.mu.versions.obsoleteTables) > 0 {
1647 2 : d.deleteObsoleteFiles(d.mu.nextJobID)
1648 2 : }
1649 :
1650 2 : d.mu.Unlock()
1651 2 : d.compactionSchedulers.Wait()
1652 2 :
1653 2 : // Wait for all cleaning jobs to finish.
1654 2 : d.cleanupManager.Close()
1655 2 :
1656 2 : // Sanity check metrics.
1657 2 : if invariants.Enabled {
1658 2 : m := d.Metrics()
1659 2 : if m.Compact.NumInProgress > 0 || m.Compact.InProgressBytes > 0 {
1660 0 : d.mu.Lock()
1661 0 : panic(fmt.Sprintf("invalid metrics on close:\n%s", m))
1662 : }
1663 : }
1664 :
1665 2 : d.mu.Lock()
1666 2 :
1667 2 : // As a sanity check, ensure that there are no zombie tables. A non-zero count
1668 2 : // hints at a reference count leak.
1669 2 : if ztbls := len(d.mu.versions.zombieTables); ztbls > 0 {
1670 0 : err = firstError(err, errors.Errorf("non-zero zombie file count: %d", ztbls))
1671 0 : }
1672 :
1673 2 : err = firstError(err, d.objProvider.Close())
1674 2 :
1675 2 : // If the options include a closer to 'close' the filesystem, close it.
1676 2 : if d.opts.private.fsCloser != nil {
1677 2 : d.opts.private.fsCloser.Close()
1678 2 : }
1679 :
1680 : // Return an error if the user failed to close all open snapshots.
1681 2 : if v := d.mu.snapshots.count(); v > 0 {
1682 1 : err = firstError(err, errors.Errorf("leaked snapshots: %d open snapshots on DB %p", v, d))
1683 1 : }
1684 :
1685 2 : return err
1686 : }
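// Editor's illustrative sketch (not part of the original source): the expected
// open/use/close lifecycle from a client package. Every iterator, snapshot and
// batch must be closed before Close, otherwise Close returns one of the
// "leaked ..." errors constructed above.
//
//	db, err := pebble.Open("/tmp/pebble-demo", &pebble.Options{})
//	if err != nil {
//		return err
//	}
//	// ... reads and writes, closing every handle that was opened ...
//	if err := db.Close(); err != nil {
//		return err // e.g. "leaked iterators" or "leaked snapshots"
//	}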
1687 :
1688 : // Compact the specified range of keys in the database.
1689 2 : func (d *DB) Compact(start, end []byte, parallelize bool) error {
1690 2 : if err := d.closed.Load(); err != nil {
1691 1 : panic(err)
1692 : }
1693 2 : if d.opts.ReadOnly {
1694 1 : return ErrReadOnly
1695 1 : }
1696 2 : if d.cmp(start, end) >= 0 {
1697 2 : return errors.Errorf("Compact start %s is not less than end %s",
1698 2 : d.opts.Comparer.FormatKey(start), d.opts.Comparer.FormatKey(end))
1699 2 : }
1700 2 : iStart := base.MakeInternalKey(start, InternalKeySeqNumMax, InternalKeyKindMax)
1701 2 : iEnd := base.MakeInternalKey(end, 0, 0)
1702 2 : m := (&fileMetadata{}).ExtendPointKeyBounds(d.cmp, iStart, iEnd)
1703 2 : meta := []*fileMetadata{m}
1704 2 :
1705 2 : d.mu.Lock()
1706 2 : maxLevelWithFiles := 1
1707 2 : cur := d.mu.versions.currentVersion()
1708 2 : for level := 0; level < numLevels; level++ {
1709 2 : overlaps := cur.Overlaps(level, d.cmp, start, end, iEnd.IsExclusiveSentinel())
1710 2 : if !overlaps.Empty() {
1711 2 : maxLevelWithFiles = level + 1
1712 2 : }
1713 : }
1714 :
1715 2 : keyRanges := make([]internalKeyRange, len(meta))
1716 2 : for i := range meta {
1717 2 : keyRanges[i] = internalKeyRange{smallest: m.Smallest, largest: m.Largest}
1718 2 : }
1719 : // Determine if any memtable overlaps with the compaction range. We wait for
1720 : // any such overlap to flush (initiating a flush if necessary).
1721 2 : mem, err := func() (*flushableEntry, error) {
1722 2 : // Check to see if any files overlap with any of the memtables. The queue
1723 2 : // is ordered from oldest to newest with the mutable memtable being the
1724 2 : // last element in the slice. We want to wait for the newest table that
1725 2 : // overlaps.
1726 2 : for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1727 2 : mem := d.mu.mem.queue[i]
1728 2 : if ingestMemtableOverlaps(d.cmp, mem, keyRanges) {
1729 2 : var err error
1730 2 : if mem.flushable == d.mu.mem.mutable {
1731 2 : // We have to hold both commitPipeline.mu and DB.mu when calling
1732 2 : // makeRoomForWrite(). Lock order requirements elsewhere force us to
1733 2 : // unlock DB.mu in order to grab commitPipeline.mu first.
1734 2 : d.mu.Unlock()
1735 2 : d.commit.mu.Lock()
1736 2 : d.mu.Lock()
1737 2 : defer d.commit.mu.Unlock()
1738 2 : if mem.flushable == d.mu.mem.mutable {
1739 2 : // Only flush if the active memtable is unchanged.
1740 2 : err = d.makeRoomForWrite(nil)
1741 2 : }
1742 : }
1743 2 : mem.flushForced = true
1744 2 : d.maybeScheduleFlush()
1745 2 : return mem, err
1746 : }
1747 : }
1748 2 : return nil, nil
1749 : }()
1750 :
1751 2 : d.mu.Unlock()
1752 2 :
1753 2 : if err != nil {
1754 0 : return err
1755 0 : }
1756 2 : if mem != nil {
1757 2 : <-mem.flushed
1758 2 : }
1759 :
1760 2 : for level := 0; level < maxLevelWithFiles; {
1761 2 : if err := d.manualCompact(
1762 2 : iStart.UserKey, iEnd.UserKey, level, parallelize); err != nil {
1763 1 : return err
1764 1 : }
1765 2 : level++
1766 2 : if level == numLevels-1 {
1767 2 : // A manual compaction of the bottommost level occurred.
1768 2 : // There is no next level to try and compact.
1769 2 : break
1770 : }
1771 : }
1772 2 : return nil
1773 : }
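// Editor's illustrative sketch (not part of the original source): manually
// compacting a key range from a client package with an open *DB named db.
// With parallelize=true the range is split into non-overlapping manual
// compactions (see splitManualCompaction below).
//
//	if err := db.Compact([]byte("a"), []byte("z"), true /* parallelize */); err != nil {
//		return err
//	}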
1774 :
1775 2 : func (d *DB) manualCompact(start, end []byte, level int, parallelize bool) error {
1776 2 : d.mu.Lock()
1777 2 : curr := d.mu.versions.currentVersion()
1778 2 : files := curr.Overlaps(level, d.cmp, start, end, false)
1779 2 : if files.Empty() {
1780 2 : d.mu.Unlock()
1781 2 : return nil
1782 2 : }
1783 :
1784 2 : var compactions []*manualCompaction
1785 2 : if parallelize {
1786 2 : compactions = append(compactions, d.splitManualCompaction(start, end, level)...)
1787 2 : } else {
1788 2 : compactions = append(compactions, &manualCompaction{
1789 2 : level: level,
1790 2 : done: make(chan error, 1),
1791 2 : start: start,
1792 2 : end: end,
1793 2 : })
1794 2 : }
1795 2 : d.mu.compact.manual = append(d.mu.compact.manual, compactions...)
1796 2 : d.maybeScheduleCompaction()
1797 2 : d.mu.Unlock()
1798 2 :
1799 2 : // Each of the channels is guaranteed to be eventually sent to once. After a
1800 2 : // compaction is possibly picked in d.maybeScheduleCompaction(), either the
1801 2 : // compaction is dropped, executed after being scheduled, or retried later.
1802 2 : // Assuming eventual progress when a compaction is retried, all outcomes send
1803 2 : // a value to the done channel. Since the channels are buffered, it is not
1804 2 : // necessary to read from each channel, and so we can exit early in the event
1805 2 : // of an error.
1806 2 : for _, compaction := range compactions {
1807 2 : if err := <-compaction.done; err != nil {
1808 1 : return err
1809 1 : }
1810 : }
1811 2 : return nil
1812 : }
1813 :
1814 : // splitManualCompaction splits a manual compaction over [start,end] on level
1815 : // such that the resulting compactions have no key overlap.
1816 : func (d *DB) splitManualCompaction(
1817 : start, end []byte, level int,
1818 2 : ) (splitCompactions []*manualCompaction) {
1819 2 : curr := d.mu.versions.currentVersion()
1820 2 : endLevel := level + 1
1821 2 : baseLevel := d.mu.versions.picker.getBaseLevel()
1822 2 : if level == 0 {
1823 2 : endLevel = baseLevel
1824 2 : }
1825 2 : keyRanges := calculateInuseKeyRanges(curr, d.cmp, level, endLevel, start, end)
1826 2 : for _, keyRange := range keyRanges {
1827 2 : splitCompactions = append(splitCompactions, &manualCompaction{
1828 2 : level: level,
1829 2 : done: make(chan error, 1),
1830 2 : start: keyRange.Start,
1831 2 : end: keyRange.End,
1832 2 : split: true,
1833 2 : })
1834 2 : }
1835 2 : return splitCompactions
1836 : }
1837 :
1838 : // DownloadSpan is a key range passed to the Download method.
1839 : type DownloadSpan struct {
1840 : StartKey []byte
1841 : // EndKey is exclusive.
1842 : EndKey []byte
1843 : }
1844 :
1845 : // Download ensures that the LSM does not use any external sstables for the
1846 : // given key ranges. It does so by performing appropriate compactions so that
1847 : // all external data becomes available locally.
1848 : //
1849 : // Note that calling this method does not imply that all other compactions stop;
1850 : // it simply informs Pebble of a list of spans for which external data should be
1851 : // downloaded with high priority.
1852 : //
1853 : // The method returns once no external sstables overlap the given spans, the
1854 : // context is canceled, or an error is hit.
1855 : //
1856 : // TODO(radu): consider passing a priority/impact knob to express how important
1857 : // the download is (versus live traffic performance, LSM health).
1858 0 : func (d *DB) Download(ctx context.Context, spans []DownloadSpan) error {
1859 0 : return errors.Errorf("not implemented")
1860 0 : }
1861 :
1862 : // Flush the memtable to stable storage.
1863 2 : func (d *DB) Flush() error {
1864 2 : flushDone, err := d.AsyncFlush()
1865 2 : if err != nil {
1866 1 : return err
1867 1 : }
1868 2 : <-flushDone
1869 2 : return nil
1870 : }
1871 :
1872 : // AsyncFlush asynchronously flushes the memtable to stable storage.
1873 : //
1874 : // If no error is returned, the caller can receive from the returned channel in
1875 : // order to wait for the flush to complete.
1876 2 : func (d *DB) AsyncFlush() (<-chan struct{}, error) {
1877 2 : if err := d.closed.Load(); err != nil {
1878 1 : panic(err)
1879 : }
1880 2 : if d.opts.ReadOnly {
1881 1 : return nil, ErrReadOnly
1882 1 : }
1883 :
1884 2 : d.commit.mu.Lock()
1885 2 : defer d.commit.mu.Unlock()
1886 2 : d.mu.Lock()
1887 2 : defer d.mu.Unlock()
1888 2 : flushed := d.mu.mem.queue[len(d.mu.mem.queue)-1].flushed
1889 2 : err := d.makeRoomForWrite(nil)
1890 2 : if err != nil {
1891 0 : return nil, err
1892 0 : }
1893 2 : return flushed, nil
1894 : }
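// Editor's illustrative sketch (not part of the original source): forcing the
// mutable memtable out to an sstable, assuming an open *DB named db. Flush
// blocks; AsyncFlush returns a channel that is closed once the flush finishes.
//
//	if err := db.Flush(); err != nil {
//		return err
//	}
//	// Or, without blocking the caller:
//	done, err := db.AsyncFlush()
//	if err != nil {
//		return err
//	}
//	<-done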
1895 :
1896 : // Metrics returns metrics about the database.
1897 2 : func (d *DB) Metrics() *Metrics {
1898 2 : metrics := &Metrics{}
1899 2 : recycledLogsCount, recycledLogSize := d.logRecycler.stats()
1900 2 :
1901 2 : d.mu.Lock()
1902 2 : vers := d.mu.versions.currentVersion()
1903 2 : *metrics = d.mu.versions.metrics
1904 2 : metrics.Compact.EstimatedDebt = d.mu.versions.picker.estimatedCompactionDebt(0)
1905 2 : metrics.Compact.InProgressBytes = d.mu.versions.atomicInProgressBytes.Load()
1906 2 : metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount)
1907 2 : metrics.Compact.MarkedFiles = vers.Stats.MarkedForCompaction
1908 2 : metrics.Compact.Duration = d.mu.compact.duration
1909 2 : for c := range d.mu.compact.inProgress {
1910 1 : if c.kind != compactionKindFlush {
1911 1 : metrics.Compact.Duration += d.timeNow().Sub(c.beganAt)
1912 1 : }
1913 : }
1914 :
1915 2 : for _, m := range d.mu.mem.queue {
1916 2 : metrics.MemTable.Size += m.totalBytes()
1917 2 : }
1918 2 : metrics.Snapshots.Count = d.mu.snapshots.count()
1919 2 : if metrics.Snapshots.Count > 0 {
1920 1 : metrics.Snapshots.EarliestSeqNum = d.mu.snapshots.earliest()
1921 1 : }
1922 2 : metrics.Snapshots.PinnedKeys = d.mu.snapshots.cumulativePinnedCount
1923 2 : metrics.Snapshots.PinnedSize = d.mu.snapshots.cumulativePinnedSize
1924 2 : metrics.MemTable.Count = int64(len(d.mu.mem.queue))
1925 2 : metrics.MemTable.ZombieCount = d.memTableCount.Load() - metrics.MemTable.Count
1926 2 : metrics.MemTable.ZombieSize = uint64(d.memTableReserved.Load()) - metrics.MemTable.Size
1927 2 : metrics.WAL.ObsoleteFiles = int64(recycledLogsCount)
1928 2 : metrics.WAL.ObsoletePhysicalSize = recycledLogSize
1929 2 : metrics.WAL.Size = d.logSize.Load()
1930 2 : // The current WAL size (d.atomic.logSize) is the current logical size,
1931 2 : // which may be less than the WAL's physical size if it was recycled.
1932 2 : // The file sizes in d.mu.log.queue are updated to the physical size
1933 2 : // during WAL rotation. Use the larger of the two for the current WAL. All
1934 2 : // the previous WALs' fileSizes in d.mu.log.queue are already updated.
1935 2 : metrics.WAL.PhysicalSize = metrics.WAL.Size
1936 2 : if len(d.mu.log.queue) > 0 && metrics.WAL.PhysicalSize < d.mu.log.queue[len(d.mu.log.queue)-1].fileSize {
1937 1 : metrics.WAL.PhysicalSize = d.mu.log.queue[len(d.mu.log.queue)-1].fileSize
1938 1 : }
1939 2 : for i, n := 0, len(d.mu.log.queue)-1; i < n; i++ {
1940 2 : metrics.WAL.PhysicalSize += d.mu.log.queue[i].fileSize
1941 2 : }
1942 :
1943 2 : metrics.WAL.BytesIn = d.mu.log.bytesIn // protected by d.mu
1944 2 : for i, n := 0, len(d.mu.mem.queue)-1; i < n; i++ {
1945 2 : metrics.WAL.Size += d.mu.mem.queue[i].logSize
1946 2 : }
1947 2 : metrics.WAL.BytesWritten = metrics.Levels[0].BytesIn + metrics.WAL.Size
1948 2 : if p := d.mu.versions.picker; p != nil {
1949 2 : compactions := d.getInProgressCompactionInfoLocked(nil)
1950 2 : for level, score := range p.getScores(compactions) {
1951 2 : metrics.Levels[level].Score = score
1952 2 : }
1953 : }
1954 2 : metrics.Table.ZombieCount = int64(len(d.mu.versions.zombieTables))
1955 2 : for _, size := range d.mu.versions.zombieTables {
1956 1 : metrics.Table.ZombieSize += size
1957 1 : }
1958 2 : metrics.private.optionsFileSize = d.optionsFileSize
1959 2 :
1960 2 : // TODO(jackson): Consider making these metrics optional.
1961 2 : metrics.Keys.RangeKeySetsCount = countRangeKeySetFragments(vers)
1962 2 : metrics.Keys.TombstoneCount = countTombstones(vers)
1963 2 :
1964 2 : d.mu.versions.logLock()
1965 2 : metrics.private.manifestFileSize = uint64(d.mu.versions.manifest.Size())
1966 2 : d.mu.versions.logUnlock()
1967 2 :
1968 2 : metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
1969 2 : if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
1970 0 : d.opts.Logger.Infof("metrics error: %s", err)
1971 0 : }
1972 2 : metrics.Flush.WriteThroughput = d.mu.compact.flushWriteThroughput
1973 2 : if d.mu.compact.flushing {
1974 1 : metrics.Flush.NumInProgress = 1
1975 1 : }
1976 2 : for i := 0; i < numLevels; i++ {
1977 2 : metrics.Levels[i].Additional.ValueBlocksSize = valueBlocksSizeForLevel(vers, i)
1978 2 : }
1979 :
1980 2 : d.mu.Unlock()
1981 2 :
1982 2 : metrics.BlockCache = d.opts.Cache.Metrics()
1983 2 : metrics.TableCache, metrics.Filter = d.tableCache.metrics()
1984 2 : metrics.TableIters = int64(d.tableCache.iterCount())
1985 2 :
1986 2 : metrics.SecondaryCacheMetrics = d.objProvider.Metrics()
1987 2 :
1988 2 : metrics.Uptime = d.timeNow().Sub(d.openedAt)
1989 2 :
1990 2 : return metrics
1991 : }
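// Editor's illustrative sketch (not part of the original source): sampling a
// few of the fields populated above, assuming an open *DB named db. Metrics
// builds a fresh struct on every call, so the result is safe to retain.
//
//	m := db.Metrics()
//	fmt.Printf("memtables=%d (%d bytes) zombie-tables=%d wal=%d bytes\n",
//		m.MemTable.Count, m.MemTable.Size, m.Table.ZombieCount, m.WAL.Size)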
1992 :
1993 : // sstablesOptions holds the optional parameters to retrieve TableInfo for all sstables.
1994 : type sstablesOptions struct {
1995 : // set to true will return the sstable properties in TableInfo
1996 : withProperties bool
1997 :
1998 : // if set, return sstables that overlap the key range (end-exclusive)
1999 : start []byte
2000 : end []byte
2001 :
2002 : withApproximateSpanBytes bool
2003 : }
2004 :
2005 : // SSTablesOption sets an optional parameter used by `DB.SSTables`.
2006 : type SSTablesOption func(*sstablesOptions)
2007 :
2008 : // WithProperties enables returning sstable properties in each TableInfo.
2009 : //
2010 : // NOTE: if most of the sstable properties need to be read from disk,
2011 : // this option may make the `SSTables` method quite slow.
2012 1 : func WithProperties() SSTablesOption {
2013 1 : return func(opt *sstablesOptions) {
2014 1 : opt.withProperties = true
2015 1 : }
2016 : }
2017 :
2018 : // WithKeyRangeFilter ensures returned sstables overlap start and end (end-exclusive).
2019 : // If start and end are both nil, this option has no effect.
2020 1 : func WithKeyRangeFilter(start, end []byte) SSTablesOption {
2021 1 : return func(opt *sstablesOptions) {
2022 1 : opt.end = end
2023 1 : opt.start = start
2024 1 : }
2025 : }
2026 :
2027 : // WithApproximateSpanBytes enables capturing the approximate number of bytes that
2028 : // overlap the provided key span for each sstable.
2029 : // NOTE: this option can only be used when WithKeyRangeFilter and WithProperties
2030 : // are also provided.
2031 1 : func WithApproximateSpanBytes() SSTablesOption {
2032 1 : return func(opt *sstablesOptions) {
2033 1 : opt.withApproximateSpanBytes = true
2034 1 : }
2035 : }
2036 :
2037 : // BackingType denotes the type of storage backing a given sstable.
2038 : type BackingType int
2039 :
2040 : const (
2041 : // BackingTypeLocal denotes an sstable stored on local disk according to the
2042 : // objprovider. This file is completely owned by us.
2043 : BackingTypeLocal BackingType = iota
2044 : // BackingTypeShared denotes an sstable stored on shared storage, created
2045 : // by this Pebble instance and possibly shared by other Pebble instances.
2046 : // These types of files have lifecycle managed by Pebble.
2047 : BackingTypeShared
2048 : // BackingTypeSharedForeign denotes an sstable stored on shared storage,
2049 : // created by a Pebble instance other than this one. These types of files have
2050 : // their lifecycle managed by Pebble.
2051 : BackingTypeSharedForeign
2052 : // BackingTypeExternal denotes an sstable stored on external storage,
2053 : // not owned by any Pebble instance and with no refcounting/cleanup methods
2054 : // or lifecycle management. An example of an external file is a file restored
2055 : // from a backup.
2056 : BackingTypeExternal
2057 : )
2058 :
2059 : // SSTableInfo export manifest.TableInfo with sstable.Properties alongside
2060 : // other file backing info.
2061 : type SSTableInfo struct {
2062 : manifest.TableInfo
2063 : // Virtual indicates whether the sstable is virtual.
2064 : Virtual bool
2065 : // BackingSSTNum is the file number associated with backing sstable which
2066 : // backs the sstable associated with this SSTableInfo. If Virtual is false,
2067 : // then BackingSSTNum == FileNum.
2068 : BackingSSTNum base.FileNum
2069 : // BackingType is the type of storage backing this sstable.
2070 : BackingType BackingType
2071 : // Locator is the remote.Locator backing this sstable, if the backing type is
2072 : // not BackingTypeLocal.
2073 : Locator remote.Locator
2074 :
2075 : // Properties is the sstable properties of this table. If Virtual is true,
2076 : // then the Properties are associated with the backing sst.
2077 : Properties *sstable.Properties
2078 : }
2079 :
2080 : // SSTables retrieves the current sstables. The returned slice is indexed by
2081 : // level and each level is indexed by the position of the sstable within the
2082 : // level. Note that this information may be out of date due to concurrent
2083 : // flushes and compactions.
2084 1 : func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
2085 1 : opt := &sstablesOptions{}
2086 1 : for _, fn := range opts {
2087 1 : fn(opt)
2088 1 : }
2089 :
2090 1 : if opt.withApproximateSpanBytes && !opt.withProperties {
2091 1 : return nil, errors.Errorf("Cannot use WithApproximateSpanBytes without WithProperties option.")
2092 1 : }
2093 1 : if opt.withApproximateSpanBytes && (opt.start == nil || opt.end == nil) {
2094 1 : return nil, errors.Errorf("Cannot use WithApproximateSpanBytes without WithKeyRangeFilter option.")
2095 1 : }
2096 :
2097 : // Grab and reference the current readState.
2098 1 : readState := d.loadReadState()
2099 1 : defer readState.unref()
2100 1 :
2101 1 : // TODO(peter): This is somewhat expensive, especially on a large
2102 1 : // database. It might be worthwhile to unify TableInfo and FileMetadata and
2103 1 : // then we could simply return current.Files. Note that RocksDB is doing
2104 1 : // something similar to the current code, so perhaps it isn't too bad.
2105 1 : srcLevels := readState.current.Levels
2106 1 : var totalTables int
2107 1 : for i := range srcLevels {
2108 1 : totalTables += srcLevels[i].Len()
2109 1 : }
2110 :
2111 1 : destTables := make([]SSTableInfo, totalTables)
2112 1 : destLevels := make([][]SSTableInfo, len(srcLevels))
2113 1 : for i := range destLevels {
2114 1 : iter := srcLevels[i].Iter()
2115 1 : j := 0
2116 1 : for m := iter.First(); m != nil; m = iter.Next() {
2117 1 : if opt.start != nil && opt.end != nil && !m.Overlaps(d.opts.Comparer.Compare, opt.start, opt.end, true /* exclusive end */) {
2118 1 : continue
2119 : }
2120 1 : destTables[j] = SSTableInfo{TableInfo: m.TableInfo()}
2121 1 : if opt.withProperties {
2122 1 : p, err := d.tableCache.getTableProperties(
2123 1 : m,
2124 1 : )
2125 1 : if err != nil {
2126 0 : return nil, err
2127 0 : }
2128 1 : destTables[j].Properties = p
2129 : }
2130 1 : destTables[j].Virtual = m.Virtual
2131 1 : destTables[j].BackingSSTNum = m.FileBacking.DiskFileNum.FileNum()
2132 1 : objMeta, err := d.objProvider.Lookup(fileTypeTable, m.FileBacking.DiskFileNum)
2133 1 : if err != nil {
2134 0 : return nil, err
2135 0 : }
2136 1 : if objMeta.IsRemote() {
2137 0 : if objMeta.IsShared() {
2138 0 : if d.objProvider.IsSharedForeign(objMeta) {
2139 0 : destTables[j].BackingType = BackingTypeSharedForeign
2140 0 : } else {
2141 0 : destTables[j].BackingType = BackingTypeShared
2142 0 : }
2143 0 : } else {
2144 0 : destTables[j].BackingType = BackingTypeExternal
2145 0 : }
2146 0 : destTables[j].Locator = objMeta.Remote.Locator
2147 1 : } else {
2148 1 : destTables[j].BackingType = BackingTypeLocal
2149 1 : }
2150 :
2151 1 : if opt.withApproximateSpanBytes {
2152 1 : var spanBytes uint64
2153 1 : if m.ContainedWithinSpan(d.opts.Comparer.Compare, opt.start, opt.end) {
2154 0 : spanBytes = m.Size
2155 1 : } else {
2156 1 : size, err := d.tableCache.estimateSize(m, opt.start, opt.end)
2157 1 : if err != nil {
2158 0 : return nil, err
2159 0 : }
2160 1 : spanBytes = size
2161 : }
2162 1 : propertiesCopy := *destTables[j].Properties
2163 1 :
2164 1 : // Deep copy user properties so approximate span bytes can be added.
2165 1 : propertiesCopy.UserProperties = make(map[string]string, len(destTables[j].Properties.UserProperties)+1)
2166 1 : for k, v := range destTables[j].Properties.UserProperties {
2167 0 : propertiesCopy.UserProperties[k] = v
2168 0 : }
2169 1 : propertiesCopy.UserProperties["approximate-span-bytes"] = strconv.FormatUint(spanBytes, 10)
2170 1 : destTables[j].Properties = &propertiesCopy
2171 : }
2172 1 : j++
2173 : }
2174 1 : destLevels[i] = destTables[:j]
2175 1 : destTables = destTables[j:]
2176 : }
2177 :
2178 1 : return destLevels, nil
2179 : }
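// Editor's illustrative sketch (not part of the original source): listing the
// sstables that overlap a key range, with properties, from a client package
// with an open *DB named db.
//
//	levels, err := db.SSTables(
//		pebble.WithKeyRangeFilter([]byte("a"), []byte("z")),
//		pebble.WithProperties(),
//	)
//	if err != nil {
//		return err
//	}
//	for level, tables := range levels {
//		for _, t := range tables {
//			fmt.Printf("L%d %s entries=%d\n", level, t.FileNum, t.Properties.NumEntries)
//		}
//	}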
2180 :
2181 : // EstimateDiskUsage returns the estimated filesystem space used in bytes for
2182 : // storing the range `[start, end]`. The estimation is computed as follows:
2183 : //
2184 : // - For sstables fully contained in the range the whole file size is included.
2185 : // - For sstables partially contained in the range the overlapping data block sizes
2186 : // are included. Even if a data block partially overlaps, or we cannot determine
2187 : // overlap due to abbreviated index keys, the full data block size is included in
2188 : // the estimation. Note that unlike fully contained sstables, none of the
2189 : // meta-block space is counted for partially overlapped files.
2190 : // - For virtual sstables, we use the overlap between start, end and the virtual
2191 : // sstable bounds to determine disk usage.
2192 : // - There may also exist WAL entries for unflushed keys in this range. This
2193 : // estimation currently excludes space used for the range in the WAL.
2194 1 : func (d *DB) EstimateDiskUsage(start, end []byte) (uint64, error) {
2195 1 : bytes, _, _, err := d.EstimateDiskUsageByBackingType(start, end)
2196 1 : return bytes, err
2197 1 : }
2198 :
2199 : // EstimateDiskUsageByBackingType is like EstimateDiskUsage but additionally
2200 : // returns the subsets of that size in remote and external files.
2201 : func (d *DB) EstimateDiskUsageByBackingType(
2202 : start, end []byte,
2203 1 : ) (totalSize, remoteSize, externalSize uint64, _ error) {
2204 1 : if err := d.closed.Load(); err != nil {
2205 0 : panic(err)
2206 : }
2207 1 : if d.opts.Comparer.Compare(start, end) > 0 {
2208 0 : return 0, 0, 0, errors.New("invalid key-range specified (start > end)")
2209 0 : }
2210 :
2211 : // Grab and reference the current readState. This prevents the underlying
2212 : // files in the associated version from being deleted if there is a concurrent
2213 : // compaction.
2214 1 : readState := d.loadReadState()
2215 1 : defer readState.unref()
2216 1 :
2217 1 : for level, files := range readState.current.Levels {
2218 1 : iter := files.Iter()
2219 1 : if level > 0 {
2220 1 : // We can only use `Overlaps` to restrict `files` at L1+ since at L0 it
2221 1 : // expands the range iteratively until it has found a set of files that
2222 1 : // do not overlap any other L0 files outside that set.
2223 1 : overlaps := readState.current.Overlaps(level, d.opts.Comparer.Compare, start, end, false /* exclusiveEnd */)
2224 1 : iter = overlaps.Iter()
2225 1 : }
2226 1 : for file := iter.First(); file != nil; file = iter.Next() {
2227 1 : if d.opts.Comparer.Compare(start, file.Smallest.UserKey) <= 0 &&
2228 1 : d.opts.Comparer.Compare(file.Largest.UserKey, end) <= 0 {
2229 1 : // The range fully contains the file, so skip looking it up in
2230 1 : // table cache/looking at its indexes, and add the full file size.
2231 1 : meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
2232 1 : if err != nil {
2233 0 : return 0, 0, 0, err
2234 0 : }
2235 1 : if meta.IsRemote() {
2236 0 : remoteSize += file.Size
2237 0 : if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
2238 0 : externalSize += file.Size
2239 0 : }
2240 : }
2241 1 : totalSize += file.Size
2242 1 : } else if d.opts.Comparer.Compare(file.Smallest.UserKey, end) <= 0 &&
2243 1 : d.opts.Comparer.Compare(start, file.Largest.UserKey) <= 0 {
2244 1 : var size uint64
2245 1 : var err error
2246 1 : if file.Virtual {
2247 0 : err = d.tableCache.withVirtualReader(
2248 0 : file.VirtualMeta(),
2249 0 : func(r sstable.VirtualReader) (err error) {
2250 0 : size, err = r.EstimateDiskUsage(start, end)
2251 0 : return err
2252 0 : },
2253 : )
2254 1 : } else {
2255 1 : err = d.tableCache.withReader(
2256 1 : file.PhysicalMeta(),
2257 1 : func(r *sstable.Reader) (err error) {
2258 1 : size, err = r.EstimateDiskUsage(start, end)
2259 1 : return err
2260 1 : },
2261 : )
2262 : }
2263 1 : if err != nil {
2264 0 : return 0, 0, 0, err
2265 0 : }
2266 1 : meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
2267 1 : if err != nil {
2268 0 : return 0, 0, 0, err
2269 0 : }
2270 1 : if meta.IsRemote() {
2271 0 : remoteSize += size
2272 0 : if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
2273 0 : externalSize += size
2274 0 : }
2275 : }
2276 1 : totalSize += size
2277 : }
2278 : }
2279 : }
2280 1 : return totalSize, remoteSize, externalSize, nil
2281 : }
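// Editor's illustrative sketch (not part of the original source): estimating
// on-disk usage for a key range and splitting out the portion held on remote
// and external storage, assuming an open *DB named db.
//
//	total, remote, external, err := db.EstimateDiskUsageByBackingType([]byte("a"), []byte("z"))
//	if err != nil {
//		return err
//	}
//	fmt.Printf("total=%d remote=%d external=%d bytes\n", total, remote, external)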
2282 :
2283 2 : func (d *DB) walPreallocateSize() int {
2284 2 : // Set the WAL preallocate size to 110% of the memtable size. Note that there
2285 2 : // is a bit of apples and oranges in units here as the memtable size
2286 2 : // corresponds to the memory usage of the memtable while the WAL size is the
2287 2 : // size of the batches (plus overhead) stored in the WAL.
2288 2 : //
2289 2 : // TODO(peter): 110% of the memtable size is quite hefty for a block
2290 2 : // size. This logic is taken from GetWalPreallocateBlockSize in
2291 2 : // RocksDB. Could a smaller preallocation block size be used?
2292 2 : size := d.opts.MemTableSize
2293 2 : size = (size / 10) + size
2294 2 : return int(size)
2295 2 : }
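// Editor's note (not part of the original source): as a concrete instance of
// the 110% computation above, a 64 MiB memtable (67108864 bytes) yields
// 67108864/10 + 67108864 = 73819750 bytes of WAL preallocation, roughly
// 70.4 MiB.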
2296 :
2297 2 : func (d *DB) newMemTable(logNum FileNum, logSeqNum uint64) (*memTable, *flushableEntry) {
2298 2 : size := d.mu.mem.nextSize
2299 2 : if d.mu.mem.nextSize < d.opts.MemTableSize {
2300 2 : d.mu.mem.nextSize *= 2
2301 2 : if d.mu.mem.nextSize > d.opts.MemTableSize {
2302 1 : d.mu.mem.nextSize = d.opts.MemTableSize
2303 1 : }
2304 : }
2305 :
2306 2 : memtblOpts := memTableOptions{
2307 2 : Options: d.opts,
2308 2 : logSeqNum: logSeqNum,
2309 2 : }
2310 2 :
2311 2 : // Before attempting to allocate a new memtable, check if there's one
2312 2 : // available for recycling in memTableRecycle. Large contiguous allocations
2313 2 : // can be costly as fragmentation makes it more difficult to find a large
2314 2 : // contiguous free space. We've observed 64MB allocations taking 10ms+.
2315 2 : //
2316 2 : // To reduce these costly allocations, up to 1 obsolete memtable is stashed
2317 2 : // in `d.memTableRecycle` to allow a future memtable rotation to reuse
2318 2 : // existing memory.
2319 2 : var mem *memTable
2320 2 : mem = d.memTableRecycle.Swap(nil)
2321 2 : if mem != nil && uint64(len(mem.arenaBuf)) != size {
2322 2 : d.freeMemTable(mem)
2323 2 : mem = nil
2324 2 : }
2325 2 : if mem != nil {
2326 2 : // Carry through the existing buffer and memory reservation.
2327 2 : memtblOpts.arenaBuf = mem.arenaBuf
2328 2 : memtblOpts.releaseAccountingReservation = mem.releaseAccountingReservation
2329 2 : } else {
2330 2 : mem = new(memTable)
2331 2 : memtblOpts.arenaBuf = manual.New(int(size))
2332 2 : memtblOpts.releaseAccountingReservation = d.opts.Cache.Reserve(int(size))
2333 2 : d.memTableCount.Add(1)
2334 2 : d.memTableReserved.Add(int64(size))
2335 2 :
2336 2 : // Note: this is a no-op if invariants are disabled or race is enabled.
2337 2 : invariants.SetFinalizer(mem, checkMemTable)
2338 2 : }
2339 2 : mem.init(memtblOpts)
2340 2 :
2341 2 : entry := d.newFlushableEntry(mem, logNum, logSeqNum)
2342 2 : entry.releaseMemAccounting = func() {
2343 2 : // If the user leaks iterators, we may be releasing the memtable after
2344 2 : // the DB is already closed. In this case, we want to just release the
2345 2 : // memory because DB.Close won't come along to free it for us.
2346 2 : if err := d.closed.Load(); err != nil {
2347 2 : d.freeMemTable(mem)
2348 2 : return
2349 2 : }
2350 :
2351 : // The next memtable allocation might be able to reuse this memtable.
2352 : // Stash it on d.memTableRecycle.
2353 2 : if unusedMem := d.memTableRecycle.Swap(mem); unusedMem != nil {
2354 2 : // There was already a memtable waiting to be recycled. We're now
2355 2 : // responsible for freeing it.
2356 2 : d.freeMemTable(unusedMem)
2357 2 : }
2358 : }
2359 2 : return mem, entry
2360 : }
2361 :
2362 2 : func (d *DB) freeMemTable(m *memTable) {
2363 2 : d.memTableCount.Add(-1)
2364 2 : d.memTableReserved.Add(-int64(len(m.arenaBuf)))
2365 2 : m.free()
2366 2 : }
2367 :
2368 2 : func (d *DB) newFlushableEntry(f flushable, logNum FileNum, logSeqNum uint64) *flushableEntry {
2369 2 : fe := &flushableEntry{
2370 2 : flushable: f,
2371 2 : flushed: make(chan struct{}),
2372 2 : logNum: logNum,
2373 2 : logSeqNum: logSeqNum,
2374 2 : deleteFn: d.mu.versions.addObsolete,
2375 2 : deleteFnLocked: d.mu.versions.addObsoleteLocked,
2376 2 : }
2377 2 : fe.readerRefs.Store(1)
2378 2 : return fe
2379 2 : }
2380 :
2381 : // makeRoomForWrite ensures that the memtable has room to hold the contents of
2382 : // Batch. It reserves the space in the memtable and adds a reference to the
2383 : // memtable. The caller must later ensure that the memtable is unreferenced. If
2384 : // the memtable is full, or a nil Batch is provided, the current memtable is
2385 : // rotated (marked as immutable) and a new mutable memtable is allocated. This
2386 : // memtable rotation also causes a log rotation.
2387 : //
2388 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2389 : // may be released and reacquired.
2390 2 : func (d *DB) makeRoomForWrite(b *Batch) error {
2391 2 : if b != nil && b.ingestedSSTBatch {
2392 0 : panic("pebble: invalid function call")
2393 : }
2394 :
2395 2 : force := b == nil || b.flushable != nil
2396 2 : stalled := false
2397 2 : for {
2398 2 : if b != nil && b.flushable == nil {
2399 2 : err := d.mu.mem.mutable.prepare(b)
2400 2 : if err != arenaskl.ErrArenaFull {
2401 2 : if stalled {
2402 2 : d.opts.EventListener.WriteStallEnd()
2403 2 : }
2404 2 : return err
2405 : }
2406 2 : } else if !force {
2407 2 : if stalled {
2408 2 : d.opts.EventListener.WriteStallEnd()
2409 2 : }
2410 2 : return nil
2411 : }
2412 : // force || err == ErrArenaFull, so we need to rotate the current memtable.
2413 2 : {
2414 2 : var size uint64
2415 2 : for i := range d.mu.mem.queue {
2416 2 : size += d.mu.mem.queue[i].totalBytes()
2417 2 : }
2418 2 : if size >= uint64(d.opts.MemTableStopWritesThreshold)*d.opts.MemTableSize {
2419 2 : // We have filled up the current memtable, but already queued memtables
2420 2 : // are still flushing, so we wait.
2421 2 : if !stalled {
2422 2 : stalled = true
2423 2 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2424 2 : Reason: "memtable count limit reached",
2425 2 : })
2426 2 : }
2427 2 : now := time.Now()
2428 2 : d.mu.compact.cond.Wait()
2429 2 : if b != nil {
2430 2 : b.commitStats.MemTableWriteStallDuration += time.Since(now)
2431 2 : }
2432 2 : continue
2433 : }
2434 : }
2435 2 : l0ReadAmp := d.mu.versions.currentVersion().L0Sublevels.ReadAmplification()
2436 2 : if l0ReadAmp >= d.opts.L0StopWritesThreshold {
2437 2 : // There are too many level-0 files, so we wait.
2438 2 : if !stalled {
2439 2 : stalled = true
2440 2 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2441 2 : Reason: "L0 file count limit exceeded",
2442 2 : })
2443 2 : }
2444 2 : now := time.Now()
2445 2 : d.mu.compact.cond.Wait()
2446 2 : if b != nil {
2447 2 : b.commitStats.L0ReadAmpWriteStallDuration += time.Since(now)
2448 2 : }
2449 2 : continue
2450 : }
2451 :
2452 2 : var newLogNum base.FileNum
2453 2 : var prevLogSize uint64
2454 2 : if !d.opts.DisableWAL {
2455 2 : now := time.Now()
2456 2 : newLogNum, prevLogSize = d.recycleWAL()
2457 2 : if b != nil {
2458 2 : b.commitStats.WALRotationDuration += time.Since(now)
2459 2 : }
2460 : }
2461 :
2462 2 : immMem := d.mu.mem.mutable
2463 2 : imm := d.mu.mem.queue[len(d.mu.mem.queue)-1]
2464 2 : imm.logSize = prevLogSize
2465 2 : imm.flushForced = imm.flushForced || (b == nil)
2466 2 :
2467 2 : // If we are manually flushing and we used less than half of the bytes in
2468 2 : // the memtable, don't increase the size for the next memtable. This
2469 2 : // reduces memtable memory pressure when an application is frequently
2470 2 : // manually flushing.
2471 2 : if (b == nil) && uint64(immMem.availBytes()) > immMem.totalBytes()/2 {
2472 2 : d.mu.mem.nextSize = immMem.totalBytes()
2473 2 : }
2474 :
2475 2 : if b != nil && b.flushable != nil {
2476 2 : // The batch is too large to fit in the memtable so add it directly to
2477 2 : // the immutable queue. The flushable batch is associated with the same
2478 2 : // log as the immutable memtable, but logically occurs after it in
2479 2 : // seqnum space. We ensure while flushing that the flushable batch
2480 2 : // is flushed along with the previous memtable in the flushable
2481 2 : // queue. See the top level comment in DB.flush1 to learn how this
2482 2 : // is ensured.
2483 2 : //
2484 2 : // See DB.commitWrite for the special handling of log writes for large
2485 2 : // batches. In particular, the large batch has already written to
2486 2 : // imm.logNum.
2487 2 : entry := d.newFlushableEntry(b.flushable, imm.logNum, b.SeqNum())
2488 2 : // The large batch is by definition large. Reserve space from the cache
2489 2 : // for it until it is flushed.
2490 2 : entry.releaseMemAccounting = d.opts.Cache.Reserve(int(b.flushable.totalBytes()))
2491 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2492 2 : }
2493 :
2494 2 : var logSeqNum uint64
2495 2 : if b != nil {
2496 2 : logSeqNum = b.SeqNum()
2497 2 : if b.flushable != nil {
2498 2 : logSeqNum += uint64(b.Count())
2499 2 : }
2500 2 : } else {
2501 2 : logSeqNum = d.mu.versions.logSeqNum.Load()
2502 2 : }
2503 2 : d.rotateMemtable(newLogNum, logSeqNum, immMem)
2504 2 : force = false
2505 : }
2506 : }
2507 :
2508 : // Both DB.mu and commitPipeline.mu must be held by the caller.
2509 2 : func (d *DB) rotateMemtable(newLogNum FileNum, logSeqNum uint64, prev *memTable) {
2510 2 : // Create a new memtable, scheduling the previous one for flushing. We do
2511 2 : // this even if the previous memtable was empty because the DB.Flush
2512 2 : // mechanism is dependent on being able to wait for the empty memtable to
2513 2 : // flush. We can't just mark the empty memtable as flushed here because we
2514 2 : // also have to wait for all previous immutable tables to
2515 2 : // flush. Additionally, the memtable is tied to particular WAL file and we
2516 2 : // want to go through the flush path in order to recycle that WAL file.
2517 2 : //
2518 2 : // NB: newLogNum corresponds to the WAL that contains mutations that are
2519 2 : // present in the new memtable. When immutable memtables are flushed to
2520 2 : // disk, a VersionEdit will be created telling the manifest the minimum
2521 2 : // unflushed log number (which will be the next one in d.mu.mem.mutable
2522 2 : // that was not flushed).
2523 2 : //
2524 2 : // NB: prev should be the current mutable memtable.
2525 2 : var entry *flushableEntry
2526 2 : d.mu.mem.mutable, entry = d.newMemTable(newLogNum, logSeqNum)
2527 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2528 2 : d.updateReadStateLocked(nil)
2529 2 : if prev.writerUnref() {
2530 2 : d.maybeScheduleFlush()
2531 2 : }
2532 : }
2533 :
2534 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2535 : // may be released and reacquired.
2536 2 : func (d *DB) recycleWAL() (newLogNum FileNum, prevLogSize uint64) {
2537 2 : if d.opts.DisableWAL {
2538 0 : panic("pebble: invalid function call")
2539 : }
2540 :
2541 2 : jobID := d.mu.nextJobID
2542 2 : d.mu.nextJobID++
2543 2 : newLogNum = d.mu.versions.getNextFileNum()
2544 2 :
2545 2 : prevLogSize = uint64(d.mu.log.Size())
2546 2 :
2547 2 : // The previous log may have grown past its original physical
2548 2 : // size. Update its file size in the queue so we have a proper
2549 2 : // accounting of its file size.
2550 2 : if d.mu.log.queue[len(d.mu.log.queue)-1].fileSize < prevLogSize {
2551 2 : d.mu.log.queue[len(d.mu.log.queue)-1].fileSize = prevLogSize
2552 2 : }
2553 2 : d.mu.Unlock()
2554 2 :
2555 2 : var err error
2556 2 : // Close the previous log first. This writes an EOF trailer
2557 2 : // signifying the end of the file and syncs it to disk. We must
2558 2 : // close the previous log before linking the new log file,
2559 2 : // otherwise a crash could leave both logs with unclean tails, and
2560 2 : // Open will treat the previous log as corrupt.
2561 2 : err = d.mu.log.LogWriter.Close()
2562 2 : metrics := d.mu.log.LogWriter.Metrics()
2563 2 : d.mu.Lock()
2564 2 : if err := d.mu.log.metrics.Merge(metrics); err != nil {
2565 0 : d.opts.Logger.Infof("metrics error: %s", err)
2566 0 : }
2567 2 : d.mu.Unlock()
2568 2 :
2569 2 : newLogName := base.MakeFilepath(d.opts.FS, d.walDirname, fileTypeLog, newLogNum.DiskFileNum())
2570 2 :
2571 2 : // Try to use a recycled log file. Recycling log files is an important
2572 2 : // performance optimization as it is faster to sync a file that has
2573 2 : // already been written, than one which is being written for the first
2574 2 : // time. This is due to the need to sync file metadata when a file is
2575 2 : // being written for the first time. Note this is true even if file
2576 2 : // preallocation is performed (e.g. fallocate).
2577 2 : var recycleLog fileInfo
2578 2 : var recycleOK bool
2579 2 : var newLogFile vfs.File
2580 2 : if err == nil {
2581 2 : recycleLog, recycleOK = d.logRecycler.peek()
2582 2 : if recycleOK {
2583 1 : recycleLogName := base.MakeFilepath(d.opts.FS, d.walDirname, fileTypeLog, recycleLog.fileNum)
2584 1 : newLogFile, err = d.opts.FS.ReuseForWrite(recycleLogName, newLogName)
2585 1 : base.MustExist(d.opts.FS, newLogName, d.opts.Logger, err)
2586 2 : } else {
2587 2 : newLogFile, err = d.opts.FS.Create(newLogName)
2588 2 : base.MustExist(d.opts.FS, newLogName, d.opts.Logger, err)
2589 2 : }
2590 : }
2591 :
2592 2 : var newLogSize uint64
2593 2 : if err == nil && recycleOK {
2594 1 : // Figure out the recycled WAL size. This Stat is necessary
2595 1 : // because ReuseForWrite's contract allows for removing the
2596 1 : // old file and creating a new one. We don't know whether the
2597 1 : // WAL was actually recycled.
2598 1 : // TODO(jackson): Adding a boolean to the ReuseForWrite return
2599 1 : // value indicating whether or not the file was actually
2600 1 : // reused would allow us to skip the stat and use
2601 1 : // recycleLog.fileSize.
2602 1 : var finfo os.FileInfo
2603 1 : finfo, err = newLogFile.Stat()
2604 1 : if err == nil {
2605 1 : newLogSize = uint64(finfo.Size())
2606 1 : }
2607 : }
2608 :
2609 2 : if err == nil {
2610 2 : // TODO(peter): RocksDB delays sync of the parent directory until the
2611 2 : // first time the log is synced. Is that worthwhile?
2612 2 : err = d.walDir.Sync()
2613 2 : }
2614 :
2615 2 : if err != nil && newLogFile != nil {
2616 0 : newLogFile.Close()
2617 2 : } else if err == nil {
2618 2 : newLogFile = vfs.NewSyncingFile(newLogFile, vfs.SyncingFileOptions{
2619 2 : NoSyncOnClose: d.opts.NoSyncOnClose,
2620 2 : BytesPerSync: d.opts.WALBytesPerSync,
2621 2 : PreallocateSize: d.walPreallocateSize(),
2622 2 : })
2623 2 : }
2624 :
2625 2 : if recycleOK {
2626 1 : err = firstError(err, d.logRecycler.pop(recycleLog.fileNum.FileNum()))
2627 1 : }
2628 :
2629 2 : d.opts.EventListener.WALCreated(WALCreateInfo{
2630 2 : JobID: jobID,
2631 2 : Path: newLogName,
2632 2 : FileNum: newLogNum,
2633 2 : RecycledFileNum: recycleLog.fileNum.FileNum(),
2634 2 : Err: err,
2635 2 : })
2636 2 :
2637 2 : d.mu.Lock()
2638 2 :
2639 2 : d.mu.versions.metrics.WAL.Files++
2640 2 :
2641 2 : if err != nil {
2642 0 : // TODO(peter): avoid chewing through file numbers in a tight loop if there
2643 0 : // is an error here.
2644 0 : //
2645 0 : // What to do here? Stumbling on doesn't seem worthwhile. If we failed to
2646 0 : // close the previous log it is possible we lost a write.
2647 0 : panic(err)
2648 : }
2649 :
2650 2 : d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum.DiskFileNum(), fileSize: newLogSize})
2651 2 : d.mu.log.LogWriter = record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
2652 2 : WALFsyncLatency: d.mu.log.metrics.fsyncLatency,
2653 2 : WALMinSyncInterval: d.opts.WALMinSyncInterval,
2654 2 : QueueSemChan: d.commit.logSyncQSem,
2655 2 : })
2656 2 : if d.mu.log.registerLogWriterForTesting != nil {
2657 0 : d.mu.log.registerLogWriterForTesting(d.mu.log.LogWriter)
2658 0 : }
2659 :
2660 2 : return
2661 : }
2662 :
2663 2 : func (d *DB) getEarliestUnflushedSeqNumLocked() uint64 {
2664 2 : seqNum := InternalKeySeqNumMax
2665 2 : for i := range d.mu.mem.queue {
2666 2 : logSeqNum := d.mu.mem.queue[i].logSeqNum
2667 2 : if seqNum > logSeqNum {
2668 2 : seqNum = logSeqNum
2669 2 : }
2670 : }
2671 2 : return seqNum
2672 : }
2673 :
2674 2 : func (d *DB) getInProgressCompactionInfoLocked(finishing *compaction) (rv []compactionInfo) {
2675 2 : for c := range d.mu.compact.inProgress {
2676 2 : if len(c.flushing) == 0 && (finishing == nil || c != finishing) {
2677 2 : info := compactionInfo{
2678 2 : versionEditApplied: c.versionEditApplied,
2679 2 : inputs: c.inputs,
2680 2 : smallest: c.smallest,
2681 2 : largest: c.largest,
2682 2 : outputLevel: -1,
2683 2 : }
2684 2 : if c.outputLevel != nil {
2685 2 : info.outputLevel = c.outputLevel.level
2686 2 : }
2687 2 : rv = append(rv, info)
2688 : }
2689 : }
2690 2 : return
2691 : }
2692 :
2693 2 : func inProgressL0Compactions(inProgress []compactionInfo) []manifest.L0Compaction {
2694 2 : var compactions []manifest.L0Compaction
2695 2 : for _, info := range inProgress {
2696 2 : // Skip in-progress compactions that have already committed; the L0
2697 2 : // sublevels initialization code requires the set of in-progress
2698 2 : // compactions to be consistent with the current version. Compactions
2699 2 : // with versionEditApplied=true are already applied to the current
2700 2 : // version and but are performing cleanup without the database mutex.
2701 2 : if info.versionEditApplied {
2702 2 : continue
2703 : }
2704 2 : l0 := false
2705 2 : for _, cl := range info.inputs {
2706 2 : l0 = l0 || cl.level == 0
2707 2 : }
2708 2 : if !l0 {
2709 2 : continue
2710 : }
2711 2 : compactions = append(compactions, manifest.L0Compaction{
2712 2 : Smallest: info.smallest,
2713 2 : Largest: info.largest,
2714 2 : IsIntraL0: info.outputLevel == 0,
2715 2 : })
2716 : }
2717 2 : return compactions
2718 : }
2719 :
2720 : // firstError returns the first non-nil error of err0 and err1, or nil if both
2721 : // are nil.
2722 2 : func firstError(err0, err1 error) error {
2723 2 : if err0 != nil {
2724 2 : return err0
2725 2 : }
2726 2 : return err1
2727 : }
2728 :
2729 : // SetCreatorID sets the CreatorID which is needed in order to use shared objects.
2730 : // Remote object usage is disabled until this method is called the first time.
2731 : // Once set, the Creator ID is persisted and cannot change.
2732 : //
2733 : // Does nothing if SharedStorage was not set in the options when the DB was
2734 : // opened or if the DB is in read-only mode.
2735 2 : func (d *DB) SetCreatorID(creatorID uint64) error {
2736 2 : if d.opts.Experimental.RemoteStorage == nil || d.opts.ReadOnly {
2737 0 : return nil
2738 0 : }
2739 2 : return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
2740 : }
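// Editorial sketch (not part of the original source): a process that has
// configured remote object storage via Options.Experimental.RemoteStorage
// might assign its creator ID once, right after opening the database. The
// dirname, opts, and nodeID values below are placeholders supplied by the
// caller.
//
//	db, err := pebble.Open(dirname, opts) // opts.Experimental.RemoteStorage already configured
//	if err != nil {
//		return err
//	}
//	if err := db.SetCreatorID(nodeID); err != nil {
//		return err
//	}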
2741 :
2742 : // KeyStatistics keeps track of the number of keys that have been pinned by a
2743 : // snapshot as well as counts of the different key kinds in the LSM.
2744 : type KeyStatistics struct {
2745 : // SnapshotPinnedKeys is the number of keys that a compaction determined to be
2746 : // obsolete but could not elide because they are required by an open snapshot.
2747 : SnapshotPinnedKeys int
2748 : // SnapshotPinnedKeysBytes is the total number of bytes of all snapshot-pinned keys.
2749 : SnapshotPinnedKeysBytes uint64
2750 : // Note: these fields are currently only populated for point keys (including range deletes).
2751 : KindsCount [InternalKeyKindMax + 1]int
2752 : }
2753 :
2754 : // LSMKeyStatistics is used by DB.ScanStatistics.
2755 : type LSMKeyStatistics struct {
2756 : Accumulated KeyStatistics
2757 : // Levels contains statistics only for point keys. Range deletions and range keys will
2758 : // appear in Accumulated but not Levels.
2759 : Levels [numLevels]KeyStatistics
2760 : // BytesRead represents the logical, pre-compression size of keys and values read by the scan.
2761 : BytesRead uint64
2762 : }
2763 :
2764 : // ScanStatisticsOptions is used by DB.ScanStatistics.
2765 : type ScanStatisticsOptions struct {
2766 : // LimitBytesPerSecond caps the number of bytes that the underlying internal
2767 : // scan may read per second.
2768 : // A value of 0 indicates that there is no limit.
2769 : LimitBytesPerSecond int64
2770 : }
2771 :
2772 : // ScanStatistics returns the count of different key kinds within the LSM for a
2773 : // key span [lower, upper), as well as the number of snapshot-pinned keys.
2774 : func (d *DB) ScanStatistics(
2775 : ctx context.Context, lower, upper []byte, opts ScanStatisticsOptions,
2776 1 : ) (LSMKeyStatistics, error) {
2777 1 : stats := LSMKeyStatistics{}
2778 1 : var prevKey InternalKey
2779 1 : var rateLimitFunc func(key *InternalKey, val LazyValue) error
2780 1 : tb := tokenbucket.TokenBucket{}
2781 1 :
2782 1 : if opts.LimitBytesPerSecond != 0 {
2783 0 : // Each "token" roughly corresponds to a byte that was read.
2784 0 : tb.Init(tokenbucket.TokensPerSecond(opts.LimitBytesPerSecond), tokenbucket.Tokens(1024))
2785 0 : rateLimitFunc = func(key *InternalKey, val LazyValue) error {
2786 0 : return tb.WaitCtx(ctx, tokenbucket.Tokens(key.Size()+val.Len()))
2787 0 : }
2788 : }
2789 :
2790 1 : scanInternalOpts := &scanInternalOptions{
2791 1 : visitPointKey: func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error {
2792 1 : // If the previous key is equal to the current point key, the current key was
2793 1 : // pinned by a snapshot.
2794 1 : size := uint64(key.Size())
2795 1 : kind := key.Kind()
2796 1 : if iterInfo.Kind == IteratorLevelLSM && d.equal(prevKey.UserKey, key.UserKey) {
2797 1 : stats.Levels[iterInfo.Level].SnapshotPinnedKeys++
2798 1 : stats.Levels[iterInfo.Level].SnapshotPinnedKeysBytes += size
2799 1 : stats.Accumulated.SnapshotPinnedKeys++
2800 1 : stats.Accumulated.SnapshotPinnedKeysBytes += size
2801 1 : }
2802 1 : if iterInfo.Kind == IteratorLevelLSM {
2803 1 : stats.Levels[iterInfo.Level].KindsCount[kind]++
2804 1 : }
2805 :
2806 1 : stats.Accumulated.KindsCount[kind]++
2807 1 : prevKey.CopyFrom(*key)
2808 1 : stats.BytesRead += uint64(key.Size() + value.Len())
2809 1 : return nil
2810 : },
2811 0 : visitRangeDel: func(start, end []byte, seqNum uint64) error {
2812 0 : stats.Accumulated.KindsCount[InternalKeyKindRangeDelete]++
2813 0 : stats.BytesRead += uint64(len(start) + len(end))
2814 0 : return nil
2815 0 : },
2816 0 : visitRangeKey: func(start, end []byte, keys []rangekey.Key) error {
2817 0 : stats.BytesRead += uint64(len(start) + len(end))
2818 0 : for _, key := range keys {
2819 0 : stats.Accumulated.KindsCount[key.Kind()]++
2820 0 : stats.BytesRead += uint64(len(key.Value) + len(key.Suffix))
2821 0 : }
2822 0 : return nil
2823 : },
2824 : includeObsoleteKeys: true,
2825 : IterOptions: IterOptions{
2826 : KeyTypes: IterKeyTypePointsAndRanges,
2827 : LowerBound: lower,
2828 : UpperBound: upper,
2829 : },
2830 : rateLimitFunc: rateLimitFunc,
2831 : }
2832 1 : iter := d.newInternalIter(snapshotIterOpts{}, scanInternalOpts)
2833 1 : defer iter.close()
2834 1 :
2835 1 : err := scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
2836 1 :
2837 1 : if err != nil {
2838 0 : return LSMKeyStatistics{}, err
2839 0 : }
2840 :
2841 1 : return stats, nil
2842 : }
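// Editorial sketch (not part of the original source): an illustration of
// calling ScanStatistics with a rate limit and reading the resulting
// LSMKeyStatistics. The variables db, ctx, lower, and upper are assumed to
// exist in the caller's scope, and the level index 6 assumes the default
// seven-level LSM.
//
//	stats, err := db.ScanStatistics(ctx, lower, upper, pebble.ScanStatisticsOptions{
//		// Throttle the internal scan to roughly 8 MiB of logical bytes per second.
//		LimitBytesPerSecond: 8 << 20,
//	})
//	if err != nil {
//		return err
//	}
//	// Keys pinned by open snapshots, across the whole LSM.
//	pinnedKeys := stats.Accumulated.SnapshotPinnedKeys
//	// SET keys observed in the bottommost level.
//	setsInL6 := stats.Levels[6].KindsCount[pebble.InternalKeyKindSet]
//	_ = pinnedKeys
//	_ = setsInL6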
2843 :
2844 : // ObjProvider returns the objstorage.Provider for this database. Meant to be
2845 : // used for internal purposes only.
2846 1 : func (d *DB) ObjProvider() objstorage.Provider {
2847 1 : return d.objProvider
2848 1 : }
2849 :
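// checkVirtualBounds verifies, in invariants builds only, that the point key
// and range key bounds recorded in a virtual sstable's metadata are tight and
// that every key surfaced by the table's iterators falls within those bounds.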
2850 2 : func (d *DB) checkVirtualBounds(m *fileMetadata) {
2851 2 : if !invariants.Enabled {
2852 0 : return
2853 0 : }
2854 :
2855 2 : if m.HasPointKeys {
2856 2 : pointIter, rangeDelIter, err := d.newIters(context.TODO(), m, nil, internalIterOpts{})
2857 2 : if err != nil {
2858 0 : panic(errors.Wrap(err, "pebble: error creating point iterator"))
2859 : }
2860 :
2861 2 : defer pointIter.Close()
2862 2 : if rangeDelIter != nil {
2863 1 : defer rangeDelIter.Close()
2864 1 : }
2865 :
2866 2 : pointKey, _ := pointIter.First()
2867 2 : var rangeDel *keyspan.Span
2868 2 : if rangeDelIter != nil {
2869 1 : rangeDel = rangeDelIter.First()
2870 1 : }
2871 :
2872 : // Check that the lower bound is tight.
2873 2 : if (rangeDel == nil || d.cmp(rangeDel.SmallestKey().UserKey, m.SmallestPointKey.UserKey) != 0) &&
2874 2 : (pointKey == nil || d.cmp(pointKey.UserKey, m.SmallestPointKey.UserKey) != 0) {
2875 0 : panic(errors.Newf("pebble: virtual sstable %s lower point key bound is not tight", m.FileNum))
2876 : }
2877 :
2878 2 : pointKey, _ = pointIter.Last()
2879 2 : rangeDel = nil
2880 2 : if rangeDelIter != nil {
2881 1 : rangeDel = rangeDelIter.Last()
2882 1 : }
2883 :
2884 : // Check that the upper bound is tight.
2885 2 : if (rangeDel == nil || d.cmp(rangeDel.LargestKey().UserKey, m.LargestPointKey.UserKey) != 0) &&
2886 2 : (pointKey == nil || d.cmp(pointKey.UserKey, m.LargestPointKey.UserKey) != 0) {
2887 0 : panic(errors.Newf("pebble: virtual sstable %s upper point key bound is not tight", m.FileNum))
2888 : }
2889 :
2890 : // Check that iterator keys are within bounds.
2891 2 : for key, _ := pointIter.First(); key != nil; key, _ = pointIter.Next() {
2892 2 : if d.cmp(key.UserKey, m.SmallestPointKey.UserKey) < 0 || d.cmp(key.UserKey, m.LargestPointKey.UserKey) > 0 {
2893 0 : panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.UserKey))
2894 : }
2895 : }
2896 :
2897 2 : if rangeDelIter != nil {
2898 1 : for key := rangeDelIter.First(); key != nil; key = rangeDelIter.Next() {
2899 1 : if d.cmp(key.SmallestKey().UserKey, m.SmallestPointKey.UserKey) < 0 {
2900 0 : panic(errors.Newf("pebble: virtual sstable %s range deletion start key %s is not within bounds", m.FileNum, key.SmallestKey().UserKey))
2901 : }
2902 :
2903 1 : if d.cmp(key.LargestKey().UserKey, m.LargestPointKey.UserKey) > 0 {
2904 0 : panic(errors.Newf("pebble: virtual sstable %s range deletion end key %s is not within bounds", m.FileNum, key.LargestKey().UserKey))
2905 : }
2906 : }
2907 : }
2908 : }
2909 :
2910 2 : if !m.HasRangeKeys {
2911 2 : return
2912 2 : }
2913 :
2914 2 : rangeKeyIter, err := d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{})
2915 2 : if err != nil {
2916 0 : panic(errors.Wrap(err, "pebble: error creating range key iterator"))
2917 : }
2918 2 : // Defer the close only after the error check so we never call Close on a nil iterator.
2919 2 : defer rangeKeyIter.Close()
2920 :
2921 : // Check that the lower bound is tight.
2922 2 : if d.cmp(rangeKeyIter.First().SmallestKey().UserKey, m.SmallestRangeKey.UserKey) != 0 {
2923 0 : panic(errors.Newf("pebble: virtual sstable %s lower range key bound is not tight", m.FileNum))
2924 : }
2925 :
2926 : // Check that the upper bound is tight.
2927 2 : if d.cmp(rangeKeyIter.Last().LargestKey().UserKey, m.LargestRangeKey.UserKey) != 0 {
2928 0 : panic(errors.Newf("pebble: virtual sstable %s upper range key bound is not tight", m.FileNum))
2929 : }
2930 :
2931 2 : for key := rangeKeyIter.First(); key != nil; key = rangeKeyIter.Next() {
2932 2 : if d.cmp(key.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) < 0 {
2933 0 : panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.FileNum, key.SmallestKey().UserKey))
2934 : }
2935 2 : if d.cmp(key.LargestKey().UserKey, m.LargestRangeKey.UserKey) > 0 {
2936 0 : panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.FileNum, key.LargestKey().UserKey))
2937 : }
2938 : }
2939 : }