Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package pebble provides an ordered key/value store.
6 : package pebble // import "github.com/cockroachdb/pebble"
7 :
8 : import (
9 : "context"
10 : "fmt"
11 : "io"
12 : "strconv"
13 : "sync"
14 : "sync/atomic"
15 : "time"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/arenaskl"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/invalidating"
21 : "github.com/cockroachdb/pebble/internal/invariants"
22 : "github.com/cockroachdb/pebble/internal/keyspan"
23 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
24 : "github.com/cockroachdb/pebble/internal/manifest"
25 : "github.com/cockroachdb/pebble/internal/manual"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : "github.com/cockroachdb/pebble/objstorage/remote"
28 : "github.com/cockroachdb/pebble/rangekey"
29 : "github.com/cockroachdb/pebble/record"
30 : "github.com/cockroachdb/pebble/sstable"
31 : "github.com/cockroachdb/pebble/vfs"
32 : "github.com/cockroachdb/pebble/vfs/atomicfs"
33 : "github.com/cockroachdb/pebble/wal"
34 : "github.com/cockroachdb/tokenbucket"
35 : "github.com/prometheus/client_golang/prometheus"
36 : )
37 :
38 : const (
39 : // minTableCacheSize is the minimum size of the table cache, for a single db.
40 : minTableCacheSize = 64
41 :
42 : // numNonTableCacheFiles is an approximation for the number of files
43 : // that we don't use for table caches, for a given db.
44 : numNonTableCacheFiles = 10
45 : )
46 :
47 : var (
48 : // ErrNotFound is returned when a get operation does not find the requested
49 : // key.
50 : ErrNotFound = base.ErrNotFound
51 : // ErrClosed is panicked when an operation is performed on a closed snapshot or
52 : // DB. Use errors.Is(err, ErrClosed) to check for this error.
53 : ErrClosed = errors.New("pebble: closed")
54 : // ErrReadOnly is returned when a write operation is performed on a read-only
55 : // database.
56 : ErrReadOnly = errors.New("pebble: read-only")
57 : // errNoSplit indicates that the user is trying to perform a range key
58 : // operation but the configured Comparer does not provide a Split
59 : // implementation.
60 : errNoSplit = errors.New("pebble: Comparer.Split required for range key operations")
61 : )
62 :
63 : // Reader is a readable key/value store.
64 : //
65 : // It is safe to call Get and NewIter from concurrent goroutines.
66 : type Reader interface {
67 : // Get gets the value for the given key. It returns ErrNotFound if the DB
68 : // does not contain the key.
69 : //
70 : // The caller should not modify the contents of the returned slice, but it is
71 : // safe to modify the contents of the argument after Get returns. The
72 : // returned slice will remain valid until the returned Closer is closed. On
73 : // success, the caller MUST call closer.Close() or a memory leak will occur.
74 : Get(key []byte) (value []byte, closer io.Closer, err error)
75 :
76 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
77 : // return false). The iterator can be positioned via a call to SeekGE,
78 : // SeekLT, First or Last.
79 : NewIter(o *IterOptions) (*Iterator, error)
80 :
81 : // NewIterWithContext is like NewIter, and additionally accepts a context
82 : // for tracing.
83 : NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error)
84 :
85 : // Close closes the Reader. It may or may not close any underlying io.Reader
86 : // or io.Writer, depending on how the DB was created.
87 : //
88 : // It is not safe to close a DB until all outstanding iterators are closed.
89 : // It is valid to call Close multiple times. Other methods should not be
90 : // called after the DB has been closed.
91 : Close() error
92 : }
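// A minimal sketch of the Get contract above, assuming an opened *DB named db
// (which implements Reader): the returned slice is only valid until the
// returned Closer is closed, so the value is copied (or fully consumed) before
// Close.
//
//	value, closer, err := db.Get([]byte("k"))
//	if err != nil {
//		// errors.Is(err, pebble.ErrNotFound) reports that the key is absent.
//		return err
//	}
//	v := append([]byte(nil), value...) // copy before closer.Close()
//	if err := closer.Close(); err != nil {
//		return err
//	}
//	_ = v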
93 :
94 : // Writer is a writable key/value store.
95 : //
96 : // Goroutine safety is dependent on the specific implementation.
97 : type Writer interface {
98 : // Apply the operations contained in the batch to the DB.
99 : //
100 : // It is safe to modify the contents of the arguments after Apply returns.
101 : Apply(batch *Batch, o *WriteOptions) error
102 :
103 : // Delete deletes the value for the given key. Deletes are blind and will
104 : // succeed even if the given key does not exist.
105 : //
106 : // It is safe to modify the contents of the arguments after Delete returns.
107 : Delete(key []byte, o *WriteOptions) error
108 :
109 : // DeleteSized behaves identically to Delete, but takes an additional
110 : // argument indicating the size of the value being deleted. DeleteSized
111 : // should be preferred when the caller has the expectation that there exists
112 : // a single internal KV pair for the key (eg, the key has not been
113 : // overwritten recently), and the caller knows the size of its value.
114 : //
115 : // DeleteSized will record the value size within the tombstone and use it to
116 : // inform compaction-picking heuristics which strive to reduce space
117 : // amplification in the LSM. This "calling your shot" mechanic allows the
118 : // storage engine to more accurately estimate and reduce space
119 : // amplification.
120 : //
121 : // It is safe to modify the contents of the arguments after DeleteSized
122 : // returns.
123 : DeleteSized(key []byte, valueSize uint32, _ *WriteOptions) error
124 :
125 : // SingleDelete is similar to Delete in that it deletes the value for the given key. Like Delete,
126 : // it is a blind operation that will succeed even if the given key does not exist.
127 : //
128 : // WARNING: Undefined (non-deterministic) behavior will result if a key is overwritten and
129 : // then deleted using SingleDelete. The record may appear deleted immediately, but be
130 : // resurrected at a later time after compactions have been performed. Or the record may
131 : // be deleted permanently. A Delete operation lays down a "tombstone" which shadows all
132 : // previous versions of a key. The SingleDelete operation is akin to "anti-matter" and will
133 : // only delete the most recently written version for a key. These different semantics allow
134 : // the DB to avoid propagating a SingleDelete operation during a compaction as soon as the
135 : // corresponding Set operation is encountered. These semantics require extreme care to handle
136 : // properly. Only use if you have a workload where the performance gain is critical and you
137 : // can guarantee that a record is written once and then deleted once.
138 : //
139 : // SingleDelete is internally transformed into a Delete if the most recent record for a key is either
140 : // a Merge or Delete record.
141 : //
142 : // It is safe to modify the contents of the arguments after SingleDelete returns.
143 : SingleDelete(key []byte, o *WriteOptions) error
144 :
145 : // DeleteRange deletes all of the point keys (and values) in the range
146 : // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
147 : // delete overlapping range keys (eg, keys set via RangeKeySet).
148 : //
149 : // It is safe to modify the contents of the arguments after DeleteRange
150 : // returns.
151 : DeleteRange(start, end []byte, o *WriteOptions) error
152 :
153 : // LogData adds the specified data to the batch. The data will be written to the
154 : // WAL, but not added to memtables or sstables. Log data is never indexed,
155 : // which makes it useful for testing WAL performance.
156 : //
157 : // It is safe to modify the contents of the argument after LogData returns.
158 : LogData(data []byte, opts *WriteOptions) error
159 :
160 : // Merge merges the value for the given key. The details of the merge are
161 : // dependent upon the configured merge operation.
162 : //
163 : // It is safe to modify the contents of the arguments after Merge returns.
164 : Merge(key, value []byte, o *WriteOptions) error
165 :
166 : // Set sets the value for the given key. It overwrites any previous value
167 : // for that key; a DB is not a multi-map.
168 : //
169 : // It is safe to modify the contents of the arguments after Set returns.
170 : Set(key, value []byte, o *WriteOptions) error
171 :
172 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
173 : // timestamp suffix to value. The suffix is optional. If any portion of the key
174 : // range [start, end) is already set by a range key with the same suffix value,
175 : // RangeKeySet overrides it.
176 : //
177 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
178 : RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error
179 :
180 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
181 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
182 : // range key. RangeKeyUnset only removes portions of range keys that fall within
183 : // the [start, end) key span, and only range keys with suffixes that exactly
184 : // match the unset suffix.
185 : //
186 : // It is safe to modify the contents of the arguments after RangeKeyUnset
187 : // returns.
188 : RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error
189 :
190 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
191 : // (inclusive on start, exclusive on end). It does not delete point keys (for
192 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
193 : // bounds, including those with or without suffixes.
194 : //
195 : // It is safe to modify the contents of the arguments after RangeKeyDelete
196 : // returns.
197 : RangeKeyDelete(start, end []byte, opts *WriteOptions) error
198 : }
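// A short sketch of committing several Writer operations atomically through a
// batch, assuming an opened *DB named db; Apply commits the whole batch as a
// single atomic unit.
//
//	b := db.NewBatch()
//	_ = b.Set([]byte("a"), []byte("1"), nil)
//	_ = b.Delete([]byte("b"), nil)
//	_ = b.DeleteRange([]byte("c"), []byte("d"), nil)
//	if err := db.Apply(b, pebble.Sync); err != nil {
//		return err
//	}
//	return b.Close()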
199 :
200 : // CPUWorkHandle represents a handle used by the CPUWorkPermissionGranter API.
201 : type CPUWorkHandle interface {
202 : // Permitted indicates whether Pebble can use additional CPU resources.
203 : Permitted() bool
204 : }
205 :
206 : // CPUWorkPermissionGranter is used to request permission to opportunistically
207 : // use additional CPUs to speed up internal background work.
208 : type CPUWorkPermissionGranter interface {
209 : // GetPermission returns a handle regardless of whether permission is granted
210 : // or not. In the latter case, the handle is only useful for recording
211 : // the CPU time actually spent on this calling goroutine.
212 : GetPermission(time.Duration) CPUWorkHandle
213 : // CPUWorkDone must be called regardless of whether CPUWorkHandle.Permitted
214 : // returns true or false.
215 : CPUWorkDone(CPUWorkHandle)
216 : }
217 :
218 : // Use a default implementation for the CPU work granter to avoid excessive nil
219 : // checks in the code.
220 : type defaultCPUWorkHandle struct{}
221 :
222 1 : func (d defaultCPUWorkHandle) Permitted() bool {
223 1 : return false
224 1 : }
225 :
226 : type defaultCPUWorkGranter struct{}
227 :
228 1 : func (d defaultCPUWorkGranter) GetPermission(_ time.Duration) CPUWorkHandle {
229 1 : return defaultCPUWorkHandle{}
230 1 : }
231 :
232 1 : func (d defaultCPUWorkGranter) CPUWorkDone(_ CPUWorkHandle) {}
233 :
234 : // DB provides a concurrent, persistent ordered key/value store.
235 : //
236 : // A DB's basic operations (Get, Set, Delete) should be self-explanatory. Get
237 : // and Delete will return ErrNotFound if the requested key is not in the store.
238 : // Callers are free to ignore this error.
239 : //
240 : // A DB also allows for iterating over the key/value pairs in key order. If d
241 : // is a DB, the code below prints all key/value pairs whose keys are 'greater
242 : // than or equal to' k:
243 : //
244 : // iter := d.NewIter(readOptions)
245 : // for iter.SeekGE(k); iter.Valid(); iter.Next() {
246 : // fmt.Printf("key=%q value=%q\n", iter.Key(), iter.Value())
247 : // }
248 : // return iter.Close()
249 : //
250 : // The Options struct holds the optional parameters for the DB, including a
251 : // Comparer to define a 'less than' relationship over keys. It is always valid
252 : // to pass a nil *Options, which means to use the default parameter values. Any
253 : // zero field of a non-nil *Options also means to use the default value for
254 : // that parameter. Thus, the code below uses a custom Comparer, but the default
255 : // values for every other parameter:
256 : //
257 : // db := pebble.Open(&Options{
258 : // Comparer: myComparer,
259 : // })
260 : type DB struct {
261 : // The count and size of referenced memtables. This includes memtables
262 : // present in DB.mu.mem.queue, memtables that have been flushed but are
263 : // still referenced by an in-use readState, and up to one memTable waiting
264 : // to be reused and stored in d.memTableRecycle.
265 : memTableCount atomic.Int64
266 : memTableReserved atomic.Int64 // number of bytes reserved in the cache for memtables
267 : // memTableRecycle holds a pointer to an obsolete memtable. The next
268 : // memtable allocation will reuse this memtable if it has not already been
269 : // recycled.
270 : memTableRecycle atomic.Pointer[memTable]
271 :
272 : // The logical size of the current WAL.
273 : logSize atomic.Uint64
274 : // The number of input bytes to the log. This is the raw size of the
275 : // batches written to the WAL, without the overhead of the record
276 : // envelopes.
277 : logBytesIn atomic.Uint64
278 :
279 : // The number of bytes available on disk.
280 : diskAvailBytes atomic.Uint64
281 :
282 : cacheID uint64
283 : dirname string
284 : opts *Options
285 : cmp Compare
286 : equal Equal
287 : merge Merge
288 : split Split
289 : abbreviatedKey AbbreviatedKey
290 : // The threshold for determining when a batch is "large" and will skip being
291 : // inserted into a memtable.
292 : largeBatchThreshold uint64
293 : // The current OPTIONS file number.
294 : optionsFileNum base.DiskFileNum
295 : // The on-disk size of the current OPTIONS file.
296 : optionsFileSize uint64
297 :
298 : // objProvider is used to access and manage SSTs.
299 : objProvider objstorage.Provider
300 :
301 : fileLock *Lock
302 : dataDir vfs.File
303 :
304 : tableCache *tableCacheContainer
305 : newIters tableNewIters
306 : tableNewRangeKeyIter keyspanimpl.TableNewSpanIter
307 :
308 : commit *commitPipeline
309 :
310 : // readState provides access to the state needed for reading without needing
311 : // to acquire DB.mu.
312 : readState struct {
313 : sync.RWMutex
314 : val *readState
315 : }
316 :
317 : closed *atomic.Value
318 : closedCh chan struct{}
319 :
320 : cleanupManager *cleanupManager
321 :
322 : // During an iterator close, we may asynchronously schedule read compactions.
323 : // We want to wait for those goroutines to finish, before closing the DB.
324 : // compactionSchedulers.Wait() should not be called while DB.mu is held.
325 : compactionSchedulers sync.WaitGroup
326 :
327 : // The main mutex protecting internal DB state. This mutex encompasses many
328 : // fields because those fields need to be accessed and updated atomically. In
329 : // particular, the current version, log.*, mem.*, and snapshot list need to
330 : // be accessed and updated atomically during compaction.
331 : //
332 : // Care is taken to avoid holding DB.mu during IO operations. Accomplishing
333 : // this sometimes requires releasing DB.mu in a method that was called with
334 : // it held. See versionSet.logAndApply() and DB.makeRoomForWrite() for
335 : // examples. This is a common pattern, so be careful about expectations that
336 : // DB.mu will be held continuously across a set of calls.
337 : mu struct {
338 : sync.Mutex
339 :
340 : formatVers struct {
341 : // vers is the database's current format major version.
342 : // Backwards-incompatible features are gated behind new
343 : // format major versions and not enabled until a database's
344 : // version is ratcheted upwards.
345 : //
346 : // Although this is under the `mu` prefix, readers may read vers
347 : // atomically without holding d.mu. Writers must only write to this
348 : // value through finalizeFormatVersUpgrade which requires d.mu is
349 : // held.
350 : vers atomic.Uint64
351 : // marker is the atomic marker for the format major version.
352 : // When a database's version is ratcheted upwards, the
353 : // marker is moved in order to atomically record the new
354 : // version.
355 : marker *atomicfs.Marker
356 : // ratcheting when set to true indicates that the database is
357 : // currently in the process of ratcheting the format major version
358 : // to vers + 1. As a part of ratcheting the format major version,
359 : // migrations may drop and re-acquire the mutex.
360 : ratcheting bool
361 : }
362 :
363 : // The ID of the next job. Job IDs are passed to event listener
364 : // notifications and act as a mechanism for tying together the events and
365 : // log messages for a single job such as a flush, compaction, or file
366 : // ingestion. Job IDs are not serialized to disk or used for correctness.
367 : nextJobID JobID
368 :
369 : // The collection of immutable versions and state about the log and visible
370 : // sequence numbers. Use the pointer here to ensure the atomic fields in
371 : // version set are aligned properly.
372 : versions *versionSet
373 :
374 : log struct {
375 : // manager is not protected by mu, but calls to Create must be
376 : // serialized, and happen after the previous writer is closed.
377 : manager wal.Manager
378 : // The Writer is protected by commitPipeline.mu. This allows log writes
379 : // to be performed without holding DB.mu, but requires both
380 : // commitPipeline.mu and DB.mu to be held when rotating the WAL/memtable
381 : // (i.e. makeRoomForWrite). Can be nil.
382 : writer wal.Writer
383 : metrics struct {
384 : // fsyncLatency has its own internal synchronization, and is not
385 : // protected by mu.
386 : fsyncLatency prometheus.Histogram
387 : // Updated whenever a wal.Writer is closed.
388 : record.LogWriterMetrics
389 : }
390 : }
391 :
392 : mem struct {
393 : // The current mutable memTable. Readers of the pointer may hold
394 : // either DB.mu or commitPipeline.mu.
395 : //
396 : // Its internal fields are protected by commitPipeline.mu. This
397 : // allows batch commits to be performed without DB.mu as long as no
398 : // memtable rotation is required.
399 : //
400 : // Both commitPipeline.mu and DB.mu must be held when rotating the
401 : // memtable.
402 : mutable *memTable
403 : // Queue of flushables (the mutable memtable is at the end). Elements are
404 : // added to the end of the slice and removed from the beginning. Once an
405 : // index is set it is never modified, making a fixed slice immutable and
406 : // safe for concurrent reads.
407 : queue flushableList
408 : // nextSize is the size of the next memtable. The memtable size starts at
409 : // min(256KB,Options.MemTableSize) and doubles each time a new memtable
410 : // is allocated up to Options.MemTableSize. This reduces the memory
411 : // footprint of memtables when lots of DB instances are used concurrently
412 : // in test environments.
413 : nextSize uint64
414 : }
415 :
416 : compact struct {
417 : // Condition variable used to signal when a flush or compaction has
418 : // completed. Used by the write-stall mechanism to wait for the stall
419 : // condition to clear. See DB.makeRoomForWrite().
420 : cond sync.Cond
421 : // True when a flush is in progress.
422 : flushing bool
423 : // The number of ongoing non-download compactions.
424 : compactingCount int
425 : // The number of download compactions.
426 : downloadingCount int
427 : // The list of deletion hints, suggesting ranges for delete-only
428 : // compactions.
429 : deletionHints []deleteCompactionHint
430 : // The list of manual compactions. The next manual compaction to perform
431 : // is at the start of the list. New entries are added to the end.
432 : manual []*manualCompaction
433 : // downloads is the list of pending download tasks. The next download to
434 : // perform is at the start of the list. New entries are added to the end.
435 : downloads []*downloadSpanTask
436 : // inProgress is the set of in-progress flushes and compactions.
437 : // It's used in the calculation of some metrics and to initialize L0
438 : // sublevels' state. Some of the compactions contained within this
439 : // map may have already committed an edit to the version but are
440 : // lingering performing cleanup, like deleting obsolete files.
441 : inProgress map[*compaction]struct{}
442 :
443 : // rescheduleReadCompaction indicates to an iterator that a read compaction
444 : // should be scheduled.
445 : rescheduleReadCompaction bool
446 :
447 : // readCompactions is a readCompactionQueue which keeps track of the
448 : // compactions which we might have to perform.
449 : readCompactions readCompactionQueue
450 :
451 : // The cumulative duration of all completed compactions since Open.
452 : // Does not include flushes.
453 : duration time.Duration
454 : // Flush throughput metric.
455 : flushWriteThroughput ThroughputMetric
456 : // The idle start time for the flush "loop", i.e., when the flushing
457 : // bool above transitions to false.
458 : noOngoingFlushStartTime time.Time
459 : }
460 :
461 : // Non-zero when file cleaning is disabled. The disabled count acts as a
462 : // reference count to prohibit file cleaning. See
463 : // DB.{disable,Enable}FileDeletions().
464 : disableFileDeletions int
465 :
466 : snapshots struct {
467 : // The list of active snapshots.
468 : snapshotList
469 :
470 : // The cumulative count and size of snapshot-pinned keys written to
471 : // sstables.
472 : cumulativePinnedCount uint64
473 : cumulativePinnedSize uint64
474 : }
475 :
476 : tableStats struct {
477 : // Condition variable used to signal the completion of a
478 : // job to collect table stats.
479 : cond sync.Cond
480 : // True when a stat collection operation is in progress.
481 : loading bool
482 : // True if stat collection has loaded statistics for all tables
483 : // other than those listed explicitly in pending. This flag starts
484 : // as false when a database is opened and flips to true once stat
485 : // collection has caught up.
486 : loadedInitial bool
487 : // A slice of files for which stats have not been computed.
488 : // Compactions, ingests, flushes append files to be processed. An
489 : // active stat collection goroutine clears the list and processes
490 : // them.
491 : pending []manifest.NewFileEntry
492 : }
493 :
494 : tableValidation struct {
495 : // cond is a condition variable used to signal the completion of a
496 : // job to validate one or more sstables.
497 : cond sync.Cond
498 : // pending is a slice of metadata for sstables waiting to be
499 : // validated. Only physical sstables should be added to the pending
500 : // queue.
501 : pending []newFileEntry
502 : // validating is set to true when validation is running.
503 : validating bool
504 : }
505 : }
506 :
507 : // Normally equal to time.Now() but may be overridden in tests.
508 : timeNow func() time.Time
509 : // the time at database Open; may be used to compute metrics like effective
510 : // compaction concurrency
511 : openedAt time.Time
512 : }
513 :
514 : var _ Reader = (*DB)(nil)
515 : var _ Writer = (*DB)(nil)
516 :
517 : // TestOnlyWaitForCleaning MUST only be used in tests.
518 1 : func (d *DB) TestOnlyWaitForCleaning() {
519 1 : d.cleanupManager.Wait()
520 1 : }
521 :
522 : // Get gets the value for the given key. It returns ErrNotFound if the DB does
523 : // not contain the key.
524 : //
525 : // The caller should not modify the contents of the returned slice, but it is
526 : // safe to modify the contents of the argument after Get returns. The returned
527 : // slice will remain valid until the returned Closer is closed. On success, the
528 : // caller MUST call closer.Close() or a memory leak will occur.
529 1 : func (d *DB) Get(key []byte) ([]byte, io.Closer, error) {
530 1 : return d.getInternal(key, nil /* batch */, nil /* snapshot */)
531 1 : }
532 :
533 : type getIterAlloc struct {
534 : dbi Iterator
535 : keyBuf []byte
536 : get getIter
537 : }
538 :
539 : var getIterAllocPool = sync.Pool{
540 1 : New: func() interface{} {
541 1 : return &getIterAlloc{}
542 1 : },
543 : }
544 :
545 1 : func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer, error) {
546 1 : if err := d.closed.Load(); err != nil {
547 1 : panic(err)
548 : }
549 :
550 : // Grab and reference the current readState. This prevents the underlying
551 : // files in the associated version from being deleted if there is a current
552 : // compaction. The readState is unref'd by Iterator.Close().
553 1 : readState := d.loadReadState()
554 1 :
555 1 : // Determine the seqnum to read at after grabbing the read state (current and
556 1 : // memtables) above.
557 1 : var seqNum base.SeqNum
558 1 : if s != nil {
559 1 : seqNum = s.seqNum
560 1 : } else {
561 1 : seqNum = d.mu.versions.visibleSeqNum.Load()
562 1 : }
563 :
564 1 : buf := getIterAllocPool.Get().(*getIterAlloc)
565 1 :
566 1 : get := &buf.get
567 1 : *get = getIter{
568 1 : comparer: d.opts.Comparer,
569 1 : newIters: d.newIters,
570 1 : snapshot: seqNum,
571 1 : iterOpts: IterOptions{
572 1 : // TODO(sumeer): replace with a parameter provided by the caller.
573 1 : CategoryAndQoS: sstable.CategoryAndQoS{
574 1 : Category: "pebble-get",
575 1 : QoSLevel: sstable.LatencySensitiveQoSLevel,
576 1 : },
577 1 : logger: d.opts.Logger,
578 1 : snapshotForHideObsoletePoints: seqNum,
579 1 : },
580 1 : key: key,
581 1 : // Compute the key prefix for bloom filtering.
582 1 : prefix: key[:d.opts.Comparer.Split(key)],
583 1 : batch: b,
584 1 : mem: readState.memtables,
585 1 : l0: readState.current.L0SublevelFiles,
586 1 : version: readState.current,
587 1 : }
588 1 :
589 1 : // Strip off memtables which cannot possibly contain the seqNum being read
590 1 : // at.
591 1 : for len(get.mem) > 0 {
592 1 : n := len(get.mem)
593 1 : if logSeqNum := get.mem[n-1].logSeqNum; logSeqNum < seqNum {
594 1 : break
595 : }
596 1 : get.mem = get.mem[:n-1]
597 : }
598 :
599 1 : i := &buf.dbi
600 1 : pointIter := get
601 1 : *i = Iterator{
602 1 : ctx: context.Background(),
603 1 : getIterAlloc: buf,
604 1 : iter: pointIter,
605 1 : pointIter: pointIter,
606 1 : merge: d.merge,
607 1 : comparer: *d.opts.Comparer,
608 1 : readState: readState,
609 1 : keyBuf: buf.keyBuf,
610 1 : }
611 1 :
612 1 : if !i.First() {
613 1 : err := i.Close()
614 1 : if err != nil {
615 1 : return nil, nil, err
616 1 : }
617 1 : return nil, nil, ErrNotFound
618 : }
619 1 : return i.Value(), i, nil
620 : }
621 :
622 : // Set sets the value for the given key. It overwrites any previous value
623 : // for that key; a DB is not a multi-map.
624 : //
625 : // It is safe to modify the contents of the arguments after Set returns.
626 1 : func (d *DB) Set(key, value []byte, opts *WriteOptions) error {
627 1 : b := newBatch(d)
628 1 : _ = b.Set(key, value, opts)
629 1 : if err := d.Apply(b, opts); err != nil {
630 1 : return err
631 1 : }
632 : // Only release the batch on success.
633 1 : return b.Close()
634 : }
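// The convenience write methods (Set, Delete, Merge, and the range-key
// variants below) each allocate a one-operation batch and commit it, as in the
// body of Set above. A sketch of choosing WAL durability per write, assuming
// an opened *DB named db and the exported Sync/NoSync WriteOptions:
//
//	// Durable: returns after the WAL has been synced.
//	if err := db.Set([]byte("k1"), []byte("v1"), pebble.Sync); err != nil {
//		return err
//	}
//	// Faster, but the write may be lost if the process crashes before the
//	// next WAL sync.
//	if err := db.Set([]byte("k2"), []byte("v2"), pebble.NoSync); err != nil {
//		return err
//	}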
635 :
636 : // Delete deletes the value for the given key. Deletes are blind and will
637 : // succeed even if the given key does not exist.
638 : //
639 : // It is safe to modify the contents of the arguments after Delete returns.
640 1 : func (d *DB) Delete(key []byte, opts *WriteOptions) error {
641 1 : b := newBatch(d)
642 1 : _ = b.Delete(key, opts)
643 1 : if err := d.Apply(b, opts); err != nil {
644 1 : return err
645 1 : }
646 : // Only release the batch on success.
647 1 : return b.Close()
648 : }
649 :
650 : // DeleteSized behaves identically to Delete, but takes an additional
651 : // argument indicating the size of the value being deleted. DeleteSized
652 : // should be preferred when the caller has the expectation that there exists
653 : // a single internal KV pair for the key (eg, the key has not been
654 : // overwritten recently), and the caller knows the size of its value.
655 : //
656 : // DeleteSized will record the value size within the tombstone and use it to
657 : // inform compaction-picking heuristics which strive to reduce space
658 : // amplification in the LSM. This "calling your shot" mechanic allows the
659 : // storage engine to more accurately estimate and reduce space amplification.
660 : //
661 : // It is safe to modify the contents of the arguments after DeleteSized
662 : // returns.
663 1 : func (d *DB) DeleteSized(key []byte, valueSize uint32, opts *WriteOptions) error {
664 1 : b := newBatch(d)
665 1 : _ = b.DeleteSized(key, valueSize, opts)
666 1 : if err := d.Apply(b, opts); err != nil {
667 0 : return err
668 0 : }
669 : // Only release the batch on success.
670 1 : return b.Close()
671 : }
672 :
673 : // SingleDelete adds an action to the batch that single deletes the entry for key.
674 : // See Writer.SingleDelete for more details on the semantics of SingleDelete.
675 : //
676 : // It is safe to modify the contents of the arguments after SingleDelete returns.
677 1 : func (d *DB) SingleDelete(key []byte, opts *WriteOptions) error {
678 1 : b := newBatch(d)
679 1 : _ = b.SingleDelete(key, opts)
680 1 : if err := d.Apply(b, opts); err != nil {
681 0 : return err
682 0 : }
683 : // Only release the batch on success.
684 1 : return b.Close()
685 : }
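// A sketch of the write-once, delete-once pattern that SingleDelete is
// intended for, assuming an opened *DB named db and a key that is set exactly
// once and never overwritten; see the Writer.SingleDelete warning above before
// relying on these semantics.
//
//	key := []byte("queue/item1")
//	if err := db.Set(key, []byte("payload"), pebble.Sync); err != nil {
//		return err
//	}
//	// ... the key is read but never Set or Merged again ...
//	if err := db.SingleDelete(key, pebble.Sync); err != nil {
//		return err
//	}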
686 :
687 : // DeleteRange deletes all of the keys (and values) in the range [start,end)
688 : // (inclusive on start, exclusive on end).
689 : //
690 : // It is safe to modify the contents of the arguments after DeleteRange
691 : // returns.
692 1 : func (d *DB) DeleteRange(start, end []byte, opts *WriteOptions) error {
693 1 : b := newBatch(d)
694 1 : _ = b.DeleteRange(start, end, opts)
695 1 : if err := d.Apply(b, opts); err != nil {
696 1 : return err
697 1 : }
698 : // Only release the batch on success.
699 1 : return b.Close()
700 : }
701 :
702 : // Merge adds an action to the DB that merges the value at key with the new
703 : // value. The details of the merge are dependent upon the configured merge
704 : // operator.
705 : //
706 : // It is safe to modify the contents of the arguments after Merge returns.
707 1 : func (d *DB) Merge(key, value []byte, opts *WriteOptions) error {
708 1 : b := newBatch(d)
709 1 : _ = b.Merge(key, value, opts)
710 1 : if err := d.Apply(b, opts); err != nil {
711 1 : return err
712 1 : }
713 : // Only release the batch on success.
714 1 : return b.Close()
715 : }
716 :
717 : // LogData adds the specified data to the batch. The data will be written to the
718 : // WAL, but not added to memtables or sstables. Log data is never indexed,
719 : // which makes it useful for testing WAL performance.
720 : //
721 : // It is safe to modify the contents of the argument after LogData returns.
722 1 : func (d *DB) LogData(data []byte, opts *WriteOptions) error {
723 1 : b := newBatch(d)
724 1 : _ = b.LogData(data, opts)
725 1 : if err := d.Apply(b, opts); err != nil {
726 1 : return err
727 1 : }
728 : // Only release the batch on success.
729 1 : return b.Close()
730 : }
731 :
732 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
733 : // timestamp suffix to value. The suffix is optional. If any portion of the key
734 : // range [start, end) is already set by a range key with the same suffix value,
735 : // RangeKeySet overrides it.
736 : //
737 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
738 1 : func (d *DB) RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error {
739 1 : b := newBatch(d)
740 1 : _ = b.RangeKeySet(start, end, suffix, value, opts)
741 1 : if err := d.Apply(b, opts); err != nil {
742 0 : return err
743 0 : }
744 : // Only release the batch on success.
745 1 : return b.Close()
746 : }
747 :
748 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
749 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
750 : // range key. RangeKeyUnset only removes portions of range keys that fall within
751 : // the [start, end) key span, and only range keys with suffixes that exactly
752 : // match the unset suffix.
753 : //
754 : // It is safe to modify the contents of the arguments after RangeKeyUnset
755 : // returns.
756 1 : func (d *DB) RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error {
757 1 : b := newBatch(d)
758 1 : _ = b.RangeKeyUnset(start, end, suffix, opts)
759 1 : if err := d.Apply(b, opts); err != nil {
760 0 : return err
761 0 : }
762 : // Only release the batch on success.
763 1 : return b.Close()
764 : }
765 :
766 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
767 : // (inclusive on start, exclusive on end). It does not delete point keys (for
768 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
769 : // bounds, including those with or without suffixes.
770 : //
771 : // It is safe to modify the contents of the arguments after RangeKeyDelete
772 : // returns.
773 1 : func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
774 1 : b := newBatch(d)
775 1 : _ = b.RangeKeyDelete(start, end, opts)
776 1 : if err := d.Apply(b, opts); err != nil {
777 0 : return err
778 0 : }
779 : // Only release the batch on success.
780 1 : return b.Close()
781 : }
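// A sketch contrasting the range-key operations above with DeleteRange,
// assuming an opened *DB named db whose Comparer defines Split (required for
// range keys) and an application-defined suffix encoding: RangeKeySet,
// RangeKeyUnset and RangeKeyDelete act only on range keys, while DeleteRange
// acts only on point keys.
//
//	suffix := []byte("@5") // hypothetical suffix; the format is Comparer-defined
//	if err := db.RangeKeySet([]byte("a"), []byte("m"), suffix, []byte("v"), pebble.Sync); err != nil {
//		return err
//	}
//	// Remove only the portion of the range key with the matching suffix.
//	if err := db.RangeKeyUnset([]byte("a"), []byte("f"), suffix, pebble.Sync); err != nil {
//		return err
//	}
//	// Point keys in [a, m) are untouched by the range-key calls above;
//	// deleting them requires DeleteRange.
//	if err := db.DeleteRange([]byte("a"), []byte("m"), pebble.Sync); err != nil {
//		return err
//	}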
782 :
783 : // Apply the operations contained in the batch to the DB. If the batch is large
784 : // the contents of the batch may be retained by the database. If that occurs
785 : // the batch contents will be cleared preventing the caller from attempting to
786 : // reuse them.
787 : //
788 : // It is safe to modify the contents of the arguments after Apply returns.
789 : //
790 : // Apply returns ErrInvalidBatch if the provided batch is invalid in any way.
791 1 : func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
792 1 : return d.applyInternal(batch, opts, false)
793 1 : }
794 :
795 : // ApplyNoSyncWait must only be used when opts.Sync is true and the caller
796 : // does not want to wait for the WAL fsync to happen. The method will return
797 : // once the mutation is applied to the memtable and is visible (note that a
798 : // mutation is visible before the WAL sync even in the wait case, so we have
799 : // not weakened the durability semantics). The caller must call Batch.SyncWait
800 : // to wait for the WAL fsync. The caller must not Close the batch without
801 : // first calling Batch.SyncWait.
802 : //
803 : // RECOMMENDATION: Prefer using Apply unless you really understand why you
804 : // need ApplyNoSyncWait.
805 : // EXPERIMENTAL: API/feature subject to change. Do not yet use outside
806 : // CockroachDB.
807 1 : func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
808 1 : if !opts.Sync {
809 0 : return errors.Errorf("cannot request asynchronous apply when WriteOptions.Sync is false")
810 0 : }
811 1 : return d.applyInternal(batch, opts, true)
812 : }
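// A sketch of the intended ApplyNoSyncWait calling pattern, assuming an opened
// *DB named db: the mutation is visible once ApplyNoSyncWait returns, and
// Batch.SyncWait is called before Close to wait for the WAL fsync.
//
//	b := db.NewBatch()
//	_ = b.Set([]byte("k"), []byte("v"), nil)
//	if err := db.ApplyNoSyncWait(b, pebble.Sync); err != nil {
//		return err
//	}
//	// ... other work can proceed while the WAL sync is in flight ...
//	if err := b.SyncWait(); err != nil {
//		return err
//	}
//	return b.Close()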
813 :
814 : // REQUIRES: noSyncWait => opts.Sync
815 1 : func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
816 1 : if err := d.closed.Load(); err != nil {
817 1 : panic(err)
818 : }
819 1 : if batch.committing {
820 0 : panic("pebble: batch already committing")
821 : }
822 1 : if batch.applied.Load() {
823 0 : panic("pebble: batch already applied")
824 : }
825 1 : if d.opts.ReadOnly {
826 1 : return ErrReadOnly
827 1 : }
828 1 : if batch.db != nil && batch.db != d {
829 1 : panic(fmt.Sprintf("pebble: batch db mismatch: %p != %p", batch.db, d))
830 : }
831 :
832 1 : sync := opts.GetSync()
833 1 : if sync && d.opts.DisableWAL {
834 0 : return errors.New("pebble: WAL disabled")
835 0 : }
836 :
837 1 : if fmv := d.FormatMajorVersion(); fmv < batch.minimumFormatMajorVersion {
838 0 : panic(fmt.Sprintf(
839 0 : "pebble: batch requires at least format major version %d (current: %d)",
840 0 : batch.minimumFormatMajorVersion, fmv,
841 0 : ))
842 : }
843 :
844 1 : if batch.countRangeKeys > 0 {
845 1 : if d.split == nil {
846 0 : return errNoSplit
847 0 : }
848 : }
849 1 : batch.committing = true
850 1 :
851 1 : if batch.db == nil {
852 1 : if err := batch.refreshMemTableSize(); err != nil {
853 0 : return err
854 0 : }
855 : }
856 1 : if batch.memTableSize >= d.largeBatchThreshold {
857 1 : var err error
858 1 : batch.flushable, err = newFlushableBatch(batch, d.opts.Comparer)
859 1 : if err != nil {
860 0 : return err
861 0 : }
862 : }
863 1 : if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
864 0 : // There isn't much we can do on an error here. The commit pipeline will be
865 0 : // horked at this point.
866 0 : d.opts.Logger.Fatalf("pebble: fatal commit error: %v", err)
867 0 : }
868 : // If this is a large batch, we need to clear the batch contents as the
869 : // flushable batch may still be present in the flushables queue.
870 : //
871 : // TODO(peter): Currently large batches are written to the WAL. We could
872 : // skip the WAL write and instead wait for the large batch to be flushed to
873 : // an sstable. For a 100 MB batch, this might actually be faster. For a 1
874 : // GB batch this is almost certainly faster.
875 1 : if batch.flushable != nil {
876 1 : batch.data = nil
877 1 : }
878 1 : return nil
879 : }
880 :
881 1 : func (d *DB) commitApply(b *Batch, mem *memTable) error {
882 1 : if b.flushable != nil {
883 1 : // This is a large batch which was already added to the immutable queue.
884 1 : return nil
885 1 : }
886 1 : err := mem.apply(b, b.SeqNum())
887 1 : if err != nil {
888 0 : return err
889 0 : }
890 :
891 : // If the batch contains range tombstones and the database is configured
892 : // to flush range deletions, schedule a delayed flush so that disk space
893 : // may be reclaimed without additional writes or an explicit flush.
894 1 : if b.countRangeDels > 0 && d.opts.FlushDelayDeleteRange > 0 {
895 1 : d.mu.Lock()
896 1 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayDeleteRange)
897 1 : d.mu.Unlock()
898 1 : }
899 :
900 : // If the batch contains range keys and the database is configured to flush
901 : // range keys, schedule a delayed flush so that the range keys are cleared
902 : // from the memtable.
903 1 : if b.countRangeKeys > 0 && d.opts.FlushDelayRangeKey > 0 {
904 1 : d.mu.Lock()
905 1 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayRangeKey)
906 1 : d.mu.Unlock()
907 1 : }
908 :
909 1 : if mem.writerUnref() {
910 1 : d.mu.Lock()
911 1 : d.maybeScheduleFlush()
912 1 : d.mu.Unlock()
913 1 : }
914 1 : return nil
915 : }
916 :
917 1 : func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
918 1 : var size int64
919 1 : repr := b.Repr()
920 1 :
921 1 : if b.flushable != nil {
922 1 : // We have a large batch. Such batches are special in that they don't get
923 1 : // added to the memtable, and are instead inserted into the queue of
924 1 : // memtables. The call to makeRoomForWrite with this batch will force the
925 1 : // current memtable to be flushed. We want the large batch to be part of
926 1 : // the same log, so we add it to the WAL here, rather than after the call
927 1 : // to makeRoomForWrite().
928 1 : //
929 1 : // Set the sequence number since it was not set to the correct value earlier
930 1 : // (see comment in newFlushableBatch()).
931 1 : b.flushable.setSeqNum(b.SeqNum())
932 1 : if !d.opts.DisableWAL {
933 1 : var err error
934 1 : size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b)
935 1 : if err != nil {
936 0 : panic(err)
937 : }
938 : }
939 : }
940 :
941 1 : var err error
942 1 : // Grab a reference to the memtable. We don't hold DB.mu, but we do hold
943 1 : // d.commit.mu. It's okay for readers of d.mu.mem.mutable to only hold one of
944 1 : // d.commit.mu or d.mu, because memtable rotations require holding both.
945 1 : mem := d.mu.mem.mutable
946 1 : // Batches which contain keys of kind InternalKeyKindIngestSST will
947 1 : // never be applied to the memtable, so we don't need to make room for
948 1 : // write.
949 1 : if !b.ingestedSSTBatch {
950 1 : // Flushable batches will require a rotation of the memtable regardless,
951 1 : // so only attempt an optimistic reservation of space in the current
952 1 : // memtable if this batch is not a large flushable batch.
953 1 : if b.flushable == nil {
954 1 : err = d.mu.mem.mutable.prepare(b)
955 1 : }
956 1 : if b.flushable != nil || err == arenaskl.ErrArenaFull {
957 1 : // Slow path.
958 1 : // We need to acquire DB.mu and rotate the memtable.
959 1 : func() {
960 1 : d.mu.Lock()
961 1 : defer d.mu.Unlock()
962 1 : err = d.makeRoomForWrite(b)
963 1 : mem = d.mu.mem.mutable
964 1 : }()
965 : }
966 : }
967 1 : if err != nil {
968 0 : return nil, err
969 0 : }
970 1 : if d.opts.DisableWAL {
971 1 : return mem, nil
972 1 : }
973 1 : d.logBytesIn.Add(uint64(len(repr)))
974 1 :
975 1 : if b.flushable == nil {
976 1 : size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b)
977 1 : if err != nil {
978 0 : panic(err)
979 : }
980 : }
981 :
982 1 : d.logSize.Store(uint64(size))
983 1 : return mem, err
984 : }
985 :
986 : type iterAlloc struct {
987 : dbi Iterator
988 : keyBuf []byte
989 : boundsBuf [2][]byte
990 : prefixOrFullSeekKey []byte
991 : merging mergingIter
992 : mlevels [3 + numLevels]mergingIterLevel
993 : levels [3 + numLevels]levelIter
994 : levelsPositioned [3 + numLevels]bool
995 : }
996 :
997 : var iterAllocPool = sync.Pool{
998 1 : New: func() interface{} {
999 1 : return &iterAlloc{}
1000 1 : },
1001 : }
1002 :
1003 : // snapshotIterOpts denotes snapshot-related iterator options when calling
1004 : // newIter. These are the possible cases for a snapshotIterOpts:
1005 : // - No snapshot: All fields are zero values.
1006 : // - Classic snapshot: Only `seqNum` is set. The latest readState will be used
1007 : // and the specified seqNum will be used as the snapshot seqNum.
1008 : // - EventuallyFileOnlySnapshot (EFOS) behaving as a classic snapshot. Only
1009 : // the `seqNum` is set. The latest readState will be used
1010 : // and the specified seqNum will be used as the snapshot seqNum.
1011 : // - EFOS in file-only state: Only `seqNum` and `vers` are set. All the
1012 : // relevant SSTs are referenced by the *version.
1013 : // - EFOS that has been excised but is in alwaysCreateIters mode (tests only).
1014 : // Only `seqNum` and `readState` are set.
1015 : type snapshotIterOpts struct {
1016 : seqNum base.SeqNum
1017 : vers *version
1018 : readState *readState
1019 : }
1020 :
1021 : type batchIterOpts struct {
1022 : batchOnly bool
1023 : }
1024 : type newIterOpts struct {
1025 : snapshot snapshotIterOpts
1026 : batch batchIterOpts
1027 : }
1028 :
1029 : // newIter constructs a new iterator, merging in batch iterators as an extra
1030 : // level.
1031 : func (d *DB) newIter(
1032 : ctx context.Context, batch *Batch, internalOpts newIterOpts, o *IterOptions,
1033 1 : ) *Iterator {
1034 1 : if internalOpts.batch.batchOnly {
1035 1 : if batch == nil {
1036 0 : panic("batchOnly is true, but batch is nil")
1037 : }
1038 1 : if internalOpts.snapshot.vers != nil {
1039 0 : panic("batchOnly is true, but snapshotIterOpts is initialized")
1040 : }
1041 : }
1042 1 : if err := d.closed.Load(); err != nil {
1043 1 : panic(err)
1044 : }
1045 1 : seqNum := internalOpts.snapshot.seqNum
1046 1 : if o != nil && o.RangeKeyMasking.Suffix != nil && o.KeyTypes != IterKeyTypePointsAndRanges {
1047 0 : panic("pebble: range key masking requires IterKeyTypePointsAndRanges")
1048 : }
1049 1 : if (batch != nil || seqNum != 0) && (o != nil && o.OnlyReadGuaranteedDurable) {
1050 1 : // We could add support for OnlyReadGuaranteedDurable on snapshots if
1051 1 : // there was a need: this would require checking that the sequence number
1052 1 : // of the snapshot has been flushed, by comparing with
1053 1 : // DB.mem.queue[0].logSeqNum.
1054 1 : panic("OnlyReadGuaranteedDurable is not supported for batches or snapshots")
1055 : }
1056 1 : var readState *readState
1057 1 : var newIters tableNewIters
1058 1 : var newIterRangeKey keyspanimpl.TableNewSpanIter
1059 1 : if !internalOpts.batch.batchOnly {
1060 1 : // Grab and reference the current readState. This prevents the underlying
1061 1 : // files in the associated version from being deleted if there is a current
1062 1 : // compaction. The readState is unref'd by Iterator.Close().
1063 1 : if internalOpts.snapshot.vers == nil {
1064 1 : if internalOpts.snapshot.readState != nil {
1065 0 : readState = internalOpts.snapshot.readState
1066 0 : readState.ref()
1067 1 : } else {
1068 1 : // NB: loadReadState() calls readState.ref().
1069 1 : readState = d.loadReadState()
1070 1 : }
1071 1 : } else {
1072 1 : // vers != nil
1073 1 : internalOpts.snapshot.vers.Ref()
1074 1 : }
1075 :
1076 : // Determine the seqnum to read at after grabbing the read state (current and
1077 : // memtables) above.
1078 1 : if seqNum == 0 {
1079 1 : seqNum = d.mu.versions.visibleSeqNum.Load()
1080 1 : }
1081 1 : newIters = d.newIters
1082 1 : newIterRangeKey = d.tableNewRangeKeyIter
1083 : }
1084 :
1085 : // Bundle various structures under a single umbrella in order to allocate
1086 : // them together.
1087 1 : buf := iterAllocPool.Get().(*iterAlloc)
1088 1 : dbi := &buf.dbi
1089 1 : *dbi = Iterator{
1090 1 : ctx: ctx,
1091 1 : alloc: buf,
1092 1 : merge: d.merge,
1093 1 : comparer: *d.opts.Comparer,
1094 1 : readState: readState,
1095 1 : version: internalOpts.snapshot.vers,
1096 1 : keyBuf: buf.keyBuf,
1097 1 : prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
1098 1 : boundsBuf: buf.boundsBuf,
1099 1 : batch: batch,
1100 1 : newIters: newIters,
1101 1 : newIterRangeKey: newIterRangeKey,
1102 1 : seqNum: seqNum,
1103 1 : batchOnlyIter: internalOpts.batch.batchOnly,
1104 1 : }
1105 1 : if o != nil {
1106 1 : dbi.opts = *o
1107 1 : dbi.processBounds(o.LowerBound, o.UpperBound)
1108 1 : }
1109 1 : dbi.opts.logger = d.opts.Logger
1110 1 : if d.opts.private.disableLazyCombinedIteration {
1111 0 : dbi.opts.disableLazyCombinedIteration = true
1112 0 : }
1113 1 : if batch != nil {
1114 1 : dbi.batchSeqNum = dbi.batch.nextSeqNum()
1115 1 : }
1116 1 : return finishInitializingIter(ctx, buf)
1117 : }
1118 :
1119 : // finishInitializingIter is a helper for doing the non-trivial initialization
1120 : // of an Iterator. It's invoked to perform the initial initialization of an
1121 : // Iterator during NewIter or Clone, and to perform reinitialization due to a
1122 : // change in IterOptions by a call to Iterator.SetOptions.
1123 1 : func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
1124 1 : // Short-hand.
1125 1 : dbi := &buf.dbi
1126 1 : var memtables flushableList
1127 1 : if dbi.readState != nil {
1128 1 : memtables = dbi.readState.memtables
1129 1 : }
1130 1 : if dbi.opts.OnlyReadGuaranteedDurable {
1131 1 : memtables = nil
1132 1 : } else {
1133 1 : // We only need to read from memtables which contain sequence numbers older
1134 1 : // than seqNum. Trim off newer memtables.
1135 1 : for i := len(memtables) - 1; i >= 0; i-- {
1136 1 : if logSeqNum := memtables[i].logSeqNum; logSeqNum < dbi.seqNum {
1137 1 : break
1138 : }
1139 1 : memtables = memtables[:i]
1140 : }
1141 : }
1142 :
1143 1 : if dbi.opts.pointKeys() {
1144 1 : // Construct the point iterator, initializing dbi.pointIter to point to
1145 1 : // dbi.merging. If this is called during a SetOptions call and this
1146 1 : // Iterator has already initialized dbi.merging, constructPointIter is a
1147 1 : // noop and an initialized pointIter already exists in dbi.pointIter.
1148 1 : dbi.constructPointIter(ctx, memtables, buf)
1149 1 : dbi.iter = dbi.pointIter
1150 1 : } else {
1151 1 : dbi.iter = emptyIter
1152 1 : }
1153 :
1154 1 : if dbi.opts.rangeKeys() {
1155 1 : dbi.rangeKeyMasking.init(dbi, dbi.comparer.Compare, dbi.comparer.Split)
1156 1 :
1157 1 : // When iterating over both point and range keys, don't create the
1158 1 : // range-key iterator stack immediately if we can avoid it. This
1159 1 : // optimization takes advantage of the expected sparseness of range
1160 1 : // keys, and configures the point-key iterator to dynamically switch to
1161 1 : // combined iteration when it observes a file containing range keys.
1162 1 : //
1163 1 : // Lazy combined iteration is not possible if a batch or a memtable
1164 1 : // contains any range keys.
1165 1 : useLazyCombinedIteration := dbi.rangeKey == nil &&
1166 1 : dbi.opts.KeyTypes == IterKeyTypePointsAndRanges &&
1167 1 : (dbi.batch == nil || dbi.batch.countRangeKeys == 0) &&
1168 1 : !dbi.opts.disableLazyCombinedIteration
1169 1 : if useLazyCombinedIteration {
1170 1 : // The user requested combined iteration, and there's no indexed
1171 1 : // batch currently containing range keys that would prevent lazy
1172 1 : // combined iteration. Check the memtables to see if they contain
1173 1 : // any range keys.
1174 1 : for i := range memtables {
1175 1 : if memtables[i].containsRangeKeys() {
1176 1 : useLazyCombinedIteration = false
1177 1 : break
1178 : }
1179 : }
1180 : }
1181 :
1182 1 : if useLazyCombinedIteration {
1183 1 : dbi.lazyCombinedIter = lazyCombinedIter{
1184 1 : parent: dbi,
1185 1 : pointIter: dbi.pointIter,
1186 1 : combinedIterState: combinedIterState{
1187 1 : initialized: false,
1188 1 : },
1189 1 : }
1190 1 : dbi.iter = &dbi.lazyCombinedIter
1191 1 : dbi.iter = invalidating.MaybeWrapIfInvariants(dbi.iter)
1192 1 : } else {
1193 1 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{
1194 1 : initialized: true,
1195 1 : }
1196 1 : if dbi.rangeKey == nil {
1197 1 : dbi.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1198 1 : dbi.rangeKey.init(dbi.comparer.Compare, dbi.comparer.Split, &dbi.opts)
1199 1 : dbi.constructRangeKeyIter()
1200 1 : } else {
1201 1 : dbi.rangeKey.iterConfig.SetBounds(dbi.opts.LowerBound, dbi.opts.UpperBound)
1202 1 : }
1203 :
1204 : // Wrap the point iterator (currently dbi.iter) with an interleaving
1205 : // iterator that interleaves range keys pulled from
1206 : // dbi.rangeKey.rangeKeyIter.
1207 : //
1208 : // NB: The interleaving iterator is always reinitialized, even if
1209 : // dbi already had an initialized range key iterator, in case the point
1210 : // iterator changed or the range key masking suffix changed.
1211 1 : dbi.rangeKey.iiter.Init(&dbi.comparer, dbi.iter, dbi.rangeKey.rangeKeyIter,
1212 1 : keyspan.InterleavingIterOpts{
1213 1 : Mask: &dbi.rangeKeyMasking,
1214 1 : LowerBound: dbi.opts.LowerBound,
1215 1 : UpperBound: dbi.opts.UpperBound,
1216 1 : })
1217 1 : dbi.iter = &dbi.rangeKey.iiter
1218 : }
1219 1 : } else {
1220 1 : // !dbi.opts.rangeKeys()
1221 1 : //
1222 1 : // Reset the combined iterator state. The initialized=true ensures the
1223 1 : // iterator doesn't unnecessarily try to switch to combined iteration.
1224 1 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{initialized: true}
1225 1 : }
1226 1 : return dbi
1227 : }
1228 :
1229 : // ScanInternal scans all internal keys within the specified bounds, truncating
1230 : // any rangedels and rangekeys to those bounds if they span past them. For use
1231 : // when an external user needs to be aware of all internal keys that make up a
1232 : // key range.
1233 : //
1234 : // Keys deleted by range deletions must not be returned or exposed by this
1235 : // method, while the range deletion deleting that key must be exposed using
1236 : // visitRangeDel. Keys that would be masked by range key masking (if an
1237 : // appropriate prefix were set) should be exposed, alongside the range key
1238 : // that would have masked it. This method also collapses all point keys into
1239 : // one InternalKey; so only one internal key at most per user key is returned
1240 : // to visitPointKey.
1241 : //
1242 : // If visitSharedFile is not nil, ScanInternal iterates in skip-shared iteration
1243 : // mode. In this iteration mode, sstables in levels L5 and L6 are skipped, and
1244 : // their metadatas truncated to [lower, upper) and passed into visitSharedFile.
1245 : // ErrInvalidSkipSharedIteration is returned if visitSharedFile is not nil and an
1246 : // sstable in L5 or L6 is found that is not in shared storage according to
1247 : // provider.IsShared, or an sstable in those levels contains a newer key than the
1248 : // snapshot sequence number (only applicable for snapshot.ScanInternal). An example
1249 : // of when this could happen is if Pebble started writing sstables before a
1250 : // creator ID was set (creator IDs are necessary to enable shared storage),
1251 : // resulting in some lower-level SSTs being on non-shared storage. Skip-shared
1252 : // iteration is invalid in those cases.
1253 : func (d *DB) ScanInternal(
1254 : ctx context.Context,
1255 : categoryAndQoS sstable.CategoryAndQoS,
1256 : lower, upper []byte,
1257 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
1258 : visitRangeDel func(start, end []byte, seqNum SeqNum) error,
1259 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
1260 : visitSharedFile func(sst *SharedSSTMeta) error,
1261 : visitExternalFile func(sst *ExternalFile) error,
1262 1 : ) error {
1263 1 : scanInternalOpts := &scanInternalOptions{
1264 1 : CategoryAndQoS: categoryAndQoS,
1265 1 : visitPointKey: visitPointKey,
1266 1 : visitRangeDel: visitRangeDel,
1267 1 : visitRangeKey: visitRangeKey,
1268 1 : visitSharedFile: visitSharedFile,
1269 1 : visitExternalFile: visitExternalFile,
1270 1 : IterOptions: IterOptions{
1271 1 : KeyTypes: IterKeyTypePointsAndRanges,
1272 1 : LowerBound: lower,
1273 1 : UpperBound: upper,
1274 1 : },
1275 1 : }
1276 1 : iter, err := d.newInternalIter(ctx, snapshotIterOpts{} /* snapshot */, scanInternalOpts)
1277 1 : if err != nil {
1278 0 : return err
1279 0 : }
1280 1 : defer iter.close()
1281 1 : return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
1282 : }
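// A sketch of a ScanInternal call with simple visitors, assuming an opened *DB
// named db; passing nil for visitSharedFile and visitExternalFile leaves
// skip-shared iteration disabled.
//
//	err := db.ScanInternal(context.Background(), sstable.CategoryAndQoS{},
//		[]byte("a"), []byte("z"),
//		func(key *pebble.InternalKey, value pebble.LazyValue, info pebble.IteratorLevel) error {
//			fmt.Printf("point: %s\n", key)
//			return nil
//		},
//		func(start, end []byte, seqNum pebble.SeqNum) error {
//			fmt.Printf("rangedel: [%s, %s)\n", start, end)
//			return nil
//		},
//		func(start, end []byte, keys []rangekey.Key) error {
//			fmt.Printf("rangekey: [%s, %s) x %d\n", start, end, len(keys))
//			return nil
//		},
//		nil, /* visitSharedFile */
//		nil, /* visitExternalFile */
//	)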
1283 :
1284 : // newInternalIter constructs and returns a new scanInternalIterator on this db.
1285 : // If o.skipSharedLevels is true, levels below sharedLevelsStart are *not* added
1286 : // to the internal iterator.
1287 : //
1288 : // TODO(bilal): This method has a lot of similarities with db.newIter as well as
1289 : // finishInitializingIter. Both pairs of methods should be refactored to reduce
1290 : // this duplication.
1291 : func (d *DB) newInternalIter(
1292 : ctx context.Context, sOpts snapshotIterOpts, o *scanInternalOptions,
1293 1 : ) (*scanInternalIterator, error) {
1294 1 : if err := d.closed.Load(); err != nil {
1295 0 : panic(err)
1296 : }
1297 : // Grab and reference the current readState. This prevents the underlying
1298 : // files in the associated version from being deleted if there is a current
1299 : // compaction. The readState is unref'd by Iterator.Close().
1300 1 : var readState *readState
1301 1 : if sOpts.vers == nil {
1302 1 : if sOpts.readState != nil {
1303 0 : readState = sOpts.readState
1304 0 : readState.ref()
1305 1 : } else {
1306 1 : readState = d.loadReadState()
1307 1 : }
1308 : }
1309 1 : if sOpts.vers != nil {
1310 1 : sOpts.vers.Ref()
1311 1 : }
1312 :
1313 : // Determine the seqnum to read at after grabbing the read state (current and
1314 : // memtables) above.
1315 1 : seqNum := sOpts.seqNum
1316 1 : if seqNum == 0 {
1317 1 : seqNum = d.mu.versions.visibleSeqNum.Load()
1318 1 : }
1319 :
1320 : // Bundle various structures under a single umbrella in order to allocate
1321 : // them together.
1322 1 : buf := iterAllocPool.Get().(*iterAlloc)
1323 1 : dbi := &scanInternalIterator{
1324 1 : ctx: ctx,
1325 1 : db: d,
1326 1 : comparer: d.opts.Comparer,
1327 1 : merge: d.opts.Merger.Merge,
1328 1 : readState: readState,
1329 1 : version: sOpts.vers,
1330 1 : alloc: buf,
1331 1 : newIters: d.newIters,
1332 1 : newIterRangeKey: d.tableNewRangeKeyIter,
1333 1 : seqNum: seqNum,
1334 1 : mergingIter: &buf.merging,
1335 1 : }
1336 1 : dbi.opts = *o
1337 1 : dbi.opts.logger = d.opts.Logger
1338 1 : if d.opts.private.disableLazyCombinedIteration {
1339 0 : dbi.opts.disableLazyCombinedIteration = true
1340 0 : }
1341 1 : return finishInitializingInternalIter(buf, dbi)
1342 : }
1343 :
1344 : func finishInitializingInternalIter(
1345 : buf *iterAlloc, i *scanInternalIterator,
1346 1 : ) (*scanInternalIterator, error) {
1347 1 : // Short-hand.
1348 1 : var memtables flushableList
1349 1 : if i.readState != nil {
1350 1 : memtables = i.readState.memtables
1351 1 : }
1352 : // We only need to read from memtables which contain sequence numbers older
1353 : // than seqNum. Trim off newer memtables.
1354 1 : for j := len(memtables) - 1; j >= 0; j-- {
1355 1 : if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
1356 1 : break
1357 : }
1358 1 : memtables = memtables[:j]
1359 : }
1360 1 : i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)
1361 1 :
1362 1 : if err := i.constructPointIter(i.opts.CategoryAndQoS, memtables, buf); err != nil {
1363 0 : return nil, err
1364 0 : }
1365 :
1366 : // For internal iterators, we skip the lazy combined iteration optimization
1367 : // entirely, and create the range key iterator stack directly.
1368 1 : i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1369 1 : i.rangeKey.init(i.comparer.Compare, i.comparer.Split, &i.opts.IterOptions)
1370 1 : if err := i.constructRangeKeyIter(); err != nil {
1371 0 : return nil, err
1372 0 : }
1373 :
1374 : // Wrap the point iterator (currently i.iter) with an interleaving
1375 : // iterator that interleaves range keys pulled from
1376 : // i.rangeKey.rangeKeyIter.
1377 1 : i.rangeKey.iiter.Init(i.comparer, i.iter, i.rangeKey.rangeKeyIter,
1378 1 : keyspan.InterleavingIterOpts{
1379 1 : LowerBound: i.opts.LowerBound,
1380 1 : UpperBound: i.opts.UpperBound,
1381 1 : })
1382 1 : i.iter = &i.rangeKey.iiter
1383 1 :
1384 1 : return i, nil
1385 : }
1386 :
1387 : func (i *Iterator) constructPointIter(
1388 : ctx context.Context, memtables flushableList, buf *iterAlloc,
1389 1 : ) {
1390 1 : if i.pointIter != nil {
1391 1 : // Already have one.
1392 1 : return
1393 1 : }
1394 1 : internalOpts := internalIterOpts{stats: &i.stats.InternalStats}
1395 1 : if i.opts.RangeKeyMasking.Filter != nil {
1396 1 : internalOpts.boundLimitedFilter = &i.rangeKeyMasking
1397 1 : }
1398 :
1399 : // Merging levels and levels from iterAlloc.
1400 1 : mlevels := buf.mlevels[:0]
1401 1 : levels := buf.levels[:0]
1402 1 :
1403 1 : // We compute the number of levels needed ahead of time and reallocate a slice if
1404 1 : // the array from the iterAlloc isn't large enough. Doing this allocation once
1405 1 : // should improve the performance.
1406 1 : numMergingLevels := 0
1407 1 : numLevelIters := 0
1408 1 : if i.batch != nil {
1409 1 : numMergingLevels++
1410 1 : }
1411 :
1412 1 : var current *version
1413 1 : if !i.batchOnlyIter {
1414 1 : numMergingLevels += len(memtables)
1415 1 :
1416 1 : current = i.version
1417 1 : if current == nil {
1418 1 : current = i.readState.current
1419 1 : }
1420 1 : numMergingLevels += len(current.L0SublevelFiles)
1421 1 : numLevelIters += len(current.L0SublevelFiles)
1422 1 : for level := 1; level < len(current.Levels); level++ {
1423 1 : if current.Levels[level].Empty() {
1424 1 : continue
1425 : }
1426 1 : numMergingLevels++
1427 1 : numLevelIters++
1428 : }
1429 : }
1430 :
1431 1 : if numMergingLevels > cap(mlevels) {
1432 1 : mlevels = make([]mergingIterLevel, 0, numMergingLevels)
1433 1 : }
1434 1 : if numLevelIters > cap(levels) {
1435 1 : levels = make([]levelIter, 0, numLevelIters)
1436 1 : }
1437 :
1438 : // Top-level is the batch, if any.
1439 1 : if i.batch != nil {
1440 1 : if i.batch.index == nil {
1441 0 : // This isn't an indexed batch. We shouldn't have gotten this far.
1442 0 : panic(errors.AssertionFailedf("creating an iterator over an unindexed batch"))
1443 1 : } else {
1444 1 : i.batch.initInternalIter(&i.opts, &i.batchPointIter)
1445 1 : i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, i.batchSeqNum)
1446 1 : // Only include the batch's rangedel iterator if it's non-empty.
1447 1 : // This requires some subtle logic in the case a rangedel is later
1448 1 : // written to the batch and the view of the batch is refreshed
1449 1 : // during a call to SetOptions—in this case, we need to reconstruct
1450 1 : // the point iterator to add the batch rangedel iterator.
1451 1 : var rangeDelIter keyspan.FragmentIterator
1452 1 : if i.batchRangeDelIter.Count() > 0 {
1453 1 : rangeDelIter = &i.batchRangeDelIter
1454 1 : }
1455 1 : mlevels = append(mlevels, mergingIterLevel{
1456 1 : iter: &i.batchPointIter,
1457 1 : rangeDelIter: rangeDelIter,
1458 1 : })
1459 : }
1460 : }
1461 :
1462 1 : if !i.batchOnlyIter {
1463 1 : // Next are the memtables.
1464 1 : for j := len(memtables) - 1; j >= 0; j-- {
1465 1 : mem := memtables[j]
1466 1 : mlevels = append(mlevels, mergingIterLevel{
1467 1 : iter: mem.newIter(&i.opts),
1468 1 : rangeDelIter: mem.newRangeDelIter(&i.opts),
1469 1 : })
1470 1 : }
1471 :
1472 : // Next are the file levels: L0 sub-levels followed by lower levels.
1473 1 : mlevelsIndex := len(mlevels)
1474 1 : levelsIndex := len(levels)
1475 1 : mlevels = mlevels[:numMergingLevels]
1476 1 : levels = levels[:numLevelIters]
1477 1 : i.opts.snapshotForHideObsoletePoints = buf.dbi.seqNum
1478 1 : addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) {
1479 1 : li := &levels[levelsIndex]
1480 1 :
1481 1 : li.init(ctx, i.opts, &i.comparer, i.newIters, files, level, internalOpts)
1482 1 : li.initRangeDel(mlevels[mlevelsIndex].setRangeDelIter)
1483 1 : li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
1484 1 : mlevels[mlevelsIndex].levelIter = li
1485 1 : mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)
1486 1 :
1487 1 : levelsIndex++
1488 1 : mlevelsIndex++
1489 1 : }
1490 :
1491 : // Add level iterators for the L0 sublevels, iterating from newest to
1492 : // oldest.
1493 1 : for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
1494 1 : addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
1495 1 : }
1496 :
1497 : // Add level iterators for the non-empty non-L0 levels.
1498 1 : for level := 1; level < len(current.Levels); level++ {
1499 1 : if current.Levels[level].Empty() {
1500 1 : continue
1501 : }
1502 1 : addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
1503 : }
1504 : }
1505 1 : buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...)
1506 1 : if len(mlevels) <= cap(buf.levelsPositioned) {
1507 1 : buf.merging.levelsPositioned = buf.levelsPositioned[:len(mlevels)]
1508 1 : }
1509 1 : buf.merging.snapshot = i.seqNum
1510 1 : buf.merging.batchSnapshot = i.batchSeqNum
1511 1 : buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
1512 1 : i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging).(topLevelIterator)
1513 1 : i.merging = &buf.merging
1514 : }
1515 :
1516 : // NewBatch returns a new empty write-only batch. Any reads on the batch will
1517 : // return an error. If the batch is committed it will be applied to the DB.
1518 1 : func (d *DB) NewBatch(opts ...BatchOption) *Batch {
1519 1 : return newBatch(d, opts...)
1520 1 : }
1521 :
1522 : // NewBatchWithSize is mostly identical to NewBatch, but it will allocate
1523 : // the specified memory space for the internal slice in advance.
1524 0 : func (d *DB) NewBatchWithSize(size int, opts ...BatchOption) *Batch {
1525 0 : return newBatchWithSize(d, size, opts...)
1526 0 : }
1527 :
1528 : // NewIndexedBatch returns a new empty read-write batch. Any reads on the batch
1529 : // will read from both the batch and the DB. If the batch is committed it will
1530 : // be applied to the DB. An indexed batch is slower that a non-indexed batch
1531 : // for insert operations. If you do not need to perform reads on the batch, use
1532 : // NewBatch instead.
1533 1 : func (d *DB) NewIndexedBatch() *Batch {
1534 1 : return newIndexedBatch(d, d.opts.Comparer)
1535 1 : }
1536 :
1537 : // NewIndexedBatchWithSize is mostly identical to NewIndexedBatch, but it will
1538 : // allocate the specified memory space for the internal slice in advance.
1539 0 : func (d *DB) NewIndexedBatchWithSize(size int) *Batch {
1540 0 : return newIndexedBatchWithSize(d, d.opts.Comparer, size)
1541 0 : }
1542 :
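// Illustrative usage sketch of the batch constructors above, assuming an
// already-open *DB d; keys and values are placeholders, and Sync is the
// package-level WriteOptions.
//
//	b := d.NewBatch()
//	_ = b.Set([]byte("k1"), []byte("v1"), nil)
//	_ = b.Delete([]byte("k2"), nil)
//	if err := b.Commit(Sync); err != nil {
//		// handle commit error
//	}
//
//	ib := d.NewIndexedBatch()
//	_ = ib.Set([]byte("k3"), []byte("v3"), nil)
//	if v, closer, err := ib.Get([]byte("k3")); err == nil {
//		_ = v // valid until closer.Close()
//		_ = closer.Close()
//	}
//	_ = ib.Commit(Sync)
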
1543 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
1544 : // return false). The iterator can be positioned via a call to SeekGE, SeekLT,
1545 : // First or Last. The iterator provides a point-in-time view of the current DB
1546 : // state. This view is maintained by preventing file deletions and preventing
1547 : // memtables referenced by the iterator from being deleted. Using an iterator
1548 : // to maintain a long-lived point-in-time view of the DB state can lead to an
1549 : // apparent memory and disk usage leak. Use snapshots (see NewSnapshot) for
1550 : // point-in-time snapshots which avoids these problems.
1551 1 : func (d *DB) NewIter(o *IterOptions) (*Iterator, error) {
1552 1 : return d.NewIterWithContext(context.Background(), o)
1553 1 : }
1554 :
1555 : // NewIterWithContext is like NewIter, and additionally accepts a context for
1556 : // tracing.
1557 1 : func (d *DB) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
1558 1 : return d.newIter(ctx, nil /* batch */, newIterOpts{}, o), nil
1559 1 : }
1560 :
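// Minimal iteration sketch, assuming an already-open *DB d and placeholder
// bounds; the returned *Iterator must be closed to release the referenced
// readState.
//
//	iter, err := d.NewIter(&IterOptions{
//		LowerBound: []byte("a"),
//		UpperBound: []byte("z"),
//	})
//	if err != nil {
//		// handle error
//	}
//	defer iter.Close()
//	for valid := iter.First(); valid; valid = iter.Next() {
//		_ = iter.Key()   // valid only until the next positioning call
//		_ = iter.Value() // same lifetime as Key()
//	}
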
1561 : // NewSnapshot returns a point-in-time view of the current DB state. Iterators
1562 : // created with this handle will all observe a stable snapshot of the current
1563 : // DB state. The caller must call Snapshot.Close() when the snapshot is no
1564 : // longer needed. Snapshots are not persisted across DB restarts (close ->
1565 : // open). Unlike the implicit snapshot maintained by an iterator, a snapshot
1566 : // will not prevent memtables from being released or sstables from being
1567 : // deleted. Instead, a snapshot prevents deletion of sequence numbers
1568 : // referenced by the snapshot.
1569 1 : func (d *DB) NewSnapshot() *Snapshot {
1570 1 : if err := d.closed.Load(); err != nil {
1571 1 : panic(err)
1572 : }
1573 :
1574 1 : d.mu.Lock()
1575 1 : s := &Snapshot{
1576 1 : db: d,
1577 1 : seqNum: d.mu.versions.visibleSeqNum.Load(),
1578 1 : }
1579 1 : d.mu.snapshots.pushBack(s)
1580 1 : d.mu.Unlock()
1581 1 : return s
1582 : }
1583 :
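// Minimal snapshot sketch, assuming an already-open *DB d and a placeholder
// key. Reads through the snapshot observe the sequence number captured by
// NewSnapshot; the snapshot must be closed when no longer needed.
//
//	s := d.NewSnapshot()
//	defer s.Close()
//	v, closer, err := s.Get([]byte("k1"))
//	switch {
//	case err == nil:
//		_ = v // valid until closer.Close()
//		_ = closer.Close()
//	case errors.Is(err, ErrNotFound):
//		// key not visible at the snapshot's sequence number
//	default:
//		// handle error
//	}
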
1584 : // NewEventuallyFileOnlySnapshot returns a point-in-time view of the current DB
1585 : // state, similar to NewSnapshot, but with consistency constrained to the
1586 : // provided set of key ranges. See the comment at EventuallyFileOnlySnapshot for
1587 : // its semantics.
1588 1 : func (d *DB) NewEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
1589 1 : if err := d.closed.Load(); err != nil {
1590 0 : panic(err)
1591 : }
1592 1 : for i := range keyRanges {
1593 1 : if i > 0 && d.cmp(keyRanges[i-1].End, keyRanges[i].Start) > 0 {
1594 0 : panic("pebble: key ranges for eventually-file-only-snapshot not in order")
1595 : }
1596 : }
1597 1 : return d.makeEventuallyFileOnlySnapshot(keyRanges)
1598 : }
1599 :
1600 : // Close closes the DB.
1601 : //
1602 : // It is not safe to close a DB until all outstanding iterators are closed
1603 : // or to call Close concurrently with any other DB method. It is not valid
1604 : // to call any of a DB's methods after the DB has been closed.
1605 1 : func (d *DB) Close() error {
1606 1 : // Lock the commit pipeline for the duration of Close. This prevents a race
1607 1 : // with makeRoomForWrite. Rotating the WAL in makeRoomForWrite requires
1608 1 : // dropping d.mu several times for I/O. If Close only holds d.mu, an
1609 1 : // in-progress WAL rotation may re-acquire d.mu only once the database is
1610 1 : // closed.
1611 1 : //
1612 1 : // Additionally, locking the commit pipeline makes it more likely that
1613 1 : // (illegal) concurrent writes will observe d.closed.Load() != nil, creating
1614 1 : // more understandable panics if the database is improperly used concurrently
1615 1 : // during Close.
1616 1 : d.commit.mu.Lock()
1617 1 : defer d.commit.mu.Unlock()
1618 1 : d.mu.Lock()
1619 1 : defer d.mu.Unlock()
1620 1 : if err := d.closed.Load(); err != nil {
1621 1 : panic(err)
1622 : }
1623 :
1624 : // Clear the finalizer that is used to check that an unreferenced DB has been
1625 : // closed. We're closing the DB here, so the check performed by that
1626 : // finalizer isn't necessary.
1627 : //
1628 : // Note: this is a no-op if invariants are disabled or race is enabled.
1629 1 : invariants.SetFinalizer(d.closed, nil)
1630 1 :
1631 1 : d.closed.Store(errors.WithStack(ErrClosed))
1632 1 : close(d.closedCh)
1633 1 :
1634 1 : defer d.opts.Cache.Unref()
1635 1 :
1636 1 : for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
1637 1 : d.mu.compact.cond.Wait()
1638 1 : }
1639 1 : for d.mu.tableStats.loading {
1640 1 : d.mu.tableStats.cond.Wait()
1641 1 : }
1642 1 : for d.mu.tableValidation.validating {
1643 1 : d.mu.tableValidation.cond.Wait()
1644 1 : }
1645 :
1646 1 : var err error
1647 1 : if n := len(d.mu.compact.inProgress); n > 0 {
1648 1 : err = errors.Errorf("pebble: %d unexpected in-progress compactions", errors.Safe(n))
1649 1 : }
1650 1 : err = firstError(err, d.mu.formatVers.marker.Close())
1651 1 : err = firstError(err, d.tableCache.close())
1652 1 : if !d.opts.ReadOnly {
1653 1 : if d.mu.log.writer != nil {
1654 1 : _, err2 := d.mu.log.writer.Close()
1655 1 : err = firstError(err, err2)
1656 1 : }
1657 1 : } else if d.mu.log.writer != nil {
1658 0 : panic("pebble: log-writer should be nil in read-only mode")
1659 : }
1660 1 : err = firstError(err, d.mu.log.manager.Close())
1661 1 : err = firstError(err, d.fileLock.Close())
1662 1 :
1663 1 : // Note that versionSet.close() only closes the MANIFEST. The versions list
1664 1 : // is still valid for the checks below.
1665 1 : err = firstError(err, d.mu.versions.close())
1666 1 :
1667 1 : err = firstError(err, d.dataDir.Close())
1668 1 :
1669 1 : d.readState.val.unrefLocked()
1670 1 :
1671 1 : current := d.mu.versions.currentVersion()
1672 1 : for v := d.mu.versions.versions.Front(); true; v = v.Next() {
1673 1 : refs := v.Refs()
1674 1 : if v == current {
1675 1 : if refs != 1 {
1676 1 : err = firstError(err, errors.Errorf("leaked iterators: current\n%s", v))
1677 1 : }
1678 1 : break
1679 : }
1680 0 : if refs != 0 {
1681 0 : err = firstError(err, errors.Errorf("leaked iterators:\n%s", v))
1682 0 : }
1683 : }
1684 :
1685 1 : for _, mem := range d.mu.mem.queue {
1686 1 : // Usually, we'd want to delete the files returned by readerUnref. But
1687 1 : // in this case, even if we're unreferencing the flushables, the
1688 1 : // flushables aren't obsolete. They will be reconstructed during WAL
1689 1 : // replay.
1690 1 : mem.readerUnrefLocked(false)
1691 1 : }
1692 : // If there's an unused, recycled memtable, we need to release its memory.
1693 1 : if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
1694 1 : d.freeMemTable(obsoleteMemTable)
1695 1 : }
1696 1 : if reserved := d.memTableReserved.Load(); reserved != 0 {
1697 1 : err = firstError(err, errors.Errorf("leaked memtable reservation: %d", errors.Safe(reserved)))
1698 1 : }
1699 :
1700 : // Since we called d.readState.val.unrefLocked() above, we are expected to
1701 : // manually schedule deletion of obsolete files.
1702 1 : if len(d.mu.versions.obsoleteTables) > 0 {
1703 1 : d.deleteObsoleteFiles(d.newJobIDLocked())
1704 1 : }
1705 :
1706 1 : d.mu.Unlock()
1707 1 : d.compactionSchedulers.Wait()
1708 1 :
1709 1 : // Wait for all cleaning jobs to finish.
1710 1 : d.cleanupManager.Close()
1711 1 :
1712 1 : // Sanity check metrics.
1713 1 : if invariants.Enabled {
1714 1 : m := d.Metrics()
1715 1 : if m.Compact.NumInProgress > 0 || m.Compact.InProgressBytes > 0 {
1716 0 : d.mu.Lock()
1717 0 : panic(fmt.Sprintf("invalid metrics on close:\n%s", m))
1718 : }
1719 : }
1720 :
1721 1 : d.mu.Lock()
1722 1 :
1723 1 : // As a sanity check, ensure that there are no zombie tables. A non-zero count
1724 1 : // hints at a reference count leak.
1725 1 : if ztbls := len(d.mu.versions.zombieTables); ztbls > 0 {
1726 0 : err = firstError(err, errors.Errorf("non-zero zombie file count: %d", ztbls))
1727 0 : }
1728 :
1729 1 : err = firstError(err, d.objProvider.Close())
1730 1 :
1731 1 : // If the options include a closer to 'close' the filesystem, close it.
1732 1 : if d.opts.private.fsCloser != nil {
1733 1 : d.opts.private.fsCloser.Close()
1734 1 : }
1735 :
1736 : // Return an error if the user failed to close all open snapshots.
1737 1 : if v := d.mu.snapshots.count(); v > 0 {
1738 0 : err = firstError(err, errors.Errorf("leaked snapshots: %d open snapshots on DB %p", v, d))
1739 0 : }
1740 :
1741 1 : return err
1742 : }
1743 :
1744 : // Compact the specified range of keys in the database.
1745 1 : func (d *DB) Compact(start, end []byte, parallelize bool) error {
1746 1 : if err := d.closed.Load(); err != nil {
1747 1 : panic(err)
1748 : }
1749 1 : if d.opts.ReadOnly {
1750 1 : return ErrReadOnly
1751 1 : }
1752 1 : if d.cmp(start, end) >= 0 {
1753 1 : return errors.Errorf("Compact start %s is not less than end %s",
1754 1 : d.opts.Comparer.FormatKey(start), d.opts.Comparer.FormatKey(end))
1755 1 : }
1756 :
1757 1 : d.mu.Lock()
1758 1 : maxLevelWithFiles := 1
1759 1 : cur := d.mu.versions.currentVersion()
1760 1 : for level := 0; level < numLevels; level++ {
1761 1 : overlaps := cur.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
1762 1 : if !overlaps.Empty() {
1763 1 : maxLevelWithFiles = level + 1
1764 1 : }
1765 : }
1766 :
1767 : // Determine if any memtable overlaps with the compaction range. We wait for
1768 : // any such overlap to flush (initiating a flush if necessary).
1769 1 : mem, err := func() (*flushableEntry, error) {
1770 1 : // Check to see if any files overlap with any of the memtables. The queue
1771 1 : // is ordered from oldest to newest with the mutable memtable being the
1772 1 : // last element in the slice. We want to wait for the newest table that
1773 1 : // overlaps.
1774 1 : for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1775 1 : mem := d.mu.mem.queue[i]
1776 1 : var anyOverlaps bool
1777 1 : mem.computePossibleOverlaps(func(b bounded) shouldContinue {
1778 1 : anyOverlaps = true
1779 1 : return stopIteration
1780 1 : }, KeyRange{Start: start, End: end})
1781 1 : if !anyOverlaps {
1782 1 : continue
1783 : }
1784 1 : var err error
1785 1 : if mem.flushable == d.mu.mem.mutable {
1786 1 : // We have to hold both commitPipeline.mu and DB.mu when calling
1787 1 : // makeRoomForWrite(). Lock order requirements elsewhere force us to
1788 1 : // unlock DB.mu in order to grab commitPipeline.mu first.
1789 1 : d.mu.Unlock()
1790 1 : d.commit.mu.Lock()
1791 1 : d.mu.Lock()
1792 1 : defer d.commit.mu.Unlock()
1793 1 : if mem.flushable == d.mu.mem.mutable {
1794 1 : // Only flush if the active memtable is unchanged.
1795 1 : err = d.makeRoomForWrite(nil)
1796 1 : }
1797 : }
1798 1 : mem.flushForced = true
1799 1 : d.maybeScheduleFlush()
1800 1 : return mem, err
1801 : }
1802 1 : return nil, nil
1803 : }()
1804 :
1805 1 : d.mu.Unlock()
1806 1 :
1807 1 : if err != nil {
1808 0 : return err
1809 0 : }
1810 1 : if mem != nil {
1811 1 : <-mem.flushed
1812 1 : }
1813 :
1814 1 : for level := 0; level < maxLevelWithFiles; {
1815 1 : for {
1816 1 : if err := d.manualCompact(
1817 1 : start, end, level, parallelize); err != nil {
1818 1 : if errors.Is(err, ErrCancelledCompaction) {
1819 1 : continue
1820 : }
1821 1 : return err
1822 : }
1823 1 : break
1824 : }
1825 1 : level++
1826 1 : if level == numLevels-1 {
1827 1 : // A manual compaction of the bottommost level occurred.
1828 1 : // There is no next level to try and compact.
1829 1 : break
1830 : }
1831 : }
1832 1 : return nil
1833 : }
1834 :
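// Usage sketch, assuming an already-open *DB d: manually compact a key range,
// allowing the work to be split into parallel compactions where possible. The
// bounds are placeholders.
//
//	if err := d.Compact([]byte("a"), []byte("z"), true /* parallelize */); err != nil {
//		// handle error (e.g. ErrReadOnly on a read-only DB)
//	}
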
1835 1 : func (d *DB) manualCompact(start, end []byte, level int, parallelize bool) error {
1836 1 : d.mu.Lock()
1837 1 : curr := d.mu.versions.currentVersion()
1838 1 : files := curr.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
1839 1 : if files.Empty() {
1840 1 : d.mu.Unlock()
1841 1 : return nil
1842 1 : }
1843 :
1844 1 : var compactions []*manualCompaction
1845 1 : if parallelize {
1846 1 : compactions = append(compactions, d.splitManualCompaction(start, end, level)...)
1847 1 : } else {
1848 1 : compactions = append(compactions, &manualCompaction{
1849 1 : level: level,
1850 1 : done: make(chan error, 1),
1851 1 : start: start,
1852 1 : end: end,
1853 1 : })
1854 1 : }
1855 1 : d.mu.compact.manual = append(d.mu.compact.manual, compactions...)
1856 1 : d.maybeScheduleCompaction()
1857 1 : d.mu.Unlock()
1858 1 :
1859 1 : // Each of the channels is guaranteed to be eventually sent to once. After a
1860 1 : // compaction is possibly picked in d.maybeScheduleCompaction(), either the
1861 1 : // compaction is dropped, executed after being scheduled, or retried later.
1862 1 : // Assuming eventual progress when a compaction is retried, all outcomes send
1863 1 : // a value to the done channel. Since the channels are buffered, it is not
1864 1 : // necessary to read from each channel, and so we can exit early in the event
1865 1 : // of an error.
1866 1 : for _, compaction := range compactions {
1867 1 : if err := <-compaction.done; err != nil {
1868 1 : return err
1869 1 : }
1870 : }
1871 1 : return nil
1872 : }
1873 :
1874 : // splitManualCompaction splits a manual compaction over [start,end] on level
1875 : // such that the resulting compactions have no key overlap.
1876 : func (d *DB) splitManualCompaction(
1877 : start, end []byte, level int,
1878 1 : ) (splitCompactions []*manualCompaction) {
1879 1 : curr := d.mu.versions.currentVersion()
1880 1 : endLevel := level + 1
1881 1 : baseLevel := d.mu.versions.picker.getBaseLevel()
1882 1 : if level == 0 {
1883 1 : endLevel = baseLevel
1884 1 : }
1885 1 : keyRanges := curr.CalculateInuseKeyRanges(level, endLevel, start, end)
1886 1 : for _, keyRange := range keyRanges {
1887 1 : splitCompactions = append(splitCompactions, &manualCompaction{
1888 1 : level: level,
1889 1 : done: make(chan error, 1),
1890 1 : start: keyRange.Start,
1891 1 : end: keyRange.End.Key,
1892 1 : split: true,
1893 1 : })
1894 1 : }
1895 1 : return splitCompactions
1896 : }
1897 :
1898 : // Flush the memtable to stable storage.
1899 1 : func (d *DB) Flush() error {
1900 1 : flushDone, err := d.AsyncFlush()
1901 1 : if err != nil {
1902 1 : return err
1903 1 : }
1904 1 : <-flushDone
1905 1 : return nil
1906 : }
1907 :
1908 : // AsyncFlush asynchronously flushes the memtable to stable storage.
1909 : //
1910 : // If no error is returned, the caller can receive from the returned channel in
1911 : // order to wait for the flush to complete.
1912 1 : func (d *DB) AsyncFlush() (<-chan struct{}, error) {
1913 1 : if err := d.closed.Load(); err != nil {
1914 1 : panic(err)
1915 : }
1916 1 : if d.opts.ReadOnly {
1917 1 : return nil, ErrReadOnly
1918 1 : }
1919 :
1920 1 : d.commit.mu.Lock()
1921 1 : defer d.commit.mu.Unlock()
1922 1 : d.mu.Lock()
1923 1 : defer d.mu.Unlock()
1924 1 : flushed := d.mu.mem.queue[len(d.mu.mem.queue)-1].flushed
1925 1 : err := d.makeRoomForWrite(nil)
1926 1 : if err != nil {
1927 0 : return nil, err
1928 0 : }
1929 1 : return flushed, nil
1930 : }
1931 :
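// Usage sketch, assuming an already-open *DB d: trigger a flush without
// blocking, then wait on the returned channel separately.
//
//	flushed, err := d.AsyncFlush()
//	if err != nil {
//		// handle error (e.g. ErrReadOnly)
//	}
//	// ... do other work ...
//	<-flushed // blocks until the memtable has been flushed
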
1932 : // Metrics returns metrics about the database.
1933 1 : func (d *DB) Metrics() *Metrics {
1934 1 : metrics := &Metrics{}
1935 1 : walStats := d.mu.log.manager.Stats()
1936 1 :
1937 1 : d.mu.Lock()
1938 1 : vers := d.mu.versions.currentVersion()
1939 1 : *metrics = d.mu.versions.metrics
1940 1 : metrics.Compact.EstimatedDebt = d.mu.versions.picker.estimatedCompactionDebt(0)
1941 1 : metrics.Compact.InProgressBytes = d.mu.versions.atomicInProgressBytes.Load()
1942 1 : // TODO(radu): split this to separate the download compactions.
1943 1 : metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount + d.mu.compact.downloadingCount)
1944 1 : metrics.Compact.MarkedFiles = vers.Stats.MarkedForCompaction
1945 1 : metrics.Compact.Duration = d.mu.compact.duration
1946 1 : for c := range d.mu.compact.inProgress {
1947 1 : if c.kind != compactionKindFlush && c.kind != compactionKindIngestedFlushable {
1948 1 : metrics.Compact.Duration += d.timeNow().Sub(c.beganAt)
1949 1 : }
1950 : }
1951 :
1952 1 : for _, m := range d.mu.mem.queue {
1953 1 : metrics.MemTable.Size += m.totalBytes()
1954 1 : }
1955 1 : metrics.Snapshots.Count = d.mu.snapshots.count()
1956 1 : if metrics.Snapshots.Count > 0 {
1957 0 : metrics.Snapshots.EarliestSeqNum = d.mu.snapshots.earliest()
1958 0 : }
1959 1 : metrics.Snapshots.PinnedKeys = d.mu.snapshots.cumulativePinnedCount
1960 1 : metrics.Snapshots.PinnedSize = d.mu.snapshots.cumulativePinnedSize
1961 1 : metrics.MemTable.Count = int64(len(d.mu.mem.queue))
1962 1 : metrics.MemTable.ZombieCount = d.memTableCount.Load() - metrics.MemTable.Count
1963 1 : metrics.MemTable.ZombieSize = uint64(d.memTableReserved.Load()) - metrics.MemTable.Size
1964 1 : metrics.WAL.ObsoleteFiles = int64(walStats.ObsoleteFileCount)
1965 1 : metrics.WAL.ObsoletePhysicalSize = walStats.ObsoleteFileSize
1966 1 : metrics.WAL.Files = int64(walStats.LiveFileCount)
1967 1 : // The current WAL's size (d.logSize) is the logical size, which may be less
1968 1 : // than the WAL's physical size if it was recycled. walStats.LiveFileSize
1969 1 : // includes the physical size of all live WALs, but for the current WAL it
1970 1 : // reflects the physical size when it was opened. So it is possible that
1971 1 : // d.atomic.logSize has exceeded that physical size. We allow for this
1972 1 : // anomaly.
1973 1 : metrics.WAL.PhysicalSize = walStats.LiveFileSize
1974 1 : metrics.WAL.BytesIn = d.logBytesIn.Load()
1975 1 : metrics.WAL.Size = d.logSize.Load()
1976 1 : for i, n := 0, len(d.mu.mem.queue)-1; i < n; i++ {
1977 1 : metrics.WAL.Size += d.mu.mem.queue[i].logSize
1978 1 : }
1979 1 : metrics.WAL.BytesWritten = metrics.Levels[0].BytesIn + metrics.WAL.Size
1980 1 : metrics.WAL.Failover = walStats.Failover
1981 1 :
1982 1 : if p := d.mu.versions.picker; p != nil {
1983 1 : compactions := d.getInProgressCompactionInfoLocked(nil)
1984 1 : for level, score := range p.getScores(compactions) {
1985 1 : metrics.Levels[level].Score = score
1986 1 : }
1987 : }
1988 1 : metrics.Table.ZombieCount = int64(len(d.mu.versions.zombieTables))
1989 1 : for _, info := range d.mu.versions.zombieTables {
1990 1 : metrics.Table.ZombieSize += info.FileSize
1991 1 : if info.isLocal {
1992 1 : metrics.Table.Local.ZombieSize += info.FileSize
1993 1 : }
1994 : }
1995 1 : metrics.private.optionsFileSize = d.optionsFileSize
1996 1 :
1997 1 : // TODO(jackson): Consider making these metrics optional.
1998 1 : metrics.Keys.RangeKeySetsCount = countRangeKeySetFragments(vers)
1999 1 : metrics.Keys.TombstoneCount = countTombstones(vers)
2000 1 :
2001 1 : d.mu.versions.logLock()
2002 1 : metrics.private.manifestFileSize = uint64(d.mu.versions.manifest.Size())
2003 1 : backingCount, backingTotalSize := d.mu.versions.virtualBackings.Stats()
2004 1 : metrics.Table.BackingTableCount = uint64(backingCount)
2005 1 : metrics.Table.BackingTableSize = backingTotalSize
2006 1 : d.mu.versions.logUnlock()
2007 1 :
2008 1 : metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
2009 1 : if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
2010 0 : d.opts.Logger.Errorf("metrics error: %s", err)
2011 0 : }
2012 1 : metrics.Flush.WriteThroughput = d.mu.compact.flushWriteThroughput
2013 1 : if d.mu.compact.flushing {
2014 1 : metrics.Flush.NumInProgress = 1
2015 1 : }
2016 1 : for i := 0; i < numLevels; i++ {
2017 1 : metrics.Levels[i].Additional.ValueBlocksSize = valueBlocksSizeForLevel(vers, i)
2018 1 : unknown, snappy, none, zstd := compressionTypesForLevel(vers, i)
2019 1 : metrics.Table.CompressedCountUnknown += int64(unknown)
2020 1 : metrics.Table.CompressedCountSnappy += int64(snappy)
2021 1 : metrics.Table.CompressedCountZstd += int64(zstd)
2022 1 : metrics.Table.CompressedCountNone += int64(none)
2023 1 : }
2024 :
2025 1 : d.mu.Unlock()
2026 1 :
2027 1 : metrics.BlockCache = d.opts.Cache.Metrics()
2028 1 : metrics.TableCache, metrics.Filter = d.tableCache.metrics()
2029 1 : metrics.TableIters = int64(d.tableCache.iterCount())
2030 1 : metrics.CategoryStats = d.tableCache.dbOpts.sstStatsCollector.GetStats()
2031 1 :
2032 1 : metrics.SecondaryCacheMetrics = d.objProvider.Metrics()
2033 1 :
2034 1 : metrics.Uptime = d.timeNow().Sub(d.openedAt)
2035 1 :
2036 1 : return metrics
2037 : }
2038 :
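// Usage sketch, assuming an already-open *DB d: a few of the fields populated
// above. The returned *Metrics is freshly allocated per call and is not
// updated afterwards.
//
//	m := d.Metrics()
//	_ = m.Compact.NumInProgress // in-progress (non-flush) compactions
//	_ = m.MemTable.Count        // memtables in the flushable queue
//	_ = m.WAL.Size              // WAL size (see the logical vs. physical note above)
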
2039 : // sstablesOptions hold the optional parameters to retrieve TableInfo for all sstables.
2040 : type sstablesOptions struct {
2041 : // set to true will return the sstable properties in TableInfo
2042 : withProperties bool
2043 :
2044 : // if set, return sstables that overlap the key range (end-exclusive)
2045 : start []byte
2046 : end []byte
2047 :
2048 : withApproximateSpanBytes bool
2049 : }
2050 :
2051 : // SSTablesOption set optional parameter used by `DB.SSTables`.
2052 : type SSTablesOption func(*sstablesOptions)
2053 :
2054 : // WithProperties enables returning sstable properties in each TableInfo.
2055 : //
2056 : // NOTE: if most of the sstable properties need to be read from disk,
2057 : // this option may make the `SSTables` method quite slow.
2058 1 : func WithProperties() SSTablesOption {
2059 1 : return func(opt *sstablesOptions) {
2060 1 : opt.withProperties = true
2061 1 : }
2062 : }
2063 :
2064 : // WithKeyRangeFilter ensures returned sstables overlap start and end (end-exclusive).
2065 : // If start and end are both nil, this option has no effect.
2066 1 : func WithKeyRangeFilter(start, end []byte) SSTablesOption {
2067 1 : return func(opt *sstablesOptions) {
2068 1 : opt.end = end
2069 1 : opt.start = start
2070 1 : }
2071 : }
2072 :
2073 : // WithApproximateSpanBytes enables capturing the approximate number of bytes that
2074 : // overlap the provided key span for each sstable.
2075 : // NOTE: this option can only be used when WithKeyRangeFilter and WithProperties
2076 : // are also provided.
2077 1 : func WithApproximateSpanBytes() SSTablesOption {
2078 1 : return func(opt *sstablesOptions) {
2079 1 : opt.withApproximateSpanBytes = true
2080 1 : }
2081 : }
2082 :
2083 : // BackingType denotes the type of storage backing a given sstable.
2084 : type BackingType int
2085 :
2086 : const (
2087 : // BackingTypeLocal denotes an sstable stored on local disk according to the
2088 : // objprovider. This file is completely owned by us.
2089 : BackingTypeLocal BackingType = iota
2090 : // BackingTypeShared denotes an sstable stored on shared storage, created
2091 : // by this Pebble instance and possibly shared by other Pebble instances.
2092 : // These types of files have lifecycle managed by Pebble.
2093 : BackingTypeShared
2094 : // BackingTypeSharedForeign denotes an sstable stored on shared storage,
2095 : // created by a Pebble instance other than this one. These types of files have
2096 : // lifecycle managed by Pebble.
2097 : BackingTypeSharedForeign
2098 : // BackingTypeExternal denotes an sstable stored on external storage,
2099 : // not owned by any Pebble instance and with no refcounting/cleanup methods
2100 : // or lifecycle management. An example of an external file is a file restored
2101 : // from a backup.
2102 : BackingTypeExternal
2103 : )
2104 :
2105 : // SSTableInfo export manifest.TableInfo with sstable.Properties alongside
2106 : // other file backing info.
2107 : type SSTableInfo struct {
2108 : manifest.TableInfo
2109 : // Virtual indicates whether the sstable is virtual.
2110 : Virtual bool
2111 : // BackingSSTNum is the disk file number associated with the backing sstable.
2112 : // If Virtual is false, BackingSSTNum == PhysicalTableDiskFileNum(FileNum).
2113 : BackingSSTNum base.DiskFileNum
2114 : // BackingType is the type of storage backing this sstable.
2115 : BackingType BackingType
2116 : // Locator is the remote.Locator backing this sstable, if the backing type is
2117 : // not BackingTypeLocal.
2118 : Locator remote.Locator
2119 :
2120 : // Properties is the sstable properties of this table. If Virtual is true,
2121 : // then the Properties are associated with the backing sst.
2122 : Properties *sstable.Properties
2123 : }
2124 :
2125 : // SSTables retrieves the current sstables. The returned slice is indexed by
2126 : // level and each level is indexed by the position of the sstable within the
2127 : // level. Note that this information may be out of date due to concurrent
2128 : // flushes and compactions.
2129 1 : func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
2130 1 : opt := &sstablesOptions{}
2131 1 : for _, fn := range opts {
2132 1 : fn(opt)
2133 1 : }
2134 :
2135 1 : if opt.withApproximateSpanBytes && !opt.withProperties {
2136 1 : return nil, errors.Errorf("Cannot use WithApproximateSpanBytes without WithProperties option.")
2137 1 : }
2138 1 : if opt.withApproximateSpanBytes && (opt.start == nil || opt.end == nil) {
2139 1 : return nil, errors.Errorf("Cannot use WithApproximateSpanBytes without WithKeyRangeFilter option.")
2140 1 : }
2141 :
2142 : // Grab and reference the current readState.
2143 1 : readState := d.loadReadState()
2144 1 : defer readState.unref()
2145 1 :
2146 1 : // TODO(peter): This is somewhat expensive, especially on a large
2147 1 : // database. It might be worthwhile to unify TableInfo and FileMetadata and
2148 1 : // then we could simply return current.Files. Note that RocksDB is doing
2149 1 : // something similar to the current code, so perhaps it isn't too bad.
2150 1 : srcLevels := readState.current.Levels
2151 1 : var totalTables int
2152 1 : for i := range srcLevels {
2153 1 : totalTables += srcLevels[i].Len()
2154 1 : }
2155 :
2156 1 : destTables := make([]SSTableInfo, totalTables)
2157 1 : destLevels := make([][]SSTableInfo, len(srcLevels))
2158 1 : for i := range destLevels {
2159 1 : iter := srcLevels[i].Iter()
2160 1 : j := 0
2161 1 : for m := iter.First(); m != nil; m = iter.Next() {
2162 1 : if opt.start != nil && opt.end != nil {
2163 1 : b := base.UserKeyBoundsEndExclusive(opt.start, opt.end)
2164 1 : if !m.Overlaps(d.opts.Comparer.Compare, &b) {
2165 1 : continue
2166 : }
2167 : }
2168 1 : destTables[j] = SSTableInfo{TableInfo: m.TableInfo()}
2169 1 : if opt.withProperties {
2170 1 : p, err := d.tableCache.getTableProperties(
2171 1 : m,
2172 1 : )
2173 1 : if err != nil {
2174 0 : return nil, err
2175 0 : }
2176 1 : destTables[j].Properties = p
2177 : }
2178 1 : destTables[j].Virtual = m.Virtual
2179 1 : destTables[j].BackingSSTNum = m.FileBacking.DiskFileNum
2180 1 : objMeta, err := d.objProvider.Lookup(fileTypeTable, m.FileBacking.DiskFileNum)
2181 1 : if err != nil {
2182 0 : return nil, err
2183 0 : }
2184 1 : if objMeta.IsRemote() {
2185 0 : if objMeta.IsShared() {
2186 0 : if d.objProvider.IsSharedForeign(objMeta) {
2187 0 : destTables[j].BackingType = BackingTypeSharedForeign
2188 0 : } else {
2189 0 : destTables[j].BackingType = BackingTypeShared
2190 0 : }
2191 0 : } else {
2192 0 : destTables[j].BackingType = BackingTypeExternal
2193 0 : }
2194 0 : destTables[j].Locator = objMeta.Remote.Locator
2195 1 : } else {
2196 1 : destTables[j].BackingType = BackingTypeLocal
2197 1 : }
2198 :
2199 1 : if opt.withApproximateSpanBytes {
2200 1 : var spanBytes uint64
2201 1 : if m.ContainedWithinSpan(d.opts.Comparer.Compare, opt.start, opt.end) {
2202 0 : spanBytes = m.Size
2203 1 : } else {
2204 1 : size, err := d.tableCache.estimateSize(m, opt.start, opt.end)
2205 1 : if err != nil {
2206 0 : return nil, err
2207 0 : }
2208 1 : spanBytes = size
2209 : }
2210 1 : propertiesCopy := *destTables[j].Properties
2211 1 :
2212 1 : // Deep copy user properties so approximate span bytes can be added.
2213 1 : propertiesCopy.UserProperties = make(map[string]string, len(destTables[j].Properties.UserProperties)+1)
2214 1 : for k, v := range destTables[j].Properties.UserProperties {
2215 0 : propertiesCopy.UserProperties[k] = v
2216 0 : }
2217 1 : propertiesCopy.UserProperties["approximate-span-bytes"] = strconv.FormatUint(spanBytes, 10)
2218 1 : destTables[j].Properties = &propertiesCopy
2219 : }
2220 1 : j++
2221 : }
2222 1 : destLevels[i] = destTables[:j]
2223 1 : destTables = destTables[j:]
2224 : }
2225 :
2226 1 : return destLevels, nil
2227 : }
2228 :
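// Usage sketch, assuming an already-open *DB d and placeholder bounds: list
// the current sstables overlapping a key range, including their properties
// and the approximate number of bytes each contributes to the range. Note
// that WithApproximateSpanBytes requires both WithKeyRangeFilter and
// WithProperties, per the checks above.
//
//	levels, err := d.SSTables(
//		WithKeyRangeFilter([]byte("a"), []byte("z")),
//		WithProperties(),
//		WithApproximateSpanBytes(),
//	)
//	if err != nil {
//		// handle error
//	}
//	for level, tables := range levels {
//		for _, t := range tables {
//			_ = level
//			_ = t.Virtual
//			_ = t.BackingType
//			_ = t.Properties.UserProperties["approximate-span-bytes"]
//		}
//	}
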
2229 : // EstimateDiskUsage returns the estimated filesystem space used in bytes for
2230 : // storing the range `[start, end]`. The estimation is computed as follows:
2231 : //
2232 : // - For sstables fully contained in the range the whole file size is included.
2233 : // - For sstables partially contained in the range the overlapping data block sizes
2234 : // are included. Even if a data block partially overlaps, or we cannot determine
2235 : // overlap due to abbreviated index keys, the full data block size is included in
2236 : // the estimation. Note that unlike fully contained sstables, none of the
2237 : // meta-block space is counted for partially overlapped files.
2238 : // - For virtual sstables, we use the overlap between start, end and the virtual
2239 : // sstable bounds to determine disk usage.
2240 : // - There may also exist WAL entries for unflushed keys in this range. This
2241 : // estimation currently excludes space used for the range in the WAL.
2242 1 : func (d *DB) EstimateDiskUsage(start, end []byte) (uint64, error) {
2243 1 : bytes, _, _, err := d.EstimateDiskUsageByBackingType(start, end)
2244 1 : return bytes, err
2245 1 : }
2246 :
2247 : // EstimateDiskUsageByBackingType is like EstimateDiskUsage but additionally
2248 : // returns the subsets of that size in remote ane external files.
2249 : func (d *DB) EstimateDiskUsageByBackingType(
2250 : start, end []byte,
2251 1 : ) (totalSize, remoteSize, externalSize uint64, _ error) {
2252 1 : if err := d.closed.Load(); err != nil {
2253 0 : panic(err)
2254 : }
2255 1 : if d.opts.Comparer.Compare(start, end) > 0 {
2256 0 : return 0, 0, 0, errors.New("invalid key-range specified (start > end)")
2257 0 : }
2258 :
2259 : // Grab and reference the current readState. This prevents the underlying
2260 : // files in the associated version from being deleted if there is a concurrent
2261 : // compaction.
2262 1 : readState := d.loadReadState()
2263 1 : defer readState.unref()
2264 1 :
2265 1 : for level, files := range readState.current.Levels {
2266 1 : iter := files.Iter()
2267 1 : if level > 0 {
2268 1 : // We can only use `Overlaps` to restrict `files` at L1+ since at L0 it
2269 1 : // expands the range iteratively until it has found a set of files that
2270 1 : // do not overlap any other L0 files outside that set.
2271 1 : overlaps := readState.current.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
2272 1 : iter = overlaps.Iter()
2273 1 : }
2274 1 : for file := iter.First(); file != nil; file = iter.Next() {
2275 1 : if d.opts.Comparer.Compare(start, file.Smallest.UserKey) <= 0 &&
2276 1 : d.opts.Comparer.Compare(file.Largest.UserKey, end) <= 0 {
2277 1 : // The range fully contains the file, so skip looking it up in
2278 1 : // table cache/looking at its indexes, and add the full file size.
2279 1 : meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
2280 1 : if err != nil {
2281 0 : return 0, 0, 0, err
2282 0 : }
2283 1 : if meta.IsRemote() {
2284 0 : remoteSize += file.Size
2285 0 : if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
2286 0 : externalSize += file.Size
2287 0 : }
2288 : }
2289 1 : totalSize += file.Size
2290 1 : } else if d.opts.Comparer.Compare(file.Smallest.UserKey, end) <= 0 &&
2291 1 : d.opts.Comparer.Compare(start, file.Largest.UserKey) <= 0 {
2292 1 : var size uint64
2293 1 : var err error
2294 1 : if file.Virtual {
2295 0 : err = d.tableCache.withVirtualReader(
2296 0 : file.VirtualMeta(),
2297 0 : func(r sstable.VirtualReader) (err error) {
2298 0 : size, err = r.EstimateDiskUsage(start, end)
2299 0 : return err
2300 0 : },
2301 : )
2302 1 : } else {
2303 1 : err = d.tableCache.withReader(
2304 1 : file.PhysicalMeta(),
2305 1 : func(r *sstable.Reader) (err error) {
2306 1 : size, err = r.EstimateDiskUsage(start, end)
2307 1 : return err
2308 1 : },
2309 : )
2310 : }
2311 1 : if err != nil {
2312 0 : return 0, 0, 0, err
2313 0 : }
2314 1 : meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
2315 1 : if err != nil {
2316 0 : return 0, 0, 0, err
2317 0 : }
2318 1 : if meta.IsRemote() {
2319 0 : remoteSize += size
2320 0 : if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
2321 0 : externalSize += size
2322 0 : }
2323 : }
2324 1 : totalSize += size
2325 : }
2326 : }
2327 : }
2328 1 : return totalSize, remoteSize, externalSize, nil
2329 : }
2330 :
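// Usage sketch, assuming an already-open *DB d and placeholder bounds. The
// by-backing-type variant additionally breaks the estimate down into the
// portions stored remotely and externally.
//
//	total, remote, external, err := d.EstimateDiskUsageByBackingType(
//		[]byte("a"), []byte("z"))
//	if err != nil {
//		// handle error
//	}
//	_ = total    // estimated bytes in [start, end] across all backings
//	_ = remote   // subset of total on remote storage
//	_ = external // subset of remote in external (no-cleanup) objects
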
2331 1 : func (d *DB) walPreallocateSize() int {
2332 1 : // Set the WAL preallocate size to 110% of the memtable size. Note that there
2333 1 : // is a bit of apples and oranges in units here as the memtable size
2334 1 : // corresponds to the memory usage of the memtable while the WAL size is the
2335 1 : // size of the batches (plus overhead) stored in the WAL.
2336 1 : //
2337 1 : // TODO(peter): 110% of the memtable size is quite hefty for a block
2338 1 : // size. This logic is taken from GetWalPreallocateBlockSize in
2339 1 : // RocksDB. Could a smaller preallocation block size be used?
2340 1 : size := d.opts.MemTableSize
2341 1 : size = (size / 10) + size
2342 1 : return int(size)
2343 1 : }
2344 :
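// Worked example of the computation above (illustrative value, not the
// default): with MemTableSize = 64 << 20 (64 MiB), walPreallocateSize
// returns 67108864/10 + 67108864 = 73819750 bytes, roughly 110% of the
// memtable size.
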
2345 : func (d *DB) newMemTable(
2346 : logNum base.DiskFileNum, logSeqNum base.SeqNum, minSize uint64,
2347 1 : ) (*memTable, *flushableEntry) {
2348 1 : targetSize := minSize + uint64(memTableEmptySize)
2349 1 : // The targetSize should be less than MemTableSize, because any batch >=
2350 1 : // MemTableSize/2 should be treated as a large flushable batch.
2351 1 : if targetSize > d.opts.MemTableSize {
2352 0 : panic(errors.AssertionFailedf("attempting to allocate memtable larger than MemTableSize"))
2353 : }
2354 : // Double until the next memtable size is at least large enough to fit
2355 : // minSize.
2356 1 : for d.mu.mem.nextSize < targetSize {
2357 1 : d.mu.mem.nextSize = min(2*d.mu.mem.nextSize, d.opts.MemTableSize)
2358 1 : }
2359 1 : size := d.mu.mem.nextSize
2360 1 : // The next memtable should be double the size, up to Options.MemTableSize.
2361 1 : if d.mu.mem.nextSize < d.opts.MemTableSize {
2362 1 : d.mu.mem.nextSize = min(2*d.mu.mem.nextSize, d.opts.MemTableSize)
2363 1 : }
2364 :
2365 1 : memtblOpts := memTableOptions{
2366 1 : Options: d.opts,
2367 1 : logSeqNum: logSeqNum,
2368 1 : }
2369 1 :
2370 1 : // Before attempting to allocate a new memtable, check if there's one
2371 1 : // available for recycling in memTableRecycle. Large contiguous allocations
2372 1 : // can be costly as fragmentation makes it more difficult to find a large
2373 1 : // contiguous free space. We've observed 64MB allocations taking 10ms+.
2374 1 : //
2375 1 : // To reduce these costly allocations, up to 1 obsolete memtable is stashed
2376 1 : // in `d.memTableRecycle` to allow a future memtable rotation to reuse
2377 1 : // existing memory.
2378 1 : var mem *memTable
2379 1 : mem = d.memTableRecycle.Swap(nil)
2380 1 : if mem != nil && uint64(len(mem.arenaBuf)) != size {
2381 1 : d.freeMemTable(mem)
2382 1 : mem = nil
2383 1 : }
2384 1 : if mem != nil {
2385 1 : // Carry through the existing buffer and memory reservation.
2386 1 : memtblOpts.arenaBuf = mem.arenaBuf
2387 1 : memtblOpts.releaseAccountingReservation = mem.releaseAccountingReservation
2388 1 : } else {
2389 1 : mem = new(memTable)
2390 1 : memtblOpts.arenaBuf = manual.New(int(size))
2391 1 : memtblOpts.releaseAccountingReservation = d.opts.Cache.Reserve(int(size))
2392 1 : d.memTableCount.Add(1)
2393 1 : d.memTableReserved.Add(int64(size))
2394 1 :
2395 1 : // Note: this is a no-op if invariants are disabled or race is enabled.
2396 1 : invariants.SetFinalizer(mem, checkMemTable)
2397 1 : }
2398 1 : mem.init(memtblOpts)
2399 1 :
2400 1 : entry := d.newFlushableEntry(mem, logNum, logSeqNum)
2401 1 : entry.releaseMemAccounting = func() {
2402 1 : // If the user leaks iterators, we may be releasing the memtable after
2403 1 : // the DB is already closed. In this case, we want to just release the
2404 1 : // memory because DB.Close won't come along to free it for us.
2405 1 : if err := d.closed.Load(); err != nil {
2406 1 : d.freeMemTable(mem)
2407 1 : return
2408 1 : }
2409 :
2410 : // The next memtable allocation might be able to reuse this memtable.
2411 : // Stash it on d.memTableRecycle.
2412 1 : if unusedMem := d.memTableRecycle.Swap(mem); unusedMem != nil {
2413 1 : // There was already a memtable waiting to be recycled. We're now
2414 1 : // responsible for freeing it.
2415 1 : d.freeMemTable(unusedMem)
2416 1 : }
2417 : }
2418 1 : return mem, entry
2419 : }
2420 :
2421 1 : func (d *DB) freeMemTable(m *memTable) {
2422 1 : d.memTableCount.Add(-1)
2423 1 : d.memTableReserved.Add(-int64(len(m.arenaBuf)))
2424 1 : m.free()
2425 1 : }
2426 :
2427 : func (d *DB) newFlushableEntry(
2428 : f flushable, logNum base.DiskFileNum, logSeqNum base.SeqNum,
2429 1 : ) *flushableEntry {
2430 1 : fe := &flushableEntry{
2431 1 : flushable: f,
2432 1 : flushed: make(chan struct{}),
2433 1 : logNum: logNum,
2434 1 : logSeqNum: logSeqNum,
2435 1 : deleteFn: d.mu.versions.addObsolete,
2436 1 : deleteFnLocked: d.mu.versions.addObsoleteLocked,
2437 1 : }
2438 1 : fe.readerRefs.Store(1)
2439 1 : return fe
2440 1 : }
2441 :
2442 : // maybeInduceWriteStall is called before performing a memtable rotation in
2443 : // makeRoomForWrite. In some conditions, we prefer to stall the user's write
2444 : // workload rather than continuing to accept writes that may result in resource
2445 : // exhaustion or prohibitively slow reads.
2446 : //
2447 : // There are a couple reasons we might wait to rotate the memtable and
2448 : // instead induce a write stall:
2449 : // 1. If too many memtables have queued, we wait for a flush to finish before
2450 : // creating another memtable.
2451 : // 2. If L0 read amplification has grown too high, we wait for compactions
2452 : // to reduce the read amplification before accepting more writes that will
2453 : // increase write pressure.
2454 : //
2455 : // maybeInduceWriteStall checks these stall conditions, and if present, waits
2456 : // for them to abate.
2457 1 : func (d *DB) maybeInduceWriteStall(b *Batch) {
2458 1 : stalled := false
2459 1 : // This function will call EventListener.WriteStallBegin at most once. If
2460 1 : // it does call it, it will call EventListener.WriteStallEnd once before
2461 1 : // returning.
2462 1 : for {
2463 1 : var size uint64
2464 1 : for i := range d.mu.mem.queue {
2465 1 : size += d.mu.mem.queue[i].totalBytes()
2466 1 : }
2467 : // If ElevateWriteStallThresholdForFailover is true, we give an
2468 : // unlimited memory budget for memtables. This is simpler than trying to
2469 : // configure an explicit value, given that memory resources can vary.
2470 : // When using WAL failover in CockroachDB, an OOM risk is worth
2471 : // tolerating for workloads that have a strict latency SLO. Also, an
2472 : // unlimited budget here does not mean that the disk stall in the
2473 : // primary will go unnoticed until the OOM -- CockroachDB is monitoring
2474 : // disk stalls, and we expect it to fail the node after ~60s if the
2475 : // primary is stalled.
2476 1 : if size >= uint64(d.opts.MemTableStopWritesThreshold)*d.opts.MemTableSize &&
2477 1 : !d.mu.log.manager.ElevateWriteStallThresholdForFailover() {
2478 1 : // We have filled up the current memtable, but already queued memtables
2479 1 : // are still flushing, so we wait.
2480 1 : if !stalled {
2481 1 : stalled = true
2482 1 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2483 1 : Reason: "memtable count limit reached",
2484 1 : })
2485 1 : }
2486 1 : now := time.Now()
2487 1 : d.mu.compact.cond.Wait()
2488 1 : if b != nil {
2489 1 : b.commitStats.MemTableWriteStallDuration += time.Since(now)
2490 1 : }
2491 1 : continue
2492 : }
2493 1 : l0ReadAmp := d.mu.versions.currentVersion().L0Sublevels.ReadAmplification()
2494 1 : if l0ReadAmp >= d.opts.L0StopWritesThreshold {
2495 1 : // There are too many level-0 files, so we wait.
2496 1 : if !stalled {
2497 1 : stalled = true
2498 1 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2499 1 : Reason: "L0 file count limit exceeded",
2500 1 : })
2501 1 : }
2502 1 : now := time.Now()
2503 1 : d.mu.compact.cond.Wait()
2504 1 : if b != nil {
2505 1 : b.commitStats.L0ReadAmpWriteStallDuration += time.Since(now)
2506 1 : }
2507 1 : continue
2508 : }
2509 : // Not stalled.
2510 1 : if stalled {
2511 1 : d.opts.EventListener.WriteStallEnd()
2512 1 : }
2513 1 : return
2514 : }
2515 : }
2516 :
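// Illustrative configuration sketch relating the two stall conditions above
// to their Options knobs; the values are placeholders, not recommendations.
//
//	opts := &Options{
//		// Stall condition 1: stall once the queued memtables reach
//		// MemTableStopWritesThreshold * MemTableSize bytes.
//		MemTableSize:                64 << 20,
//		MemTableStopWritesThreshold: 4,
//		// Stall condition 2: stall once L0 read amplification reaches
//		// L0StopWritesThreshold sublevels.
//		L0StopWritesThreshold: 12,
//	}
//	_ = opts
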
2517 : // makeRoomForWrite rotates the current mutable memtable, ensuring that the
2518 : // resulting mutable memtable has room to hold the contents of the provided
2519 : // Batch. The current memtable is rotated (marked as immutable) and a new
2520 : // mutable memtable is allocated. It reserves space in the new memtable and adds
2521 : // a reference to the memtable. The caller must later ensure that the memtable
2522 : // is unreferenced. This memtable rotation also causes a log rotation.
2523 : //
2524 : // If the current memtable is not full but the caller wishes to trigger a
2525 : // rotation regardless, the caller may pass a nil Batch, and no space in the
2526 : // resulting mutable memtable will be reserved.
2527 : //
2528 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2529 : // may be released and reacquired.
2530 1 : func (d *DB) makeRoomForWrite(b *Batch) error {
2531 1 : if b != nil && b.ingestedSSTBatch {
2532 0 : panic("pebble: invalid function call")
2533 : }
2534 1 : d.maybeInduceWriteStall(b)
2535 1 :
2536 1 : var newLogNum base.DiskFileNum
2537 1 : var prevLogSize uint64
2538 1 : if !d.opts.DisableWAL {
2539 1 : now := time.Now()
2540 1 : newLogNum, prevLogSize = d.rotateWAL()
2541 1 : if b != nil {
2542 1 : b.commitStats.WALRotationDuration += time.Since(now)
2543 1 : }
2544 : }
2545 1 : immMem := d.mu.mem.mutable
2546 1 : imm := d.mu.mem.queue[len(d.mu.mem.queue)-1]
2547 1 : imm.logSize = prevLogSize
2548 1 :
2549 1 : var logSeqNum base.SeqNum
2550 1 : var minSize uint64
2551 1 : if b != nil {
2552 1 : logSeqNum = b.SeqNum()
2553 1 : if b.flushable != nil {
2554 1 : logSeqNum += base.SeqNum(b.Count())
2555 1 : // The batch is too large to fit in the memtable so add it directly to
2556 1 : // the immutable queue. The flushable batch is associated with the same
2557 1 : // log as the immutable memtable, but logically occurs after it in
2558 1 : // seqnum space. We ensure while flushing that the flushable batch
2559 1 : // is flushed along with the previous memtable in the flushable
2560 1 : // queue. See the top level comment in DB.flush1 to learn how this
2561 1 : // is ensured.
2562 1 : //
2563 1 : // See DB.commitWrite for the special handling of log writes for large
2564 1 : // batches. In particular, the large batch has already written to
2565 1 : // imm.logNum.
2566 1 : entry := d.newFlushableEntry(b.flushable, imm.logNum, b.SeqNum())
2567 1 : // The large batch is by definition large. Reserve space from the cache
2568 1 : // for it until it is flushed.
2569 1 : entry.releaseMemAccounting = d.opts.Cache.Reserve(int(b.flushable.totalBytes()))
2570 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2571 1 : } else {
2572 1 : minSize = b.memTableSize
2573 1 : }
2574 1 : } else {
2575 1 : // b == nil
2576 1 : //
2577 1 : // This is a manual forced flush.
2578 1 : logSeqNum = base.SeqNum(d.mu.versions.logSeqNum.Load())
2579 1 : imm.flushForced = true
2580 1 : // If we are manually flushing and we used less than half of the bytes in
2581 1 : // the memtable, don't increase the size for the next memtable. This
2582 1 : // reduces memtable memory pressure when an application is frequently
2583 1 : // manually flushing.
2584 1 : if uint64(immMem.availBytes()) > immMem.totalBytes()/2 {
2585 1 : d.mu.mem.nextSize = immMem.totalBytes()
2586 1 : }
2587 : }
2588 1 : d.rotateMemtable(newLogNum, logSeqNum, immMem, minSize)
2589 1 : if b != nil && b.flushable == nil {
2590 1 : err := d.mu.mem.mutable.prepare(b)
2591 1 : // Reserving enough space for the batch after rotation must never fail.
2592 1 : // We pass in a minSize that's equal to b.memtableSize to ensure that
2593 1 : // memtable rotation allocates a memtable sufficiently large. We also
2594 1 : // held d.commit.mu for the entirety of this function, ensuring that no
2595 1 : // other committers may have reserved memory in the new memtable yet.
2596 1 : if err == arenaskl.ErrArenaFull {
2597 0 : panic(errors.AssertionFailedf("memtable still full after rotation"))
2598 : }
2599 1 : return err
2600 : }
2601 1 : return nil
2602 : }
2603 :
2604 : // Both DB.mu and commitPipeline.mu must be held by the caller.
2605 : func (d *DB) rotateMemtable(
2606 : newLogNum base.DiskFileNum, logSeqNum base.SeqNum, prev *memTable, minSize uint64,
2607 1 : ) {
2608 1 : // Create a new memtable, scheduling the previous one for flushing. We do
2609 1 : // this even if the previous memtable was empty because the DB.Flush
2610 1 : // mechanism is dependent on being able to wait for the empty memtable to
2611 1 : // flush. We can't just mark the empty memtable as flushed here because we
2612 1 : // also have to wait for all previous immutable tables to
2613 1 : // flush. Additionally, the memtable is tied to particular WAL file and we
2614 1 : // want to go through the flush path in order to recycle that WAL file.
2615 1 : //
2616 1 : // NB: newLogNum corresponds to the WAL that contains mutations that are
2617 1 : // present in the new memtable. When immutable memtables are flushed to
2618 1 : // disk, a VersionEdit will be created telling the manifest the minimum
2619 1 : // unflushed log number (which will be the next one in d.mu.mem.mutable
2620 1 : // that was not flushed).
2621 1 : //
2622 1 : // NB: prev should be the current mutable memtable.
2623 1 : var entry *flushableEntry
2624 1 : d.mu.mem.mutable, entry = d.newMemTable(newLogNum, logSeqNum, minSize)
2625 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2626 1 : // d.logSize tracks the log size of the WAL file corresponding to the most
2627 1 : // recent flushable. The log size of the previous mutable memtable no longer
2628 1 : // applies to the current mutable memtable.
2629 1 : //
2630 1 : // It's tempting to perform this update in rotateWAL, but that would not be
2631 1 : // atomic with the enqueue of the new flushable. A call to DB.Metrics()
2632 1 : // could acquire DB.mu after the WAL has been rotated but before the new
2633 1 : // memtable has been appended; this would result in omitting the log size of
2634 1 : // the most recent flushable.
2635 1 : d.logSize.Store(0)
2636 1 : d.updateReadStateLocked(nil)
2637 1 : if prev.writerUnref() {
2638 1 : d.maybeScheduleFlush()
2639 1 : }
2640 : }
2641 :
2642 : // rotateWAL creates a new write-ahead log, possibly recycling a previous WAL's
2643 : // files. It returns the file number assigned to the new WAL, and the size of
2644 : // the previous WAL file.
2645 : //
2646 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2647 : // may be released and reacquired.
2648 1 : func (d *DB) rotateWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
2649 1 : if d.opts.DisableWAL {
2650 0 : panic("pebble: invalid function call")
2651 : }
2652 1 : jobID := d.newJobIDLocked()
2653 1 : newLogNum = d.mu.versions.getNextDiskFileNum()
2654 1 :
2655 1 : d.mu.Unlock()
2656 1 : // Close the previous log first. This writes an EOF trailer
2657 1 : // signifying the end of the file and syncs it to disk. We must
2658 1 : // close the previous log before linking the new log file,
2659 1 : // otherwise a crash could leave both logs with unclean tails, and
2660 1 : // Open will treat the previous log as corrupt.
2661 1 : offset, err := d.mu.log.writer.Close()
2662 1 : if err != nil {
2663 0 : // What to do here? Stumbling on doesn't seem worthwhile. If we failed to
2664 0 : // close the previous log it is possible we lost a write.
2665 0 : panic(err)
2666 : }
2667 1 : prevLogSize = uint64(offset)
2668 1 : metrics := d.mu.log.writer.Metrics()
2669 1 :
2670 1 : d.mu.Lock()
2671 1 : if err := d.mu.log.metrics.LogWriterMetrics.Merge(&metrics); err != nil {
2672 0 : d.opts.Logger.Errorf("metrics error: %s", err)
2673 0 : }
2674 :
2675 1 : d.mu.Unlock()
2676 1 : writer, err := d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
2677 1 : if err != nil {
2678 0 : panic(err)
2679 : }
2680 :
2681 1 : d.mu.Lock()
2682 1 : d.mu.log.writer = writer
2683 1 : return newLogNum, prevLogSize
2684 : }
2685 :
2686 1 : func (d *DB) getEarliestUnflushedSeqNumLocked() base.SeqNum {
2687 1 : seqNum := base.SeqNumMax
2688 1 : for i := range d.mu.mem.queue {
2689 1 : logSeqNum := d.mu.mem.queue[i].logSeqNum
2690 1 : if seqNum > logSeqNum {
2691 1 : seqNum = logSeqNum
2692 1 : }
2693 : }
2694 1 : return seqNum
2695 : }
2696 :
2697 1 : func (d *DB) getInProgressCompactionInfoLocked(finishing *compaction) (rv []compactionInfo) {
2698 1 : for c := range d.mu.compact.inProgress {
2699 1 : if len(c.flushing) == 0 && (finishing == nil || c != finishing) {
2700 1 : info := compactionInfo{
2701 1 : versionEditApplied: c.versionEditApplied,
2702 1 : inputs: c.inputs,
2703 1 : smallest: c.smallest,
2704 1 : largest: c.largest,
2705 1 : outputLevel: -1,
2706 1 : }
2707 1 : if c.outputLevel != nil {
2708 1 : info.outputLevel = c.outputLevel.level
2709 1 : }
2710 1 : rv = append(rv, info)
2711 : }
2712 : }
2713 1 : return
2714 : }
2715 :
2716 1 : func inProgressL0Compactions(inProgress []compactionInfo) []manifest.L0Compaction {
2717 1 : var compactions []manifest.L0Compaction
2718 1 : for _, info := range inProgress {
2719 1 : // Skip in-progress compactions that have already committed; the L0
2720 1 : // sublevels initialization code requires the set of in-progress
2721 1 : // compactions to be consistent with the current version. Compactions
2722 1 : // with versionEditApplied=true are already applied to the current
2723 1 : // version but are performing cleanup without the database mutex.
2724 1 : if info.versionEditApplied {
2725 1 : continue
2726 : }
2727 1 : l0 := false
2728 1 : for _, cl := range info.inputs {
2729 1 : l0 = l0 || cl.level == 0
2730 1 : }
2731 1 : if !l0 {
2732 1 : continue
2733 : }
2734 1 : compactions = append(compactions, manifest.L0Compaction{
2735 1 : Smallest: info.smallest,
2736 1 : Largest: info.largest,
2737 1 : IsIntraL0: info.outputLevel == 0,
2738 1 : })
2739 : }
2740 1 : return compactions
2741 : }
2742 :
2743 : // firstError returns the first non-nil error of err0 and err1, or nil if both
2744 : // are nil.
2745 1 : func firstError(err0, err1 error) error {
2746 1 : if err0 != nil {
2747 1 : return err0
2748 1 : }
2749 1 : return err1
2750 : }
2751 :
2752 : // SetCreatorID sets the CreatorID, which is required in order to use shared
2753 : // objects. Remote object usage is disabled until this method is called for the
2754 : // first time. Once set, the Creator ID is persisted and cannot change.
2755 : //
2756 : // Does nothing if remote storage was not configured in the options when the DB
2757 : // was opened, or if the DB is in read-only mode.
2758 1 : func (d *DB) SetCreatorID(creatorID uint64) error {
2759 1 : if d.opts.Experimental.RemoteStorage == nil || d.opts.ReadOnly {
2760 0 : return nil
2761 0 : }
2762 1 : return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
2763 : }
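
// Editor's sketch (not part of the Pebble source): typical SetCreatorID usage
// after Open, assuming the Options were configured with remote/shared storage.
// The function and the nodeID parameter are illustrative only.
func exampleSetCreatorID(db *DB, nodeID uint64) error {
	// The Creator ID must be set before shared objects can be used; once
	// persisted it cannot change, so pass a stable, unique identifier.
	return db.SetCreatorID(nodeID)
}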
2764 :
2765 : // KeyStatistics keeps track of the number of keys that have been pinned by a
2766 : // snapshot as well as counts of the different key kinds in the lsm.
2767 : //
2768 : // One way of using the accumulated stats, when we only have sets and dels:
2769 : // say the counts are represented as del_count, set_count,
2770 : // del_latest_count, set_latest_count, and snapshot_pinned_count. Then:
2771 : //
2772 : // - del_latest_count + set_latest_count is the number of unique user keys
2773 : //   (unique).
2774 : //
2775 : // - set_latest_count is the number of live unique user keys (live_unique).
2776 : //
2777 : // - Garbage is del_count + set_count - live_unique.
2778 : //
2779 : // - If everything were in the LSM, del_count+set_count-snapshot_pinned_count
2780 : //   would also equal the number of unique user keys (note that
2781 : //   snapshot_pinned_count is counting something different -- see the comment
2782 : //   below). But snapshot_pinned_count only counts keys in the LSM, so the
2783 : //   excess here must be keys in memtables. A worked example follows the type.
2784 : type KeyStatistics struct {
2785 : // TODO(sumeer): the SnapshotPinned* are incorrect in that these older
2786 : // versions can be in a different level. Either fix the accounting or
2787 : // rename these fields.
2788 :
2789 : // SnapshotPinnedKeys represents obsolete keys that cannot be elided during
2790 : // a compaction, because they are required by an open snapshot.
2791 : SnapshotPinnedKeys int
2792 : // SnapshotPinnedKeysBytes is the total number of bytes of all snapshot
2793 : // pinned keys.
2794 : SnapshotPinnedKeysBytes uint64
2795 : // KindsCount is the count for each kind of key. It includes point keys,
2796 : // range deletes and range keys.
2797 : KindsCount [InternalKeyKindMax + 1]int
2798 : // LatestKindsCount is the count for each kind of key when it is the latest
2799 : // kind for a user key. It is only populated for point keys.
2800 : LatestKindsCount [InternalKeyKindMax + 1]int
2801 : }
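
// Editor's sketch (not part of the Pebble source): a worked instance of the
// arithmetic described in the KeyStatistics comment above. The counts and the
// function name are hypothetical; the variables simply mirror del_count,
// set_count, del_latest_count, and set_latest_count.
func exampleGarbageArithmetic() (unique, garbage int) {
	delCount, setCount := 40, 100              // all DEL and SET versions present
	delLatestCount, setLatestCount := 10, 60   // versions that are the latest for their user key
	unique = delLatestCount + setLatestCount   // 70 unique user keys
	liveUnique := setLatestCount               // 60 keys whose latest version is a SET
	garbage = delCount + setCount - liveUnique // 140 - 60 = 80 obsolete versions
	return unique, garbage
}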
2802 :
2803 : // LSMKeyStatistics is used by DB.ScanStatistics.
2804 : type LSMKeyStatistics struct {
2805 : Accumulated KeyStatistics
2806 : // Levels contains statistics only for point keys. Range deletions and range keys will
2807 : // appear in Accumulated but not Levels.
2808 : Levels [numLevels]KeyStatistics
2809 : // BytesRead represents the logical, pre-compression size of keys and values read.
2810 : BytesRead uint64
2811 : }
2812 :
2813 : // ScanStatisticsOptions is used by DB.ScanStatistics.
2814 : type ScanStatisticsOptions struct {
2815 : // LimitBytesPerSecond is the maximum number of bytes that may be read per
2816 : // second using ScanInternal.
2817 : // A value of 0 indicates that no limit is set.
2818 : LimitBytesPerSecond int64
2819 : }
2820 :
2821 : // ScanStatistics returns the count of each key kind within the lsm for a
2822 : // key span [lower, upper), as well as the number of snapshot-pinned keys.
2823 : func (d *DB) ScanStatistics(
2824 : ctx context.Context, lower, upper []byte, opts ScanStatisticsOptions,
2825 1 : ) (LSMKeyStatistics, error) {
2826 1 : stats := LSMKeyStatistics{}
2827 1 : var prevKey InternalKey
2828 1 : var rateLimitFunc func(key *InternalKey, val LazyValue) error
2829 1 : tb := tokenbucket.TokenBucket{}
2830 1 :
2831 1 : if opts.LimitBytesPerSecond != 0 {
2832 0 : // Each "token" roughly corresponds to a byte that was read.
2833 0 : tb.Init(tokenbucket.TokensPerSecond(opts.LimitBytesPerSecond), tokenbucket.Tokens(1024))
2834 0 : rateLimitFunc = func(key *InternalKey, val LazyValue) error {
2835 0 : return tb.WaitCtx(ctx, tokenbucket.Tokens(key.Size()+val.Len()))
2836 0 : }
2837 : }
2838 :
2839 1 : scanInternalOpts := &scanInternalOptions{
2840 1 : visitPointKey: func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error {
2841 1 : // If the previous key is equal to the current point key, the current key was
2842 1 : // pinned by a snapshot.
2843 1 : size := uint64(key.Size())
2844 1 : kind := key.Kind()
2845 1 : sameKey := d.equal(prevKey.UserKey, key.UserKey)
2846 1 : if iterInfo.Kind == IteratorLevelLSM && sameKey {
2847 1 : stats.Levels[iterInfo.Level].SnapshotPinnedKeys++
2848 1 : stats.Levels[iterInfo.Level].SnapshotPinnedKeysBytes += size
2849 1 : stats.Accumulated.SnapshotPinnedKeys++
2850 1 : stats.Accumulated.SnapshotPinnedKeysBytes += size
2851 1 : }
2852 1 : if iterInfo.Kind == IteratorLevelLSM {
2853 1 : stats.Levels[iterInfo.Level].KindsCount[kind]++
2854 1 : }
2855 1 : if !sameKey {
2856 1 : if iterInfo.Kind == IteratorLevelLSM {
2857 1 : stats.Levels[iterInfo.Level].LatestKindsCount[kind]++
2858 1 : }
2859 1 : stats.Accumulated.LatestKindsCount[kind]++
2860 : }
2861 :
2862 1 : stats.Accumulated.KindsCount[kind]++
2863 1 : prevKey.CopyFrom(*key)
2864 1 : stats.BytesRead += uint64(key.Size() + value.Len())
2865 1 : return nil
2866 : },
2867 0 : visitRangeDel: func(start, end []byte, seqNum base.SeqNum) error {
2868 0 : stats.Accumulated.KindsCount[InternalKeyKindRangeDelete]++
2869 0 : stats.BytesRead += uint64(len(start) + len(end))
2870 0 : return nil
2871 0 : },
2872 0 : visitRangeKey: func(start, end []byte, keys []rangekey.Key) error {
2873 0 : stats.BytesRead += uint64(len(start) + len(end))
2874 0 : for _, key := range keys {
2875 0 : stats.Accumulated.KindsCount[key.Kind()]++
2876 0 : stats.BytesRead += uint64(len(key.Value) + len(key.Suffix))
2877 0 : }
2878 0 : return nil
2879 : },
2880 : includeObsoleteKeys: true,
2881 : IterOptions: IterOptions{
2882 : KeyTypes: IterKeyTypePointsAndRanges,
2883 : LowerBound: lower,
2884 : UpperBound: upper,
2885 : },
2886 : rateLimitFunc: rateLimitFunc,
2887 : }
2888 1 : iter, err := d.newInternalIter(ctx, snapshotIterOpts{}, scanInternalOpts)
2889 1 : if err != nil {
2890 0 : return LSMKeyStatistics{}, err
2891 0 : }
2892 1 : defer iter.close()
2893 1 :
2894 1 : err = scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
2895 1 :
2896 1 : if err != nil {
2897 0 : return LSMKeyStatistics{}, err
2898 0 : }
2899 :
2900 1 : return stats, nil
2901 : }
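
// Editor's sketch (not part of the Pebble source): collecting key-kind
// statistics over a key range while capping read throughput. The bounds and
// the 8 MiB/s limit are illustrative values.
func exampleScanStatistics(ctx context.Context, db *DB) (int, error) {
	stats, err := db.ScanStatistics(ctx, []byte("a"), []byte("z"), ScanStatisticsOptions{
		// Rate limit reads via the internal token bucket; 0 means unlimited.
		LimitBytesPerSecond: 8 << 20,
	})
	if err != nil {
		return 0, err
	}
	// Accumulated aggregates across memtables and all levels; Levels holds
	// per-level statistics for point keys only.
	return stats.Accumulated.KindsCount[InternalKeyKindSet], nil
}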
2902 :
2903 : // ObjProvider returns the objstorage.Provider for this database. Meant to be
2904 : // used for internal purposes only.
2905 1 : func (d *DB) ObjProvider() objstorage.Provider {
2906 1 : return d.objProvider
2907 1 : }
2908 :
2909 1 : func (d *DB) checkVirtualBounds(m *fileMetadata) {
2910 1 : if !invariants.Enabled {
2911 0 : return
2912 0 : }
2913 :
2914 1 : objMeta, err := d.objProvider.Lookup(fileTypeTable, m.FileBacking.DiskFileNum)
2915 1 : if err != nil {
2916 0 : panic(err)
2917 : }
2918 1 : if objMeta.IsExternal() {
2919 0 : // Nothing to do; bounds are expected to be loose.
2920 0 : return
2921 0 : }
2922 :
2923 1 : iters, err := d.newIters(context.TODO(), m, nil, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
2924 1 : if err != nil {
2925 0 : panic(errors.Wrap(err, "pebble: error creating iterators"))
2926 : }
2927 1 : defer iters.CloseAll()
2928 1 :
2929 1 : if m.HasPointKeys {
2930 1 : pointIter := iters.Point()
2931 1 : rangeDelIter := iters.RangeDeletion()
2932 1 :
2933 1 : // Check that the lower bound is tight.
2934 1 : pointKV := pointIter.First()
2935 1 : rangeDel, err := rangeDelIter.First()
2936 1 : if err != nil {
2937 0 : panic(err)
2938 : }
2939 1 : if (rangeDel == nil || d.cmp(rangeDel.SmallestKey().UserKey, m.SmallestPointKey.UserKey) != 0) &&
2940 1 : (pointKV == nil || d.cmp(pointKV.K.UserKey, m.SmallestPointKey.UserKey) != 0) {
2941 0 : panic(errors.Newf("pebble: virtual sstable %s lower point key bound is not tight", m.FileNum))
2942 : }
2943 :
2944 : // Check that the upper bound is tight.
2945 1 : pointKV = pointIter.Last()
2946 1 : rangeDel, err = rangeDelIter.Last()
2947 1 : if err != nil {
2948 0 : panic(err)
2949 : }
2950 1 : if (rangeDel == nil || d.cmp(rangeDel.LargestKey().UserKey, m.LargestPointKey.UserKey) != 0) &&
2951 1 : (pointKV == nil || d.cmp(pointKV.K.UserKey, m.LargestPointKey.UserKey) != 0) {
2952 0 : panic(errors.Newf("pebble: virtual sstable %s upper point key bound is not tight", m.FileNum))
2953 : }
2954 :
2955 : // Check that iterator keys are within bounds.
2956 1 : for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
2957 1 : if d.cmp(kv.K.UserKey, m.SmallestPointKey.UserKey) < 0 || d.cmp(kv.K.UserKey, m.LargestPointKey.UserKey) > 0 {
2958 0 : panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, kv.K.UserKey))
2959 : }
2960 : }
2961 1 : s, err := rangeDelIter.First()
2962 1 : for ; s != nil; s, err = rangeDelIter.Next() {
2963 1 : if d.cmp(s.SmallestKey().UserKey, m.SmallestPointKey.UserKey) < 0 {
2964 0 : panic(errors.Newf("pebble: virtual sstable %s range deletion %s is not within point key bounds", m.FileNum, s.SmallestKey().UserKey))
2965 : }
2966 1 : if d.cmp(s.LargestKey().UserKey, m.LargestPointKey.UserKey) > 0 {
2967 0 : panic(errors.Newf("pebble: virtual sstable %s range deletion %s is not within point key bounds", m.FileNum, s.LargestKey().UserKey))
2968 : }
2969 : }
2970 1 : if err != nil {
2971 0 : panic(err)
2972 : }
2973 : }
2974 :
2975 1 : if !m.HasRangeKeys {
2976 1 : return
2977 1 : }
2978 0 : rangeKeyIter := iters.RangeKey()
2979 0 :
2980 0 : // Check that the lower bound is tight.
2981 0 : if s, err := rangeKeyIter.First(); err != nil {
2982 0 : panic(err)
2983 0 : } else if d.cmp(s.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) != 0 {
2984 0 : panic(errors.Newf("pebble: virtual sstable %s lower range key bound is not tight", m.FileNum))
2985 : }
2986 :
2987 : // Check that upper bound is tight.
2988 0 : if s, err := rangeKeyIter.Last(); err != nil {
2989 0 : panic(err)
2990 0 : } else if d.cmp(s.LargestKey().UserKey, m.LargestRangeKey.UserKey) != 0 {
2991 0 : panic(errors.Newf("pebble: virtual sstable %s upper range key bound is not tight", m.FileNum))
2992 : }
2993 :
2994 0 : s, err := rangeKeyIter.First()
2995 0 : for ; s != nil; s, err = rangeKeyIter.Next() {
2996 0 : if d.cmp(s.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) < 0 {
2997 0 : panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.FileNum, s.SmallestKey().UserKey))
2998 : }
2999 0 : if d.cmp(s.LargestKey().UserKey, m.LargestRangeKey.UserKey) > 0 {
3000 0 : panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.FileNum, s.LargestKey().UserKey))
3001 : }
3002 : }
3003 0 : if err != nil {
3004 0 : panic(err)
3005 : }
3006 : }
3007 :
3008 : // DebugString returns a debugging string describing the LSM.
3009 0 : func (d *DB) DebugString() string {
3010 0 : return d.DebugCurrentVersion().DebugString()
3011 0 : }
3012 :
3013 : // DebugCurrentVersion returns the current LSM tree metadata. Should only be
3014 : // used for testing/debugging.
3015 0 : func (d *DB) DebugCurrentVersion() *manifest.Version {
3016 0 : d.mu.Lock()
3017 0 : defer d.mu.Unlock()
3018 0 : return d.mu.versions.currentVersion()
3019 0 : }
|