Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package pebble provides an ordered key/value store.
6 : package pebble // import "github.com/cockroachdb/pebble"
7 :
8 : import (
9 : "context"
10 : "fmt"
11 : "io"
12 : "reflect"
13 : "slices"
14 : "sync"
15 : "sync/atomic"
16 : "time"
17 : "unsafe"
18 :
19 : "github.com/cockroachdb/crlib/crtime"
20 : "github.com/cockroachdb/errors"
21 : "github.com/cockroachdb/pebble/internal/arenaskl"
22 : "github.com/cockroachdb/pebble/internal/base"
23 : "github.com/cockroachdb/pebble/internal/bytesprofile"
24 : "github.com/cockroachdb/pebble/internal/cache"
25 : "github.com/cockroachdb/pebble/internal/deletepacer"
26 : "github.com/cockroachdb/pebble/internal/inflight"
27 : "github.com/cockroachdb/pebble/internal/invalidating"
28 : "github.com/cockroachdb/pebble/internal/invariants"
29 : "github.com/cockroachdb/pebble/internal/keyspan"
30 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
31 : "github.com/cockroachdb/pebble/internal/manifest"
32 : "github.com/cockroachdb/pebble/internal/manual"
33 : "github.com/cockroachdb/pebble/internal/problemspans"
34 : "github.com/cockroachdb/pebble/metrics"
35 : "github.com/cockroachdb/pebble/objstorage"
36 : "github.com/cockroachdb/pebble/objstorage/remote"
37 : "github.com/cockroachdb/pebble/rangekey"
38 : "github.com/cockroachdb/pebble/record"
39 : "github.com/cockroachdb/pebble/sstable"
40 : "github.com/cockroachdb/pebble/sstable/blob"
41 : "github.com/cockroachdb/pebble/sstable/block"
42 : "github.com/cockroachdb/pebble/vfs/atomicfs"
43 : "github.com/cockroachdb/pebble/wal"
44 : "github.com/cockroachdb/tokenbucket"
45 : "github.com/prometheus/client_golang/prometheus"
46 : )
47 :
48 : const (
49 : // minFileCacheSize is the minimum size of the file cache, for a single db.
50 : minFileCacheSize = 64
51 :
52 : // numNonFileCacheFiles is an approximation for the number of files
53 : // that we don't account for in the file cache, for a given db.
54 : numNonFileCacheFiles = 10
55 : )
56 :
57 : var (
58 : // ErrNotFound is returned when a get operation does not find the requested
59 : // key.
60 : ErrNotFound = base.ErrNotFound
61 : // ErrClosed is panicked when an operation is performed on a closed snapshot or
62 : // DB. Use errors.Is(err, ErrClosed) to check for this error.
63 : ErrClosed = errors.New("pebble: closed")
64 : // ErrReadOnly is returned when a write operation is performed on a read-only
65 : // database.
66 : ErrReadOnly = errors.New("pebble: read-only")
67 : // errNoSplit indicates that the user is trying to perform a range key
68 : // operation but the configured Comparer does not provide a Split
69 : // implementation.
70 : errNoSplit = errors.New("pebble: Comparer.Split required for range key operations")
71 : )
72 :
73 : // Reader is a readable key/value store.
74 : //
75 : // It is safe to call Get and NewIter from concurrent goroutines.
76 : type Reader interface {
77 : // Get gets the value for the given key. It returns ErrNotFound if the DB
78 : // does not contain the key.
79 : //
80 : // The caller should not modify the contents of the returned slice, but it is
81 : // safe to modify the contents of the argument after Get returns. The
82 : // returned slice will remain valid until the returned Closer is closed. On
83 : // success, the caller MUST call closer.Close() or a memory leak will occur.
84 : Get(key []byte) (value []byte, closer io.Closer, err error)
85 :
86 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
87 : // return false). The iterator can be positioned via a call to SeekGE,
88 : // SeekLT, First or Last.
89 : NewIter(o *IterOptions) (*Iterator, error)
90 :
91 : // NewIterWithContext is like NewIter, and additionally accepts a context
92 : // for tracing.
93 : NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error)
94 :
95 : // Close closes the Reader. It may or may not close any underlying io.Reader
96 : // or io.Writer, depending on how the DB was created.
97 : //
98 : // It is not safe to close a DB until all outstanding iterators are closed.
99 : // It is valid to call Close multiple times. Other methods should not be
100 : // called after the DB has been closed.
101 : Close() error
102 : }
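
// Editorial example (a minimal sketch, not part of the original source): how a
// client of Reader.Get typically handles the returned closer. The db and key
// variables are assumed; the value is copied because it is only valid until
// the closer is closed.
//
//	func getCopy(db pebble.Reader, key []byte) ([]byte, error) {
//		value, closer, err := db.Get(key)
//		if err != nil {
//			return nil, err // err is pebble.ErrNotFound if the key is absent
//		}
//		defer closer.Close() // required on success, or memory is leaked
//		return append([]byte(nil), value...), nil
//	}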
103 :
104 : // Writer is a writable key/value store.
105 : //
106 : // Goroutine safety is dependent on the specific implementation.
107 : type Writer interface {
108 : // Apply the operations contained in the batch to the DB.
109 : //
110 : // It is safe to modify the contents of the arguments after Apply returns.
111 : Apply(batch *Batch, o *WriteOptions) error
112 :
114 : // Delete deletes the value for the given key. Deletes are blind and will
114 : // succeed even if the given key does not exist.
115 : //
116 : // It is safe to modify the contents of the arguments after Delete returns.
117 : Delete(key []byte, o *WriteOptions) error
118 :
119 : // DeleteSized behaves identically to Delete, but takes an additional
120 : // argument indicating the size of the value being deleted. DeleteSized
121 : // should be preferred when the caller has the expectation that there exists
122 : // a single internal KV pair for the key (eg, the key has not been
123 : // overwritten recently), and the caller knows the size of its value.
124 : //
125 : // DeleteSized will record the value size within the tombstone and use it to
126 : // inform compaction-picking heuristics which strive to reduce space
127 : // amplification in the LSM. This "calling your shot" mechanic allows the
128 : // storage engine to more accurately estimate and reduce space
129 : // amplification.
130 : //
131 : // It is safe to modify the contents of the arguments after DeleteSized
132 : // returns.
133 : DeleteSized(key []byte, valueSize uint32, _ *WriteOptions) error
134 :
135 : // SingleDelete is similar to Delete in that it deletes the value for the given key. Like Delete,
136 : // it is a blind operation that will succeed even if the given key does not exist.
137 : //
138 : // WARNING: Undefined (non-deterministic) behavior will result if a key is overwritten and
139 : // then deleted using SingleDelete. The record may appear deleted immediately, but be
140 : // resurrected at a later time after compactions have been performed. Or the record may
141 : // be deleted permanently. A Delete operation lays down a "tombstone" which shadows all
142 : // previous versions of a key. The SingleDelete operation is akin to "anti-matter" and will
143 : // only delete the most recently written version for a key. These different semantics allow
144 : // the DB to avoid propagating a SingleDelete operation during a compaction as soon as the
145 : // corresponding Set operation is encountered. These semantics require extreme care to handle
146 : // properly. Only use if you have a workload where the performance gain is critical and you
147 : // can guarantee that a record is written once and then deleted once.
148 : //
149 : // Note that SINGLEDEL, SET, SINGLEDEL, SET, DEL/RANGEDEL, ... from most
150 : // recent to older will work as intended since there is a single SET
151 : // sandwiched between SINGLEDEL/DEL/RANGEDEL.
152 : //
153 : // IMPLEMENTATION WARNING: By offering SingleDelete, Pebble must guarantee
154 : // that there is no duplication of writes inside Pebble. That is, idempotent
155 : // application of writes is insufficient. For example, if a SET operation
156 : // gets duplicated inside Pebble, resulting in say SET#20 and SET#17, the
157 : // caller may issue a SINGLEDEL#25 and it will not have the desired effect.
158 : // A duplication where a SET#20 is duplicated across two sstables will have
159 : // the same correctness problem, since the SINGLEDEL may meet one of the
160 : // SETs. This guarantee is partially achieved by ensuring that a WAL and a
161 : // flushable are usually in one-to-one correspondence, and atomically
162 : // updating the MANIFEST when the flushable is flushed (which ensures the
163 : // WAL will never be replayed). There is one exception: a flushableBatch (a
164 : // batch too large to fit in a memtable) is written to the end of the WAL
165 : // that it shares with the preceding memtable. This is safe because the
166 : // memtable and the flushableBatch are part of the same flush (see DB.flush1
167 : // where this invariant is maintained). If the memtable were to be flushed
168 : // without the flushableBatch, the WAL cannot yet be deleted and if a crash
169 : // happened, the WAL would be replayed despite the memtable already being
170 : // flushed.
171 : //
172 : // It is safe to modify the contents of the arguments after SingleDelete returns.
173 : SingleDelete(key []byte, o *WriteOptions) error
174 :
175 : // DeleteRange deletes all of the point keys (and values) in the range
176 : // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
177 : // delete overlapping range keys (eg, keys set via RangeKeySet).
178 : //
179 : // It is safe to modify the contents of the arguments after DeleteRange
180 : // returns.
181 : DeleteRange(start, end []byte, o *WriteOptions) error
182 :
183 : // LogData adds the specified data to the batch. The data will be written to the
184 : // WAL, but not added to memtables or sstables. Log data is never indexed,
185 : // which makes it useful for testing WAL performance.
186 : //
187 : // It is safe to modify the contents of the argument after LogData returns.
188 : LogData(data []byte, opts *WriteOptions) error
189 :
190 : // Merge merges the value for the given key. The details of the merge are
191 : // dependent upon the configured merge operation.
192 : //
193 : // It is safe to modify the contents of the arguments after Merge returns.
194 : Merge(key, value []byte, o *WriteOptions) error
195 :
196 : // Set sets the value for the given key. It overwrites any previous value
197 : // for that key; a DB is not a multi-map.
198 : //
199 : // It is safe to modify the contents of the arguments after Set returns.
200 : Set(key, value []byte, o *WriteOptions) error
201 :
202 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
203 : // timestamp suffix to value. The suffix is optional. If any portion of the key
204 : // range [start, end) is already set by a range key with the same suffix value,
205 : // RangeKeySet overrides it.
206 : //
207 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
208 : RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error
209 :
210 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
211 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
212 : // range key. RangeKeyUnset only removes portions of range keys that fall within
213 : // the [start, end) key span, and only range keys with suffixes that exactly
214 : // match the unset suffix.
215 : //
216 : // It is safe to modify the contents of the arguments after RangeKeyUnset
217 : // returns.
218 : RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error
219 :
220 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
221 : // (inclusive on start, exclusive on end). It does not delete point keys (for
222 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
223 : // bounds, including those with or without suffixes.
224 : //
225 : // It is safe to modify the contents of the arguments after RangeKeyDelete
226 : // returns.
227 : RangeKeyDelete(start, end []byte, opts *WriteOptions) error
228 : }
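
// Editorial example (a hedged sketch, not part of the original source): the
// only SingleDelete usage pattern the documentation above considers safe, in
// which a key is written exactly once and then single-deleted exactly once.
// The db, key, and value variables are assumed for illustration.
//
//	func writeOnceDeleteOnce(db pebble.Writer, key, value []byte) error {
//		// key must not have been Set before, and must not be Set again after
//		// the SingleDelete; otherwise behavior is non-deterministic.
//		if err := db.Set(key, value, pebble.Sync); err != nil {
//			return err
//		}
//		return db.SingleDelete(key, pebble.Sync)
//	}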
229 :
230 : // DB provides a concurrent, persistent ordered key/value store.
231 : //
232 : // A DB's basic operations (Get, Set, Delete) should be self-explanatory. Get
233 : // and Delete will return ErrNotFound if the requested key is not in the store.
234 : // Callers are free to ignore this error.
235 : //
236 : // A DB also allows for iterating over the key/value pairs in key order. If d
237 : // is a DB, the code below prints all key/value pairs whose keys are 'greater
238 : // than or equal to' k:
239 : //
240 : // iter := d.NewIter(readOptions)
241 : // for iter.SeekGE(k); iter.Valid(); iter.Next() {
242 : // fmt.Printf("key=%q value=%q\n", iter.Key(), iter.Value())
243 : // }
244 : // return iter.Close()
245 : //
246 : // The Options struct holds the optional parameters for the DB, including a
247 : // Comparer to define a 'less than' relationship over keys. It is always valid
248 : // to pass a nil *Options, which means to use the default parameter values. Any
249 : // zero field of a non-nil *Options also means to use the default value for
250 : // that parameter. Thus, the code below uses a custom Comparer, but the default
251 : // values for every other parameter:
252 : //
253 : // db := pebble.Open(&Options{
254 : // Comparer: myComparer,
255 : // })
256 : type DB struct {
257 : // The count and size of referenced memtables. This includes memtables
258 : // present in DB.mu.mem.queue, as well as memtables that have been flushed
259 : // but are still referenced by an inuse readState, as well as up to one
260 : // memTable waiting to be reused and stored in d.memTableRecycle.
261 : memTableCount atomic.Int64
262 : memTableReserved atomic.Int64 // number of bytes reserved in the cache for memtables
263 : // memTableRecycle holds a pointer to an obsolete memtable. The next
264 : // memtable allocation will reuse this memtable if it has not already been
265 : // recycled.
266 : memTableRecycle atomic.Pointer[memTable]
267 :
268 : // The logical size of the current WAL.
269 : logSize atomic.Uint64
270 : // The number of input bytes to the log. This is the raw size of the
271 : // batches written to the WAL, without the overhead of the record
272 : // envelopes.
273 : logBytesIn atomic.Uint64
274 :
275 : // The number of bytes available on disk.
276 : diskAvailBytes atomic.Uint64
277 : lowDiskSpaceReporter lowDiskSpaceReporter
278 :
279 : cacheHandle *cache.Handle
280 : dirname string
281 : opts *Options
282 : cmp Compare
283 : equal Equal
284 : merge Merge
285 : split Split
286 : abbreviatedKey AbbreviatedKey
287 : // The threshold for determining when a batch is "large" and will skip being
288 : // inserted into a memtable.
289 : largeBatchThreshold uint64
290 : // The current OPTIONS file number.
291 : optionsFileNum base.DiskFileNum
292 : // The on-disk size of the current OPTIONS file.
293 : optionsFileSize uint64
294 :
295 : // objProvider is used to access and manage SSTs.
296 : objProvider objstorage.Provider
297 :
298 : dirs *resolvedDirs
299 :
300 : fileCache *fileCacheHandle
301 : newIters tableNewIters
302 : tableNewRangeKeyIter keyspanimpl.TableNewSpanIter
303 :
304 : commit *commitPipeline
305 :
306 : // readState provides access to the state needed for reading without needing
307 : // to acquire DB.mu.
308 : readState struct {
309 : sync.RWMutex
310 : val *readState
311 : }
312 :
313 : closed *atomic.Value
314 : closedCh chan struct{}
315 :
316 : deletePacer *deletepacer.DeletePacer
317 :
318 : compactionScheduler CompactionScheduler
319 :
320 : // During an iterator close, we may asynchronously schedule read compactions.
321 : // We want to wait for those goroutines to finish, before closing the DB.
322 : // compactionSchedulers.Wait() should not be called while the DB.mu is held.
323 : compactionSchedulers sync.WaitGroup
324 :
325 : // The main mutex protecting internal DB state. This mutex encompasses many
326 : // fields because those fields need to be accessed and updated atomically. In
327 : // particular, the current version, log.*, mem.*, and snapshot list need to
328 : // be accessed and updated atomically during compaction.
329 : //
330 : // Care is taken to avoid holding DB.mu during IO operations. Accomplishing
331 : // this sometimes requires releasing DB.mu in a method that was called with
332 : // it held. See versionSet.UpdateVersionLocked() and DB.makeRoomForWrite() for
333 : // examples. This is a common pattern, so be careful about expectations that
334 : // DB.mu will be held continuously across a set of calls.
335 : mu struct {
336 : sync.Mutex
337 :
338 : formatVers struct {
339 : // vers is the database's current format major version.
340 : // Backwards-incompatible features are gated behind new
341 : // format major versions and not enabled until a database's
342 : // version is ratcheted upwards.
343 : //
344 : // Although this is under the `mu` prefix, readers may read vers
345 : // atomically without holding d.mu. Writers must only write to this
346 : // value through finalizeFormatVersUpgrade which requires d.mu is
347 : // held.
348 : vers atomic.Uint64
349 : // marker is the atomic marker for the format major version.
350 : // When a database's version is ratcheted upwards, the
351 : // marker is moved in order to atomically record the new
352 : // version.
353 : marker *atomicfs.Marker
354 : // ratcheting when set to true indicates that the database is
355 : // currently in the process of ratcheting the format major version
356 : // to vers + 1. As a part of ratcheting the format major version,
357 : // migrations may drop and re-acquire the mutex.
358 : ratcheting bool
359 : }
360 :
361 : // The ID of the next job. Job IDs are passed to event listener
362 : // notifications and act as a mechanism for tying together the events and
363 : // log messages for a single job such as a flush, compaction, or file
364 : // ingestion. Job IDs are not serialized to disk or used for correctness.
365 : nextJobID JobID
366 :
367 : // The collection of immutable versions and state about the log and visible
368 : // sequence numbers. Use the pointer here to ensure the atomic fields in
369 : // version set are aligned properly.
370 : versions *versionSet
371 :
372 : log struct {
373 : // manager is not protected by mu, but calls to Create must be
374 : // serialized, and happen after the previous writer is closed.
375 : manager wal.Manager
376 : // The Writer is protected by commitPipeline.mu. This allows log writes
377 : // to be performed without holding DB.mu, but requires both
378 : // commitPipeline.mu and DB.mu to be held when rotating the WAL/memtable
379 : // (i.e. makeRoomForWrite). Can be nil.
380 : writer wal.Writer
381 : metrics struct {
382 : // fsyncLatency has its own internal synchronization, and is not
383 : // protected by mu.
384 : fsyncLatency prometheus.Histogram
385 : // Updated whenever a wal.Writer is closed.
386 : record.LogWriterMetrics
387 : }
388 : }
389 :
390 : mem struct {
391 : // The current mutable memTable. Readers of the pointer may hold
392 : // either DB.mu or commitPipeline.mu.
393 : //
394 : // Its internal fields are protected by commitPipeline.mu. This
395 : // allows batch commits to be performed without DB.mu as long as no
396 : // memtable rotation is required.
397 : //
398 : // Both commitPipeline.mu and DB.mu must be held when rotating the
399 : // memtable.
400 : mutable *memTable
401 : // Queue of flushables (the mutable memtable is at end). Elements are
402 : // added to the end of the slice and removed from the beginning. Once an
403 : // index is set it is never modified, making a fixed slice immutable and
404 : // safe for concurrent reads.
405 : queue flushableList
406 : // nextSize is the size of the next memtable. The memtable size starts at
407 : // min(256KB,Options.MemTableSize) and doubles each time a new memtable
408 : // is allocated up to Options.MemTableSize. This reduces the memory
409 : // footprint of memtables when lots of DB instances are used concurrently
410 : // in test environments.
411 : nextSize uint64
412 : }
413 :
414 : compact struct {
415 : // Condition variable used to signal when a flush or compaction has
416 : // completed. Used by the write-stall mechanism to wait for the stall
417 : // condition to clear. See DB.makeRoomForWrite().
418 : cond sync.Cond
419 : // True when a flush is in progress.
420 : flushing bool
421 : // The number of ongoing non-download compactions.
422 : compactingCount int
423 : // The number of calls to compact that have not yet finished. This is different
424 : // from compactingCount, as calls to compact will attempt to schedule and run
425 : // follow-up compactions after the current compaction finishes, dropping db.mu
426 : // in between updating compactingCount and the scheduling operation. This value is
427 : // used in tests in order to track when all compaction activity has finished.
428 : compactProcesses int
429 : // The number of download compactions.
430 : downloadingCount int
431 : // The list of deletion hints, suggesting ranges for delete-only
432 : // compactions.
433 : deletionHints []deleteCompactionHint
434 : // The list of manual compactions. The next manual compaction to perform
435 : // is at the start of the list. New entries are added to the end.
436 : manual []*manualCompaction
437 : manualLen atomic.Int32
438 : // manualID is used to identify manualCompactions in the manual slice.
439 : manualID uint64
440 : // downloads is the list of pending download tasks. The next download to
441 : // perform is at the start of the list. New entries are added to the end.
442 : downloads []*downloadSpanTask
443 : // inProgress is the set of in-progress flushes and compactions.
444 : // It's used in the calculation of some metrics and to initialize L0
445 : // sublevels' state. Some of the compactions contained within this
446 : // map may have already committed an edit to the version but are
447 : // lingering performing cleanup, like deleting obsolete files.
448 : inProgress map[compaction]struct{}
449 : // burstConcurrency is the number of additional compaction
450 : // concurrency slots that have been added in order to allow
451 : // high-priority compactions to run.
452 : //
453 : // Today, it's used when scheduling 'high priority' blob file
454 : // rewrite compactions to reclaim disk space. Normally compactions
455 : // that reclaim disk space and don't reshape the LSM are considered
456 : // low priority. If there's sufficient space amplification,
457 : // heuristics may decide to let these space-reclamation compactions
458 : // run anyways.
459 : //
460 : // In this case we don't want to steal concurrency from the default
461 : // compactions, so we temporarily inflate the allowed compaction
462 : // concurrency for the duration of the compaction. This field is the
463 : // count of compactions within inProgress for which
464 : // UsesBurstConcurrency is true. It's incremented when one begins
465 : // and decremented when it completes.
466 : burstConcurrency atomic.Int32
467 :
468 : // rescheduleReadCompaction indicates to an iterator that a read compaction
469 : // should be scheduled.
470 : rescheduleReadCompaction bool
471 :
472 : // readCompactions is a readCompactionQueue which keeps track of the
473 : // compactions which we might have to perform.
474 : readCompactions readCompactionQueue
475 :
476 : // The cumulative duration of all completed compactions since Open.
477 : // Does not include flushes.
478 : duration time.Duration
479 : // Flush throughput metric.
480 : flushWriteThroughput ThroughputMetric
481 : // The idle start time for the flush "loop", i.e., when the flushing
482 : // bool above transitions to false.
483 : noOngoingFlushStartTime crtime.Mono
484 : }
485 :
486 : fileDeletions struct {
487 : // Non-zero when file cleaning is disabled. The disableCount
488 : // acts as a reference count to prohibit file cleaning. See
489 : // DB.{disable,enable}FileDeletions().
490 : disableCount int
491 : }
492 :
493 : snapshots struct {
494 : // The list of active snapshots, including EFOS snapshots that have not
495 : // transitioned. An EFOS snapshot that transitions to FOS is removed
496 : // from this list atomically with the transition (using DB.mu). A closed
497 : // snapshot (including an EFOS snapshot) may still be in this list
498 : // transiently, since the removal is not atomic with the close.
499 : snapshotList
500 :
501 : // The cumulative count and size of snapshot-pinned keys written to
502 : // sstables.
503 : cumulativePinnedCount uint64
504 : cumulativePinnedSize uint64
505 :
506 : // The ongoing excises. These are under snapshots since this is only
507 : // needed to coordinate with EventuallyFileOnlySnapshot creation.
508 : ongoingExcises map[SeqNum]KeyRange
509 : // Signalled when an ongoingExcise is removed.
510 : ongoingExcisesRemovedCond *sync.Cond
511 : }
512 :
513 : tableStats struct {
514 : // Condition variable used to signal the completion of a
515 : // job to collect table stats.
516 : cond sync.Cond
517 : // True when a stat collection operation is in progress.
518 : loading bool
519 : // True if stat collection has loaded statistics for all tables
520 : // other than those listed explicitly in pending. This flag starts
521 : // as false when a database is opened and flips to true once stat
522 : // collection has caught up.
523 : loadedInitial bool
524 : // A slice of files for which stats have not been computed.
525 : // Compactions, ingests, flushes append files to be processed. An
526 : // active stat collection goroutine clears the list and processes
527 : // them.
528 : pending []manifest.NewTableEntry
529 : }
530 :
531 : tableValidation struct {
532 : // cond is a condition variable used to signal the completion of a
533 : // job to validate one or more sstables.
534 : cond sync.Cond
535 : // pending is a slice of metadata for sstables waiting to be
536 : // validated. Only physical sstables should be added to the pending
537 : // queue.
538 : pending []manifest.NewTableEntry
539 : // validating is set to true when validation is running.
540 : validating bool
541 : }
542 : }
543 :
544 : tableDiskUsageAnnotator manifest.TableAnnotator[TableUsageByPlacement]
545 : blobFileDiskUsageAnnotator manifest.BlobFileAnnotator[metrics.CountAndSizeByPlacement]
546 :
547 : // problemSpans keeps track of spans of keys within LSM levels where
548 : // compactions have failed; used to avoid retrying these compactions too
549 : // quickly.
550 : problemSpans problemspans.ByLevel
551 :
552 : // the time at database Open; may be used to compute metrics like effective
553 : // compaction concurrency
554 : openedAt time.Time
555 :
556 : compressionCounters block.CompressionCounters
557 :
558 : iterTracker *inflight.Tracker
559 : valueRetrievalProfile atomic.Pointer[bytesprofile.Profile]
560 : }
561 :
562 : var _ Reader = (*DB)(nil)
563 : var _ Writer = (*DB)(nil)
564 :
565 : // TestOnlyWaitForCleaning MUST only be used in tests.
566 1 : func (d *DB) TestOnlyWaitForCleaning() {
567 1 : d.deletePacer.WaitForTesting()
568 1 : }
569 :
570 : // RecordSeparatedValueRetrievals begins collection of stack traces that are
571 : // retrieving separated values. If a profile has already begun, an error is
572 : // returned. If successful, the stop function must be called to stop the
573 : // collection and return the resulting profile.
574 : func (d *DB) RecordSeparatedValueRetrievals() (
575 : stop func() *metrics.ValueRetrievalProfile,
576 : err error,
577 1 : ) {
578 1 : p := bytesprofile.NewProfile()
579 1 : swapped := d.valueRetrievalProfile.CompareAndSwap(nil, p)
580 1 : if !swapped {
581 1 : return nil, errors.New("separated value retrieval profile is already in progress")
582 1 : }
583 1 : return func() *metrics.ValueRetrievalProfile {
584 1 : if !d.valueRetrievalProfile.CompareAndSwap(p, nil) {
585 0 : panic(errors.AssertionFailedf("profile already stopped"))
586 : }
587 1 : return p
588 : }, nil
589 : }
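
// Editorial usage sketch for RecordSeparatedValueRetrievals (not part of the
// original source; the workload callback is an assumption): start the
// collection, run reads, then call the returned stop function exactly once to
// obtain the profile.
//
//	func profileRetrievals(d *pebble.DB, workload func()) (*metrics.ValueRetrievalProfile, error) {
//		stop, err := d.RecordSeparatedValueRetrievals()
//		if err != nil {
//			return nil, err // a profile is already in progress
//		}
//		workload() // reads that retrieve separated (blob) values are recorded
//		return stop(), nil
//	}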
590 :
591 : // Set sets the value for the given key. It overwrites any previous value
592 : // for that key; a DB is not a multi-map.
593 : //
594 : // It is safe to modify the contents of the arguments after Set returns.
595 1 : func (d *DB) Set(key, value []byte, opts *WriteOptions) error {
596 1 : b := newBatch(d)
597 1 : _ = b.Set(key, value, opts)
598 1 : if err := d.Apply(b, opts); err != nil {
599 1 : return err
600 1 : }
601 : // Only release the batch on success.
602 1 : return b.Close()
603 : }
604 :
605 : // Delete deletes the value for the given key. Deletes are blind and will
606 : // succeed even if the given key does not exist.
607 : //
608 : // It is safe to modify the contents of the arguments after Delete returns.
609 1 : func (d *DB) Delete(key []byte, opts *WriteOptions) error {
610 1 : b := newBatch(d)
611 1 : _ = b.Delete(key, opts)
612 1 : if err := d.Apply(b, opts); err != nil {
613 1 : return err
614 1 : }
615 : // Only release the batch on success.
616 1 : return b.Close()
617 : }
618 :
619 : // DeleteSized behaves identically to Delete, but takes an additional
620 : // argument indicating the size of the value being deleted. DeleteSized
621 : // should be preferred when the caller has the expectation that there exists
622 : // a single internal KV pair for the key (eg, the key has not been
623 : // overwritten recently), and the caller knows the size of its value.
624 : //
625 : // DeleteSized will record the value size within the tombstone and use it to
626 : // inform compaction-picking heuristics which strive to reduce space
627 : // amplification in the LSM. This "calling your shot" mechanic allows the
628 : // storage engine to more accurately estimate and reduce space amplification.
629 : //
630 : // It is safe to modify the contents of the arguments after DeleteSized
631 : // returns.
632 1 : func (d *DB) DeleteSized(key []byte, valueSize uint32, opts *WriteOptions) error {
633 1 : b := newBatch(d)
634 1 : _ = b.DeleteSized(key, valueSize, opts)
635 1 : if err := d.Apply(b, opts); err != nil {
636 0 : return err
637 0 : }
638 : // Only release the batch on success.
639 1 : return b.Close()
640 : }
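
// Editorial sketch of the "calling your shot" pattern described above (not
// part of the original source): the caller passes the size of the value it
// believes it is deleting, e.g. remembered from the earlier Set. The
// knownValueLen parameter is an assumption for illustration.
//
//	func deleteKnownValue(d *pebble.DB, key []byte, knownValueLen int) error {
//		return d.DeleteSized(key, uint32(knownValueLen), pebble.Sync)
//	}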
641 :
642 : // SingleDelete adds an action to the batch that single deletes the entry for key.
643 : // See Writer.SingleDelete for more details on the semantics of SingleDelete.
644 : //
645 : // WARNING: See the detailed warning in Writer.SingleDelete before using this.
646 : //
647 : // It is safe to modify the contents of the arguments after SingleDelete returns.
648 1 : func (d *DB) SingleDelete(key []byte, opts *WriteOptions) error {
649 1 : b := newBatch(d)
650 1 : _ = b.SingleDelete(key, opts)
651 1 : if err := d.Apply(b, opts); err != nil {
652 0 : return err
653 0 : }
654 : // Only release the batch on success.
655 1 : return b.Close()
656 : }
657 :
658 : // DeleteRange deletes all of the keys (and values) in the range [start,end)
659 : // (inclusive on start, exclusive on end).
660 : //
661 : // It is safe to modify the contents of the arguments after DeleteRange
662 : // returns.
663 1 : func (d *DB) DeleteRange(start, end []byte, opts *WriteOptions) error {
664 1 : b := newBatch(d)
665 1 : _ = b.DeleteRange(start, end, opts)
666 1 : if err := d.Apply(b, opts); err != nil {
667 1 : return err
668 1 : }
669 : // Only release the batch on success.
670 1 : return b.Close()
671 : }
672 :
673 : // Merge adds an action to the DB that merges the value at key with the new
674 : // value. The details of the merge are dependent upon the configured merge
675 : // operator.
676 : //
677 : // It is safe to modify the contents of the arguments after Merge returns.
678 1 : func (d *DB) Merge(key, value []byte, opts *WriteOptions) error {
679 1 : b := newBatch(d)
680 1 : _ = b.Merge(key, value, opts)
681 1 : if err := d.Apply(b, opts); err != nil {
682 1 : return err
683 1 : }
684 : // Only release the batch on success.
685 1 : return b.Close()
686 : }
687 :
688 : // LogData adds the specified data to the batch. The data will be written to the
689 : // WAL, but not added to memtables or sstables. Log data is never indexed,
690 : // which makes it useful for testing WAL performance.
691 : //
692 : // It is safe to modify the contents of the argument after LogData returns.
693 1 : func (d *DB) LogData(data []byte, opts *WriteOptions) error {
694 1 : b := newBatch(d)
695 1 : _ = b.LogData(data, opts)
696 1 : if err := d.Apply(b, opts); err != nil {
697 1 : return err
698 1 : }
699 : // Only release the batch on success.
700 1 : return b.Close()
701 : }
702 :
703 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
704 : // timestamp suffix to value. The suffix is optional. If any portion of the key
705 : // range [start, end) is already set by a range key with the same suffix value,
706 : // RangeKeySet overrides it.
707 : //
708 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
709 1 : func (d *DB) RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error {
710 1 : b := newBatch(d)
711 1 : _ = b.RangeKeySet(start, end, suffix, value, opts)
712 1 : if err := d.Apply(b, opts); err != nil {
713 0 : return err
714 0 : }
715 : // Only release the batch on success.
716 1 : return b.Close()
717 : }
718 :
719 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
720 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
721 : // range key. RangeKeyUnset only removes portions of range keys that fall within
722 : // the [start, end) key span, and only range keys with suffixes that exactly
723 : // match the unset suffix.
724 : //
725 : // It is safe to modify the contents of the arguments after RangeKeyUnset
726 : // returns.
727 1 : func (d *DB) RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error {
728 1 : b := newBatch(d)
729 1 : _ = b.RangeKeyUnset(start, end, suffix, opts)
730 1 : if err := d.Apply(b, opts); err != nil {
731 0 : return err
732 0 : }
733 : // Only release the batch on success.
734 1 : return b.Close()
735 : }
736 :
737 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
738 : // (inclusive on start, exclusive on end). It does not delete point keys (for
739 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
740 : // bounds, including those with or without suffixes.
741 : //
742 : // It is safe to modify the contents of the arguments after RangeKeyDelete
743 : // returns.
744 1 : func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
745 1 : b := newBatch(d)
746 1 : _ = b.RangeKeyDelete(start, end, opts)
747 1 : if err := d.Apply(b, opts); err != nil {
748 0 : return err
749 0 : }
750 : // Only release the batch on success.
751 1 : return b.Close()
752 : }
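
// Editorial sketch combining the range-key operations above (not part of the
// original source). Unsuffixed range keys are used so the example is valid
// with the default Comparer; real callers typically use their Comparer's
// suffix encoding.
//
//	func rangeKeyExample(d *pebble.DB) error {
//		// Set an unsuffixed range key over [a, z).
//		if err := d.RangeKeySet([]byte("a"), []byte("z"), nil, []byte("v"), pebble.Sync); err != nil {
//			return err
//		}
//		// Unset the portion covering [m, z); only range keys with a matching
//		// (here: empty) suffix are affected.
//		if err := d.RangeKeyUnset([]byte("m"), []byte("z"), nil, pebble.Sync); err != nil {
//			return err
//		}
//		// Delete whatever range keys remain in [a, z), suffixed or not.
//		return d.RangeKeyDelete([]byte("a"), []byte("z"), pebble.Sync)
//	}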
753 :
754 : // Apply the operations contained in the batch to the DB. If the batch is large
755 : // the contents of the batch may be retained by the database. If that occurs
756 : // the batch contents will be cleared preventing the caller from attempting to
757 : // reuse them.
758 : //
759 : // It is safe to modify the contents of the arguments after Apply returns.
760 : //
761 : // Apply returns ErrInvalidBatch if the provided batch is invalid in any way.
762 1 : func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
763 1 : return d.applyInternal(batch, opts, false)
764 1 : }
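
// Editorial example (a hedged sketch, not part of the original source):
// building a batch of several writes and applying them atomically with Apply.
// The keys and values are placeholders.
//
//	func applyBatch(d *pebble.DB) error {
//		b := d.NewBatch()
//		defer b.Close()
//		if err := b.Set([]byte("a"), []byte("1"), nil); err != nil {
//			return err
//		}
//		if err := b.Delete([]byte("b"), nil); err != nil {
//			return err
//		}
//		return d.Apply(b, pebble.Sync)
//	}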
765 :
766 : // ApplyNoSyncWait must only be used when opts.Sync is true and the caller
767 : // does not want to wait for the WAL fsync to happen. The method will return
768 : // once the mutation is applied to the memtable and is visible (note that a
769 : // mutation is visible before the WAL sync even in the wait case, so we have
770 : // not weakened the durability semantics). The caller must call Batch.SyncWait
771 : // to wait for the WAL fsync. The caller must not Close the batch without
772 : // first calling Batch.SyncWait.
773 : //
774 : // RECOMMENDATION: Prefer using Apply unless you really understand why you
775 : // need ApplyNoSyncWait.
776 : // EXPERIMENTAL: API/feature subject to change. Do not yet use outside
777 : // CockroachDB.
778 1 : func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
779 1 : if !opts.Sync {
780 0 : return errors.Errorf("cannot request asynchonous apply when WriteOptions.Sync is false")
781 0 : }
782 1 : return d.applyInternal(batch, opts, true)
783 : }
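
// Editorial sketch of the ApplyNoSyncWait contract (not part of the original
// source): the mutation is visible once ApplyNoSyncWait returns, and the
// caller must still call Batch.SyncWait before closing the batch to wait for
// the WAL fsync.
//
//	func applyThenSyncWait(d *pebble.DB, b *pebble.Batch) error {
//		if err := d.ApplyNoSyncWait(b, pebble.Sync); err != nil {
//			return err
//		}
//		// The mutation is already visible to readers at this point.
//		if err := b.SyncWait(); err != nil {
//			return err
//		}
//		return b.Close()
//	}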
784 :
785 : // REQUIRES: noSyncWait => opts.Sync
786 1 : func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
787 1 : if err := d.closed.Load(); err != nil {
788 1 : panic(err)
789 : }
790 1 : if batch.committing {
791 0 : panic("pebble: batch already committing")
792 : }
793 1 : if batch.applied.Load() {
794 0 : panic("pebble: batch already applied")
795 : }
796 1 : if d.opts.ReadOnly {
797 1 : return ErrReadOnly
798 1 : }
799 1 : if batch.db != nil && batch.db != d {
800 1 : panic(fmt.Sprintf("pebble: batch db mismatch: %p != %p", batch.db, d))
801 : }
802 :
803 1 : sync := opts.GetSync()
804 1 : if sync && d.opts.DisableWAL {
805 0 : return errors.New("pebble: WAL disabled")
806 0 : }
807 :
808 1 : if fmv := d.FormatMajorVersion(); fmv < batch.minimumFormatMajorVersion {
809 0 : panic(fmt.Sprintf(
810 0 : "pebble: batch requires at least format major version %d (current: %d)",
811 0 : batch.minimumFormatMajorVersion, fmv,
812 0 : ))
813 : }
814 :
815 1 : if batch.countRangeKeys > 0 {
816 1 : if d.split == nil {
817 0 : return errNoSplit
818 0 : }
819 : }
820 1 : batch.committing = true
821 1 :
822 1 : if batch.db == nil {
823 1 : if err := batch.refreshMemTableSize(); err != nil {
824 0 : return err
825 0 : }
826 : }
827 1 : if batch.memTableSize >= d.largeBatchThreshold {
828 1 : var err error
829 1 : batch.flushable, err = newFlushableBatch(batch, d.opts.Comparer)
830 1 : if err != nil {
831 0 : return err
832 0 : }
833 : }
834 1 : if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
835 0 : // There isn't much we can do on an error here. The commit pipeline will be
836 0 : // horked at this point.
837 0 : d.opts.Logger.Fatalf("pebble: fatal commit error: %v", err)
838 0 : }
839 : // If this is a large batch, we need to clear the batch contents as the
840 : // flushable batch may still be present in the flushables queue.
841 : //
842 : // TODO(peter): Currently large batches are written to the WAL. We could
843 : // skip the WAL write and instead wait for the large batch to be flushed to
844 : // an sstable. For a 100 MB batch, this might actually be faster. For a 1
845 : // GB batch this is almost certainly faster.
846 1 : if batch.flushable != nil {
847 1 : batch.data = nil
848 1 : }
849 1 : return nil
850 : }
851 :
852 1 : func (d *DB) commitApply(b *Batch, mem *memTable) error {
853 1 : if b.flushable != nil {
854 1 : // This is a large batch which was already added to the immutable queue.
855 1 : return nil
856 1 : }
857 1 : err := mem.apply(b, b.SeqNum())
858 1 : if err != nil {
859 0 : return err
860 0 : }
861 :
862 : // If the batch contains range tombstones and the database is configured
863 : // to flush range deletions, schedule a delayed flush so that disk space
864 : // may be reclaimed without additional writes or an explicit flush.
865 1 : if b.countRangeDels > 0 && d.opts.FlushDelayDeleteRange > 0 {
866 1 : d.mu.Lock()
867 1 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayDeleteRange)
868 1 : d.mu.Unlock()
869 1 : }
870 :
871 : // If the batch contains range keys and the database is configured to flush
872 : // range keys, schedule a delayed flush so that the range keys are cleared
873 : // from the memtable.
874 1 : if b.countRangeKeys > 0 && d.opts.FlushDelayRangeKey > 0 {
875 1 : d.mu.Lock()
876 1 : d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayRangeKey)
877 1 : d.mu.Unlock()
878 1 : }
879 :
880 1 : if mem.writerUnref() {
881 1 : d.mu.Lock()
882 1 : d.maybeScheduleFlush()
883 1 : d.mu.Unlock()
884 1 : }
885 1 : return nil
886 : }
887 :
888 1 : func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
889 1 : var size int64
890 1 : repr := b.Repr()
891 1 :
892 1 : if b.flushable != nil {
893 1 : // We have a large batch. Such batches are special in that they don't get
894 1 : // added to the memtable, and are instead inserted into the queue of
895 1 : // memtables. The call to makeRoomForWrite with this batch will force the
896 1 : // current memtable to be flushed. We want the large batch to be part of
897 1 : // the same log, so we add it to the WAL here, rather than after the call
898 1 : // to makeRoomForWrite().
899 1 : //
900 1 : // Set the sequence number since it was not set to the correct value earlier
901 1 : // (see comment in newFlushableBatch()).
902 1 : b.flushable.setSeqNum(b.SeqNum())
903 1 : if !d.opts.DisableWAL {
904 1 : var err error
905 1 : size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b)
906 1 : if err != nil {
907 0 : panic(err)
908 : }
909 : }
910 : }
911 :
912 1 : var err error
913 1 : // Grab a reference to the memtable. We don't hold DB.mu, but we do hold
914 1 : // d.commit.mu. It's okay for readers of d.mu.mem.mutable to only hold one of
915 1 : // d.commit.mu or d.mu, because memtable rotations require holding both.
916 1 : mem := d.mu.mem.mutable
917 1 : // Batches which contain keys of kind InternalKeyKindIngestSST will
918 1 : // never be applied to the memtable, so we don't need to make room for
919 1 : // write.
920 1 : if !b.ingestedSSTBatch {
921 1 : // Flushable batches will require a rotation of the memtable regardless,
922 1 : // so only attempt an optimistic reservation of space in the current
923 1 : // memtable if this batch is not a large flushable batch.
924 1 : if b.flushable == nil {
925 1 : err = d.mu.mem.mutable.prepare(b)
926 1 : }
927 1 : if b.flushable != nil || err == arenaskl.ErrArenaFull {
928 1 : // Slow path.
929 1 : // We need to acquire DB.mu and rotate the memtable.
930 1 : func() {
931 1 : d.mu.Lock()
932 1 : defer d.mu.Unlock()
933 1 : err = d.makeRoomForWrite(b)
934 1 : mem = d.mu.mem.mutable
935 1 : }()
936 : }
937 : }
938 1 : if err != nil {
939 0 : return nil, err
940 0 : }
941 1 : if d.opts.DisableWAL {
942 1 : return mem, nil
943 1 : }
944 1 : d.logBytesIn.Add(uint64(len(repr)))
945 1 :
946 1 : if b.flushable == nil {
947 1 : size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b)
948 1 : if err != nil {
949 0 : panic(err)
950 : }
951 : }
952 :
953 1 : d.logSize.Store(uint64(size))
954 1 : return mem, err
955 : }
956 :
957 : type iterAlloc struct {
958 : keyBuf []byte `invariants:"reused"`
959 : boundsBuf [2][]byte `invariants:"reused"`
960 : prefixOrFullSeekKey []byte `invariants:"reused"`
961 : batchState iteratorBatchState
962 : dbi Iterator
963 : merging mergingIter
964 : mlevels [3 + numLevels]mergingIterLevel
965 : levels [3 + numLevels]levelIter
966 : levelsPositioned [3 + numLevels]bool
967 : }
968 :
969 : // maybeAssertZeroed asserts that i is a "zeroed" value. See assertZeroed for
970 : // the definition of "zeroed". It's used to ensure we're properly zeroing out
971 : // memory before returning the iterAlloc to the shared pool.
972 1 : func (i *iterAlloc) maybeAssertZeroed() {
973 1 : if invariants.Enabled {
974 1 : v := reflect.ValueOf(i).Elem()
975 1 : if err := assertZeroed(v); err != nil {
976 0 : panic(err)
977 : }
978 : }
979 : }
980 :
981 : // assertZeroed asserts that v is a "zeroed" value. A "zeroed" value is a value
982 : // that is:
983 : // - the Go zero value for its type, or
984 : // - a pointer to a "zeroed" value, or
985 : // - a slice of len()=0, with any values in the backing array being "zeroed
986 : // values", or
987 : // - a struct with all fields being "zeroed" values, or
988 : // - a struct field explicitly marked with an "invariants:reused" tag.
989 1 : func assertZeroed(v reflect.Value) error {
990 1 : if v.IsZero() {
991 1 : return nil
992 1 : }
993 1 : typ := v.Type()
994 1 : switch typ.Kind() {
995 0 : case reflect.Pointer:
996 0 : return assertZeroed(v.Elem())
997 1 : case reflect.Slice:
998 1 : if v.Len() > 0 {
999 0 : return errors.AssertionFailedf("%s is not zeroed (%d len): %#v", typ.Name(), v.Len(), v)
1000 0 : }
1001 1 : resliced := v.Slice(0, v.Cap())
1002 1 : for i := 0; i < resliced.Len(); i++ {
1003 1 : if err := assertZeroed(resliced.Index(i)); err != nil {
1004 0 : return errors.Wrapf(err, "[%d]", i)
1005 0 : }
1006 : }
1007 1 : return nil
1008 1 : case reflect.Struct:
1009 1 : for i := 0; i < typ.NumField(); i++ {
1010 1 : if typ.Field(i).Tag.Get("invariants") == "reused" {
1011 1 : continue
1012 : }
1013 1 : if err := assertZeroed(v.Field(i)); err != nil {
1014 0 : return errors.Wrapf(err, "%q", typ.Field(i).Name)
1015 0 : }
1016 : }
1017 1 : return nil
1018 : }
1019 0 : return errors.AssertionFailedf("%s (%s) is not zeroed: %#v", typ.Name(), typ.Kind(), v)
1020 : }
1021 :
1022 1 : func newIterAlloc() *iterAlloc {
1023 1 : buf := iterAllocPool.Get().(*iterAlloc)
1024 1 : buf.maybeAssertZeroed()
1025 1 : return buf
1026 1 : }
1027 :
1028 : var iterAllocPool = sync.Pool{
1029 1 : New: func() interface{} {
1030 1 : return &iterAlloc{}
1031 1 : },
1032 : }
1033 :
1034 : // snapshotIterOpts denotes snapshot-related iterator options when calling
1035 : // newIter. These are the possible cases for a snapshotIterOpts:
1036 : // - No snapshot: All fields are zero values.
1037 : // - Classic snapshot: Only `seqNum` is set. The latest readState will be used
1038 : // and the specified seqNum will be used as the snapshot seqNum.
1039 : // - EventuallyFileOnlySnapshot (EFOS) behaving as a classic snapshot. Only
1040 : // the `seqNum` is set. The latest readState will be used
1041 : // and the specified seqNum will be used as the snapshot seqNum.
1042 : // - EFOS in file-only state: Only `seqNum` and `vers` are set. All the
1043 : // relevant SSTs are referenced by the *version.
1044 : // - EFOS that has been excised but is in alwaysCreateIters mode (tests only).
1045 : // Only `seqNum` and `readState` are set.
1046 : type snapshotIterOpts struct {
1047 : seqNum base.SeqNum
1048 : vers *manifest.Version
1049 : readState *readState
1050 : }
1051 :
1052 : type batchIterOpts struct {
1053 : batchOnly bool
1054 : }
1055 : type newIterOpts struct {
1056 : snapshot snapshotIterOpts
1057 : batch batchIterOpts
1058 : }
1059 :
1060 : // newIter constructs a new iterator, merging in batch iterators as an extra
1061 : // level.
1062 : func (d *DB) newIter(
1063 : ctx context.Context, batch *Batch, newIterOpts newIterOpts, o *IterOptions,
1064 1 : ) *Iterator {
1065 1 : if newIterOpts.batch.batchOnly {
1066 1 : if batch == nil {
1067 0 : panic("batchOnly is true, but batch is nil")
1068 : }
1069 1 : if newIterOpts.snapshot.vers != nil {
1070 0 : panic("batchOnly is true, but snapshotIterOpts is initialized")
1071 : }
1072 : }
1073 1 : if err := d.closed.Load(); err != nil {
1074 1 : panic(err)
1075 : }
1076 1 : seqNum := newIterOpts.snapshot.seqNum
1077 1 : if o != nil && o.RangeKeyMasking.Suffix != nil && o.KeyTypes != IterKeyTypePointsAndRanges {
1078 0 : panic("pebble: range key masking requires IterKeyTypePointsAndRanges")
1079 : }
1080 1 : if (batch != nil || seqNum != 0) && (o != nil && o.OnlyReadGuaranteedDurable) {
1081 1 : // We could add support for OnlyReadGuaranteedDurable on snapshots if
1082 1 : // there was a need: this would require checking that the sequence number
1083 1 : // of the snapshot has been flushed, by comparing with
1084 1 : // DB.mem.queue[0].logSeqNum.
1085 1 : panic("OnlyReadGuaranteedDurable is not supported for batches or snapshots")
1086 : }
1087 1 : var readState *readState
1088 1 : var newIters tableNewIters
1089 1 : var newIterRangeKey keyspanimpl.TableNewSpanIter
1090 1 : if !newIterOpts.batch.batchOnly {
1091 1 : // Grab and reference the current readState. This prevents the underlying
1092 1 : // files in the associated version from being deleted if there is a current
1093 1 : // compaction. The readState is unref'd by Iterator.Close().
1094 1 : if newIterOpts.snapshot.vers == nil {
1095 1 : if newIterOpts.snapshot.readState != nil {
1096 0 : readState = newIterOpts.snapshot.readState
1097 0 : readState.ref()
1098 1 : } else {
1099 1 : // NB: loadReadState() calls readState.ref().
1100 1 : readState = d.loadReadState()
1101 1 : }
1102 1 : } else {
1103 1 : // vers != nil
1104 1 : newIterOpts.snapshot.vers.Ref()
1105 1 : }
1106 :
1107 : // Determine the seqnum to read at after grabbing the read state (current and
1108 : // memtables) above.
1109 1 : if seqNum == 0 {
1110 1 : seqNum = d.mu.versions.visibleSeqNum.Load()
1111 1 : }
1112 1 : newIters = d.newIters
1113 1 : newIterRangeKey = d.tableNewRangeKeyIter
1114 : }
1115 :
1116 : // Bundle various structures under a single umbrella in order to allocate
1117 : // them together.
1118 1 : buf := newIterAlloc()
1119 1 : dbi := &buf.dbi
1120 1 : dbi.ctx = ctx
1121 1 : dbi.alloc = buf
1122 1 : dbi.merge = d.merge
1123 1 : dbi.comparer = d.opts.Comparer
1124 1 : dbi.readState = readState
1125 1 : dbi.version = newIterOpts.snapshot.vers
1126 1 : dbi.keyBuf = buf.keyBuf
1127 1 : dbi.prefixOrFullSeekKey = buf.prefixOrFullSeekKey
1128 1 : dbi.boundsBuf = buf.boundsBuf
1129 1 : dbi.fc = d.fileCache
1130 1 : dbi.newIters = newIters
1131 1 : dbi.newIterRangeKey = newIterRangeKey
1132 1 : dbi.valueRetrievalProfile = d.valueRetrievalProfile.Load()
1133 1 : dbi.seqNum = seqNum
1134 1 : dbi.batchOnlyIter = newIterOpts.batch.batchOnly
1135 1 : if o != nil {
1136 1 : dbi.opts = *o
1137 1 : dbi.processBounds(o.LowerBound, o.UpperBound)
1138 1 : }
1139 1 : dbi.opts.logger = d.opts.Logger
1140 1 : if d.opts.private.disableLazyCombinedIteration {
1141 0 : dbi.opts.disableLazyCombinedIteration = true
1142 0 : }
1143 1 : if batch != nil {
1144 1 : dbi.batch = &buf.batchState
1145 1 : dbi.batch.batch = batch
1146 1 : dbi.batch.batchSeqNum = batch.nextSeqNum()
1147 1 : }
1148 1 : dbi.tracker = d.iterTracker
1149 1 : if !dbi.batchOnlyIter && d.iterTracker != nil && !dbi.opts.ExemptFromTracking {
1150 1 : dbi.trackerHandle = d.iterTracker.Start()
1151 1 : }
1152 1 : return finishInitializingIter(ctx, buf)
1153 : }
1154 :
1155 : // finishInitializingIter is a helper for doing the non-trivial initialization
1156 : // of an Iterator. It's invoked to perform the initial initialization of an
1157 : // Iterator during NewIter or Clone, and to perform reinitialization due to a
1158 : // change in IterOptions by a call to Iterator.SetOptions.
1159 1 : func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
1160 1 : // Short-hand.
1161 1 : dbi := &buf.dbi
1162 1 :
1163 1 : var memtables flushableList
1164 1 : if dbi.readState != nil {
1165 1 : memtables = dbi.readState.memtables
1166 1 : }
1167 1 : if dbi.opts.OnlyReadGuaranteedDurable {
1168 1 : memtables = nil
1169 1 : } else {
1170 1 : // We only need to read from memtables which contain sequence numbers older
1171 1 : // than seqNum. Trim off newer memtables.
1172 1 : for i := len(memtables) - 1; i >= 0; i-- {
1173 1 : if logSeqNum := memtables[i].logSeqNum; logSeqNum < dbi.seqNum {
1174 1 : break
1175 : }
1176 1 : memtables = memtables[:i]
1177 : }
1178 : }
1179 :
1180 1 : if dbi.opts.pointKeys() {
1181 1 : // Construct the point iterator, initializing dbi.pointIter to point to
1182 1 : // dbi.merging. If this is called during a SetOptions call and this
1183 1 : // Iterator has already initialized dbi.merging, constructPointIter is a
1184 1 : // noop and an initialized pointIter already exists in dbi.pointIter.
1185 1 : dbi.constructPointIter(ctx, memtables, buf)
1186 1 : dbi.iter = dbi.pointIter
1187 1 : } else {
1188 1 : dbi.iter = emptyIter
1189 1 : }
1190 :
1191 1 : if dbi.opts.rangeKeys() {
1192 1 : dbi.rangeKeyMasking.init(dbi, dbi.comparer)
1193 1 :
1194 1 : // When iterating over both point and range keys, don't create the
1195 1 : // range-key iterator stack immediately if we can avoid it. This
1196 1 : // optimization takes advantage of the expected sparseness of range
1197 1 : // keys, and configures the point-key iterator to dynamically switch to
1198 1 : // combined iteration when it observes a file containing range keys.
1199 1 : //
1200 1 : // Lazy combined iteration is not possible if a batch or a memtable
1201 1 : // contains any range keys.
1202 1 : useLazyCombinedIteration := dbi.rangeKey == nil &&
1203 1 : dbi.opts.KeyTypes == IterKeyTypePointsAndRanges &&
1204 1 : (dbi.batch == nil || dbi.batch.batch.countRangeKeys == 0) &&
1205 1 : !dbi.opts.disableLazyCombinedIteration
1206 1 : if useLazyCombinedIteration {
1207 1 : // The user requested combined iteration, and there's no indexed
1208 1 : // batch currently containing range keys that would prevent lazy
1209 1 : // combined iteration. Check the memtables to see if they contain
1210 1 : // any range keys.
1211 1 : for i := range memtables {
1212 1 : if memtables[i].containsRangeKeys() {
1213 1 : useLazyCombinedIteration = false
1214 1 : break
1215 : }
1216 : }
1217 : }
1218 :
1219 1 : if useLazyCombinedIteration {
1220 1 : dbi.lazyCombinedIter = lazyCombinedIter{
1221 1 : parent: dbi,
1222 1 : pointIter: dbi.pointIter,
1223 1 : combinedIterState: combinedIterState{
1224 1 : initialized: false,
1225 1 : },
1226 1 : }
1227 1 : dbi.iter = &dbi.lazyCombinedIter
1228 1 : dbi.iter = invalidating.MaybeWrapIfInvariants(dbi.iter)
1229 1 : } else {
1230 1 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{
1231 1 : initialized: true,
1232 1 : }
1233 1 : if dbi.rangeKey == nil {
1234 1 : dbi.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1235 1 : dbi.constructRangeKeyIter()
1236 1 : } else {
1237 1 : dbi.rangeKey.iterConfig.SetBounds(dbi.opts.LowerBound, dbi.opts.UpperBound)
1238 1 : }
1239 :
1240 : // Wrap the point iterator (currently dbi.iter) with an interleaving
1241 : // iterator that interleaves range keys pulled from
1242 : // dbi.rangeKey.rangeKeyIter.
1243 : //
1244 : // NB: The interleaving iterator is always reinitialized, even if
1245 : // dbi already had an initialized range key iterator, in case the point
1246 : // iterator changed or the range key masking suffix changed.
1247 1 : dbi.rangeKey.iiter.Init(dbi.comparer, dbi.iter, dbi.rangeKey.rangeKeyIter,
1248 1 : keyspan.InterleavingIterOpts{
1249 1 : Mask: &dbi.rangeKeyMasking,
1250 1 : LowerBound: dbi.opts.LowerBound,
1251 1 : UpperBound: dbi.opts.UpperBound,
1252 1 : })
1253 1 : dbi.iter = &dbi.rangeKey.iiter
1254 : }
1255 1 : } else {
1256 1 : // !dbi.opts.rangeKeys()
1257 1 : //
1258 1 : // Reset the combined iterator state. The initialized=true ensures the
1259 1 : // iterator doesn't unnecessarily try to switch to combined iteration.
1260 1 : dbi.lazyCombinedIter.combinedIterState = combinedIterState{initialized: true}
1261 1 : }
1262 1 : return dbi
1263 : }
1264 :
1265 : func (i *Iterator) constructPointIter(
1266 : ctx context.Context, memtables flushableList, buf *iterAlloc,
1267 1 : ) {
1268 1 : if i.pointIter != nil {
1269 1 : // Already have one.
1270 1 : return
1271 1 : }
1272 1 : readEnv := block.ReadEnv{
1273 1 : Stats: &i.stats.InternalStats,
1274 1 : // If the file cache has a sstable stats collector, ask it for an
1275 1 : // accumulator for this iterator's configured category and QoS. All SSTable
1276 1 : // iterators created by this Iterator will accumulate their stats to it as
1277 1 : // they Close during iteration.
1278 1 : IterStats: i.fc.SSTStatsCollector().Accumulator(
1279 1 : uint64(uintptr(unsafe.Pointer(i))),
1280 1 : i.opts.Category,
1281 1 : ),
1282 1 : ValueRetrievalProfile: i.valueRetrievalProfile,
1283 1 : }
1284 1 : if i.readState != nil {
1285 1 : i.blobValueFetcher.Init(&i.readState.current.BlobFiles, i.fc, readEnv,
1286 1 : blob.SuggestedCachedReaders(i.readState.current.MaxReadAmp()))
1287 1 : } else if i.version != nil {
1288 1 : i.blobValueFetcher.Init(&i.version.BlobFiles, i.fc, readEnv,
1289 1 : blob.SuggestedCachedReaders(i.version.MaxReadAmp()))
1290 1 : }
1291 1 : internalOpts := internalIterOpts{
1292 1 : readEnv: sstable.ReadEnv{Block: readEnv},
1293 1 : blobValueFetcher: &i.blobValueFetcher,
1294 1 : }
1295 1 : if i.opts.RangeKeyMasking.Filter != nil {
1296 1 : internalOpts.boundLimitedFilter = &i.rangeKeyMasking
1297 1 : }
1298 :
1299 : // Merging levels and levels from iterAlloc.
1300 1 : mlevels := buf.mlevels[:0]
1301 1 : levels := buf.levels[:0]
1302 1 :
1303 1 : // We compute the number of levels needed ahead of time and reallocate a slice if
1304 1 : // the array from the iterAlloc isn't large enough. Doing this allocation once
1305 1 : // should improve the performance.
1306 1 : numMergingLevels := 0
1307 1 : numLevelIters := 0
1308 1 : if i.batch != nil {
1309 1 : numMergingLevels++
1310 1 : }
1311 :
1312 1 : var current *manifest.Version
1313 1 : if !i.batchOnlyIter {
1314 1 : numMergingLevels += len(memtables)
1315 1 : current = i.version
1316 1 : if current == nil {
1317 1 : current = i.readState.current
1318 1 : }
1319 1 : maxReadAmp := current.MaxReadAmp()
1320 1 : numMergingLevels += maxReadAmp
1321 1 : numLevelIters += maxReadAmp
1322 : }
1323 :
1324 1 : if numMergingLevels > cap(mlevels) {
1325 1 : mlevels = make([]mergingIterLevel, 0, numMergingLevels)
1326 1 : }
1327 1 : if numLevelIters > cap(levels) {
1328 1 : levels = make([]levelIter, 0, numLevelIters)
1329 1 : }
1330 :
1331 : // Top-level is the batch, if any.
1332 1 : if i.batch != nil {
1333 1 : if i.batch.batch.index == nil {
1334 0 : // This isn't an indexed batch. We shouldn't have gotten this far.
1335 0 : panic(errors.AssertionFailedf("creating an iterator over an unindexed batch"))
1336 1 : } else {
1337 1 : i.batch.batch.initInternalIter(&i.opts, &i.batch.pointIter)
1338 1 : i.batch.batch.initRangeDelIter(&i.opts, &i.batch.rangeDelIter, i.batch.batchSeqNum)
1339 1 : // Only include the batch's rangedel iterator if it's non-empty.
1340 1 : // This requires some subtle logic in the case a rangedel is later
1341 1 : // written to the batch and the view of the batch is refreshed
1342 1 : // during a call to SetOptions—in this case, we need to reconstruct
1343 1 : // the point iterator to add the batch rangedel iterator.
1344 1 : var rangeDelIter keyspan.FragmentIterator
1345 1 : if i.batch.rangeDelIter.Count() > 0 {
1346 1 : rangeDelIter = &i.batch.rangeDelIter
1347 1 : }
1348 1 : mlevels = append(mlevels, mergingIterLevel{
1349 1 : iter: &i.batch.pointIter,
1350 1 : rangeDelIter: rangeDelIter,
1351 1 : })
1352 : }
1353 : }
1354 :
1355 1 : if !i.batchOnlyIter {
1356 1 : // Next are the memtables.
1357 1 : for j := len(memtables) - 1; j >= 0; j-- {
1358 1 : mem := memtables[j]
1359 1 : mlevels = append(mlevels, mergingIterLevel{
1360 1 : iter: mem.newIter(&i.opts),
1361 1 : rangeDelIter: mem.newRangeDelIter(&i.opts),
1362 1 : })
1363 1 : }
1364 :
1365 : // Next are the file levels: L0 sub-levels followed by lower levels.
1366 1 : mlevelsIndex := len(mlevels)
1367 1 : levelsIndex := len(levels)
1368 1 : mlevels = mlevels[:numMergingLevels]
1369 1 : levels = levels[:numLevelIters]
1370 1 : i.opts.snapshotForHideObsoletePoints = buf.dbi.seqNum
1371 1 : addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Layer) {
1372 1 : li := &levels[levelsIndex]
1373 1 :
1374 1 : li.init(ctx, i.opts, i.comparer, i.newIters, files, level, internalOpts)
1375 1 : li.initRangeDel(&mlevels[mlevelsIndex])
1376 1 : li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
1377 1 : mlevels[mlevelsIndex].levelIter = li
1378 1 : mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)
1379 1 :
1380 1 : levelsIndex++
1381 1 : mlevelsIndex++
1382 1 : }
1383 :
1384 : // Add level iterators for the L0 sublevels, iterating from newest to
1385 : // oldest.
1386 1 : for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
1387 1 : addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
1388 1 : }
1389 :
1390 : // Add level iterators for the non-empty non-L0 levels.
1391 1 : for level := 1; level < len(current.Levels); level++ {
1392 1 : if current.Levels[level].Empty() {
1393 1 : continue
1394 : }
1395 1 : addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
1396 : }
1397 : }
1398 1 : buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...)
1399 1 : if len(mlevels) <= cap(buf.levelsPositioned) {
1400 1 : buf.merging.levelsPositioned = buf.levelsPositioned[:len(mlevels)]
1401 1 : }
1402 1 : buf.merging.snapshot = i.seqNum
1403 1 : if i.batch != nil {
1404 1 : buf.merging.batchSnapshot = i.batch.batchSeqNum
1405 1 : }
1406 1 : buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
1407 1 : i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging).(topLevelIterator)
1408 1 : i.merging = &buf.merging
1409 : }
1410 :
1411 : // NewBatch returns a new empty write-only batch. Any reads on the batch will
1412 : // return an error. If the batch is committed it will be applied to the DB.
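//
// Example (an illustrative sketch, not part of the original documentation;
// the key/value literals are arbitrary):
//
//	b := db.NewBatch()
//	_ = b.Set([]byte("k1"), []byte("v1"), nil)
//	_ = b.Delete([]byte("k2"), nil)
//	if err := b.Commit(pebble.Sync); err != nil {
//		return err
//	}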
1413 1 : func (d *DB) NewBatch(opts ...BatchOption) *Batch {
1414 1 : return newBatch(d, opts...)
1415 1 : }
1416 :
1417 : // NewBatchWithSize is mostly identical to NewBatch, but it will allocate
1418 : // the specified memory space for the internal slice in advance.
1419 0 : func (d *DB) NewBatchWithSize(size int, opts ...BatchOption) *Batch {
1420 0 : return newBatchWithSize(d, size, opts...)
1421 0 : }
1422 :
1423 : // NewIndexedBatch returns a new empty read-write batch. Any reads on the batch
1424 : // will read from both the batch and the DB. If the batch is committed it will
1425 : // be applied to the DB. An indexed batch is slower than a non-indexed batch
1426 : // for insert operations. If you do not need to perform reads on the batch, use
1427 : // NewBatch instead.
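//
// Example (an illustrative sketch): write a key and read it back through
// the batch before committing.
//
//	b := db.NewIndexedBatch()
//	_ = b.Set([]byte("k"), []byte("v"), nil)
//	v, closer, err := b.Get([]byte("k"))
//	if err == nil {
//		_ = v
//		_ = closer.Close()
//	}
//	if err := b.Commit(pebble.Sync); err != nil {
//		return err
//	}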
1428 1 : func (d *DB) NewIndexedBatch() *Batch {
1429 1 : return newIndexedBatch(d, d.opts.Comparer)
1430 1 : }
1431 :
1432 : // NewIndexedBatchWithSize is mostly identical to NewIndexedBatch, but it will
1433 : // allocate the specified memory space for the internal slice in advance.
1434 0 : func (d *DB) NewIndexedBatchWithSize(size int) *Batch {
1435 0 : return newIndexedBatchWithSize(d, d.opts.Comparer, size)
1436 0 : }
1437 :
1438 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
1439 : // return false). The iterator can be positioned via a call to SeekGE, SeekLT,
1440 : // First or Last. The iterator provides a point-in-time view of the current DB
1441 : // state. This view is maintained by preventing file deletions and preventing
1442 : // memtables referenced by the iterator from being deleted. Using an iterator
1443 : // to maintain a long-lived point-in-time view of the DB state can lead to an
1444 : // apparent memory and disk usage leak. Use snapshots (see NewSnapshot) for
1445 : // point-in-time snapshots, which avoid these problems.
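//
// Example (an illustrative sketch): scan a bounded range of keys.
//
//	iter, err := db.NewIter(&pebble.IterOptions{
//		LowerBound: []byte("a"),
//		UpperBound: []byte("z"),
//	})
//	if err != nil {
//		return err
//	}
//	defer iter.Close()
//	for iter.First(); iter.Valid(); iter.Next() {
//		// iter.Key() and iter.Value() are only valid until the next
//		// positioning call; copy them if they must outlive it.
//		_, _ = iter.Key(), iter.Value()
//	}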
1446 1 : func (d *DB) NewIter(o *IterOptions) (*Iterator, error) {
1447 1 : return d.NewIterWithContext(context.Background(), o)
1448 1 : }
1449 :
1450 : // NewIterWithContext is like NewIter, and additionally accepts a context for
1451 : // tracing.
1452 1 : func (d *DB) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
1453 1 : return d.newIter(ctx, nil /* batch */, newIterOpts{}, o), nil
1454 1 : }
1455 :
1456 : // NewSnapshot returns a point-in-time view of the current DB state. Iterators
1457 : // created with this handle will all observe a stable snapshot of the current
1458 : // DB state. The caller must call Snapshot.Close() when the snapshot is no
1459 : // longer needed. Snapshots are not persisted across DB restarts (close ->
1460 : // open). Unlike the implicit snapshot maintained by an iterator, a snapshot
1461 : // will not prevent memtables from being released or sstables from being
1462 : // deleted. Instead, a snapshot prevents deletion of sequence numbers
1463 : // referenced by the snapshot.
1464 : //
1465 : // There exists one violation of a Snapshot's point-in-time guarantee: An excise
1466 : // (see DB.Excise and DB.IngestAndExcise) that occurs after the snapshot's
1467 : // creation will be observed by iterators created from the snapshot after the
1468 : // excise. See NewEventuallyFileOnlySnapshot for a variant of NewSnapshot that
1469 : // provides a full point-in-time guarantee.
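//
// Example (an illustrative sketch): read a key as of the snapshot.
//
//	snap := db.NewSnapshot()
//	defer snap.Close()
//	value, closer, err := snap.Get([]byte("k"))
//	if err == nil {
//		_ = value
//		_ = closer.Close()
//	}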
1470 1 : func (d *DB) NewSnapshot() *Snapshot {
1471 1 : // TODO(jackson): Consider removal of regular, non-eventually-file-only
1472 1 : // snapshots given they no longer provide a true point-in-time snapshot of
1473 1 : // the database due to excises. If we had a mechanism to construct a maximal
1474 1 : // key range, we could implement NewSnapshot in terms of
1475 1 : // NewEventuallyFileOnlySnapshot and provide a true point-in-time guarantee.
1476 1 : if err := d.closed.Load(); err != nil {
1477 1 : panic(err)
1478 : }
1479 1 : d.mu.Lock()
1480 1 : s := &Snapshot{
1481 1 : db: d,
1482 1 : seqNum: d.mu.versions.visibleSeqNum.Load(),
1483 1 : }
1484 1 : d.mu.snapshots.pushBack(s)
1485 1 : d.mu.Unlock()
1486 1 : return s
1487 : }
1488 :
1489 : // NewEventuallyFileOnlySnapshot returns a point-in-time view of the current DB
1490 : // state, similar to NewSnapshot, but with consistency constrained to the
1491 : // provided set of key ranges. See the comment at EventuallyFileOnlySnapshot for
1492 : // its semantics.
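//
// Example (an illustrative sketch): snapshot a single key range and iterate
// over it.
//
//	efos := db.NewEventuallyFileOnlySnapshot([]pebble.KeyRange{
//		{Start: []byte("a"), End: []byte("z")},
//	})
//	defer efos.Close()
//	iter, err := efos.NewIter(&pebble.IterOptions{
//		LowerBound: []byte("a"),
//		UpperBound: []byte("z"),
//	})
//	if err != nil {
//		return err
//	}
//	defer iter.Close()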
1493 1 : func (d *DB) NewEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
1494 1 : if err := d.closed.Load(); err != nil {
1495 0 : panic(err)
1496 : }
1497 1 : for i := range keyRanges {
1498 1 : if i > 0 && d.cmp(keyRanges[i-1].End, keyRanges[i].Start) > 0 {
1499 0 : panic("pebble: key ranges for eventually-file-only-snapshot not in order")
1500 : }
1501 : }
1502 1 : return d.makeEventuallyFileOnlySnapshot(keyRanges)
1503 : }
1504 :
1505 : // Close closes the DB.
1506 : //
1507 : // It is not safe to close a DB until all outstanding iterators are closed
1508 : // or to call Close concurrently with any other DB method. It is not valid
1509 : // to call any of a DB's methods after the DB has been closed.
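//
// Example shutdown sequence (an illustrative sketch; iter and snap stand in
// for whatever iterators and snapshots the application still holds):
//
//	_ = iter.Close()
//	snap.Close()
//	if err := db.Close(); err != nil {
//		log.Fatal(err)
//	}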
1510 1 : func (d *DB) Close() error {
1511 1 : if err := d.closed.Load(); err != nil {
1512 1 : panic(err)
1513 : }
1514 1 : d.compactionSchedulers.Wait()
1515 1 : // Compactions can be asynchronously started by the CompactionScheduler
1516 1 : // calling d.Schedule. When this Unregister returns, we know that the
1517 1 : // CompactionScheduler will never again call a method on the DB. Note that
1518 1 : // this must be called without holding d.mu.
1519 1 : d.compactionScheduler.Unregister()
1520 1 : // Lock the commit pipeline for the duration of Close. This prevents a race
1521 1 : // with makeRoomForWrite. Rotating the WAL in makeRoomForWrite requires
1522 1 : // dropping d.mu several times for I/O. If Close only holds d.mu, an
1523 1 : // in-progress WAL rotation may re-acquire d.mu only once the database is
1524 1 : // closed.
1525 1 : //
1526 1 : // Additionally, locking the commit pipeline makes it more likely that
1527 1 : // (illegal) concurrent writes will observe d.closed.Load() != nil, creating
1528 1 : // more understandable panics if the database is improperly used concurrently
1529 1 : // during Close.
1530 1 : d.commit.mu.Lock()
1531 1 : defer d.commit.mu.Unlock()
1532 1 : d.mu.Lock()
1533 1 : defer d.mu.Unlock()
1534 1 : // Check again that the DB is not closed. If there are two concurrent calls
1535 1 : // to DB.Close, the best-effort check at the top of DB.Close may not fire.
1536 1 : // But since this second check happens after mutex acquisition, the two
1537 1 : // concurrent calls will get serialized and the second one will see the
1538 1 : // effect of the d.closed.Store below.
1539 1 : if err := d.closed.Load(); err != nil {
1540 0 : panic(err)
1541 : }
1542 : // Clear the finalizer that is used to check that an unreferenced DB has been
1543 : // closed. We're closing the DB here, so the check performed by that
1544 : // finalizer isn't necessary.
1545 : //
1546 : // Note: this is a no-op if invariants are disabled or race is enabled.
1547 1 : invariants.SetFinalizer(d.closed, nil)
1548 1 :
1549 1 : d.closed.Store(errors.WithStack(ErrClosed))
1550 1 : close(d.closedCh)
1551 1 :
1552 1 : defer d.cacheHandle.Close()
1553 1 :
1554 1 : for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
1555 1 : d.mu.compact.cond.Wait()
1556 1 : }
1557 1 : for d.mu.tableStats.loading {
1558 1 : d.mu.tableStats.cond.Wait()
1559 1 : }
1560 1 : for d.mu.tableValidation.validating {
1561 0 : d.mu.tableValidation.cond.Wait()
1562 0 : }
1563 :
1564 1 : var err error
1565 1 : if n := len(d.mu.compact.inProgress); n > 0 {
1566 1 : err = errors.Errorf("pebble: %d unexpected in-progress compactions", errors.Safe(n))
1567 1 : }
1568 1 : err = firstError(err, d.mu.formatVers.marker.Close())
1569 1 : if !d.opts.ReadOnly {
1570 1 : if d.mu.log.writer != nil {
1571 1 : _, err2 := d.mu.log.writer.Close()
1572 1 : err = firstError(err, err2)
1573 1 : }
1574 1 : } else if d.mu.log.writer != nil {
1575 0 : panic("pebble: log-writer should be nil in read-only mode")
1576 : }
1577 1 : err = firstError(err, d.mu.log.manager.Close())
1578 1 :
1579 1 : // Note that versionSet.close() only closes the MANIFEST. The versions list
1580 1 : // is still valid for the checks below.
1581 1 : err = firstError(err, d.mu.versions.close())
1582 1 :
1583 1 : d.readState.val.unrefLocked()
1584 1 :
1585 1 : current := d.mu.versions.currentVersion()
1586 1 : for v := d.mu.versions.versions.Front(); true; v = v.Next() {
1587 1 : refs := v.Refs()
1588 1 : if v == current {
1589 1 : if refs != 1 {
1590 1 : err = firstError(err, errors.Errorf("leaked iterators: current\n%s", v))
1591 1 : }
1592 1 : break
1593 : }
1594 0 : if refs != 0 {
1595 0 : err = firstError(err, errors.Errorf("leaked iterators:\n%s", v))
1596 0 : }
1597 : }
1598 :
1599 1 : for _, mem := range d.mu.mem.queue {
1600 1 : // Usually, we'd want to delete the files returned by readerUnref. But
1601 1 : // in this case, even if we're unreferencing the flushables, the
1602 1 : // flushables aren't obsolete. They will be reconstructed during WAL
1603 1 : // replay.
1604 1 : mem.readerUnrefLocked(false)
1605 1 : }
1606 : // If there's an unused, recycled memtable, we need to release its memory.
1607 1 : if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
1608 1 : d.freeMemTable(obsoleteMemTable)
1609 1 : }
1610 1 : if reserved := d.memTableReserved.Load(); reserved != 0 {
1611 1 : err = firstError(err, errors.Errorf("leaked memtable reservation: %d", errors.Safe(reserved)))
1612 1 : }
1613 :
1614 : // Since we called d.readState.val.unrefLocked() above, we are expected to
1615 : // manually schedule deletion of obsolete files.
1616 1 : if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
1617 1 : d.deleteObsoleteFiles(d.newJobIDLocked())
1618 1 : }
1619 :
1620 1 : d.mu.Unlock()
1621 1 :
1622 1 : // Wait for all cleaning jobs to finish.
1623 1 : d.deletePacer.Close()
1624 1 :
1625 1 : d.mu.Lock()
1626 1 : // Sanity check compaction metrics.
1627 1 : if invariants.Enabled {
1628 1 : if d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.versions.atomicInProgressBytes.Load() > 0 {
1629 0 : panic("compacting counts not 0 on close")
1630 : }
1631 : }
1632 :
1633 : // As a sanity check, ensure that there are no zombie tables or blob files.
1634 : // A non-zero count hints at a reference count leak.
1635 1 : if ztbls := d.mu.versions.zombieTables.Count(); ztbls > 0 {
1636 0 : err = firstError(err, errors.Errorf("non-zero zombie file count: %d", ztbls))
1637 0 : }
1638 1 : if zblobs := d.mu.versions.zombieBlobs.Count(); zblobs > 0 {
1639 0 : err = firstError(err, errors.Errorf("non-zero zombie blob count: %d", zblobs))
1640 0 : }
1641 :
1642 1 : err = firstError(err, d.fileCache.Close())
1643 1 :
1644 1 : err = firstError(err, d.objProvider.Close())
1645 1 :
1646 1 : // If the options include a closer to 'close' the filesystem, close it.
1647 1 : if d.opts.private.fsCloser != nil {
1648 1 : d.opts.private.fsCloser.Close()
1649 1 : }
1650 :
1651 : // Return an error if the user failed to close all open snapshots.
1652 1 : if v := d.mu.snapshots.count(); v > 0 {
1653 0 : err = firstError(err, errors.Errorf("leaked snapshots: %d open snapshots on DB %p", v, d))
1654 0 : }
1655 1 : err = firstError(err, d.dirs.Close())
1656 1 :
1657 1 : if d.iterTracker != nil {
1658 1 : d.iterTracker.Close()
1659 1 : d.iterTracker = nil
1660 1 : }
1661 :
1662 1 : return err
1663 : }
1664 :
1665 : // Compact the specified range of keys in the database.
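//
// Example (an illustrative sketch): compact everything in ["a", "z"),
// allowing the work to be split into parallel compactions.
//
//	if err := db.Compact(ctx, []byte("a"), []byte("z"), true); err != nil {
//		return err
//	}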
1666 1 : func (d *DB) Compact(ctx context.Context, start, end []byte, parallelize bool) error {
1667 1 : if err := d.closed.Load(); err != nil {
1668 1 : panic(err)
1669 : }
1670 1 : if d.opts.ReadOnly {
1671 1 : return ErrReadOnly
1672 1 : }
1673 1 : if d.cmp(start, end) >= 0 {
1674 1 : return errors.Errorf("Compact start %s is not less than end %s",
1675 1 : d.opts.Comparer.FormatKey(start), d.opts.Comparer.FormatKey(end))
1676 1 : }
1677 :
1678 1 : d.mu.Lock()
1679 1 : maxLevelWithFiles := 1
1680 1 : cur := d.mu.versions.currentVersion()
1681 1 : for level := 0; level < numLevels; level++ {
1682 1 : overlaps := cur.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
1683 1 : if !overlaps.Empty() {
1684 1 : maxLevelWithFiles = level + 1
1685 1 : }
1686 : }
1687 :
1688 : // Determine if any memtable overlaps with the compaction range. We wait for
1689 : // any such overlap to flush (initiating a flush if necessary).
1690 1 : mem, err := func() (*flushableEntry, error) {
1691 1 : // Check to see if any files overlap with any of the memtables. The queue
1692 1 : // is ordered from oldest to newest with the mutable memtable being the
1693 1 : // last element in the slice. We want to wait for the newest table that
1694 1 : // overlaps.
1695 1 : for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1696 1 : mem := d.mu.mem.queue[i]
1697 1 : var anyOverlaps bool
1698 1 : mem.computePossibleOverlaps(func(b bounded) shouldContinue {
1699 1 : anyOverlaps = true
1700 1 : return stopIteration
1701 1 : }, KeyRange{Start: start, End: end})
1702 1 : if !anyOverlaps {
1703 1 : continue
1704 : }
1705 1 : var err error
1706 1 : if mem.flushable == d.mu.mem.mutable {
1707 1 : // We have to hold both commitPipeline.mu and DB.mu when calling
1708 1 : // makeRoomForWrite(). Lock order requirements elsewhere force us to
1709 1 : // unlock DB.mu in order to grab commitPipeline.mu first.
1710 1 : d.mu.Unlock()
1711 1 : d.commit.mu.Lock()
1712 1 : d.mu.Lock()
1713 1 : defer d.commit.mu.Unlock() //nolint:deferloop
1714 1 : if mem.flushable == d.mu.mem.mutable {
1715 1 : // Only flush if the active memtable is unchanged.
1716 1 : err = d.makeRoomForWrite(nil)
1717 1 : }
1718 : }
1719 1 : mem.flushForced = true
1720 1 : d.maybeScheduleFlush()
1721 1 : return mem, err
1722 : }
1723 1 : return nil, nil
1724 : }()
1725 :
1726 1 : d.mu.Unlock()
1727 1 :
1728 1 : if err != nil {
1729 0 : return err
1730 0 : }
1731 1 : if mem != nil {
1732 1 : select {
1733 1 : case <-mem.flushed:
1734 0 : case <-ctx.Done():
1735 0 : return ctx.Err()
1736 : }
1737 : }
1738 :
1739 1 : for level := 0; level < maxLevelWithFiles; {
1740 1 : for {
1741 1 : if err := d.manualCompact(
1742 1 : ctx, start, end, level, parallelize); err != nil {
1743 1 : if errors.Is(err, ErrCancelledCompaction) {
1744 1 : continue
1745 : }
1746 1 : return err
1747 : }
1748 1 : break
1749 : }
1750 1 : level++
1751 1 : if level == numLevels-1 {
1752 1 : // A manual compaction of the bottommost level occurred.
1753 1 : // There is no next level to try and compact.
1754 1 : break
1755 : }
1756 : }
1757 1 : return nil
1758 : }
1759 :
1760 : func (d *DB) manualCompact(
1761 : ctx context.Context, start, end []byte, level int, parallelize bool,
1762 1 : ) error {
1763 1 : d.mu.Lock()
1764 1 : curr := d.mu.versions.currentVersion()
1765 1 : files := curr.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
1766 1 : if files.Empty() {
1767 1 : d.mu.Unlock()
1768 1 : return nil
1769 1 : }
1770 :
1771 1 : var compactions []*manualCompaction
1772 1 : if parallelize {
1773 1 : compactions = append(compactions, d.splitManualCompaction(start, end, level)...)
1774 1 : } else {
1775 1 : compactions = append(compactions, &manualCompaction{
1776 1 : level: level,
1777 1 : done: make(chan error, 1),
1778 1 : start: start,
1779 1 : end: end,
1780 1 : })
1781 1 : }
1782 1 : n := len(compactions)
1783 1 : if n == 0 {
1784 0 : d.mu.Unlock()
1785 0 : return nil
1786 0 : }
1787 1 : for i := range compactions {
1788 1 : d.mu.compact.manualID++
1789 1 : compactions[i].id = d.mu.compact.manualID
1790 1 : }
1791 : // [manualIDStart, manualIDEnd] are the compactions that have been added to
1792 : // d.mu.compact.manual.
1793 1 : manualIDStart := compactions[0].id
1794 1 : manualIDEnd := compactions[n-1].id
1795 1 : d.mu.compact.manual = append(d.mu.compact.manual, compactions...)
1796 1 : d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual)))
1797 1 : d.maybeScheduleCompaction()
1798 1 : d.mu.Unlock()
1799 1 :
1800 1 : // On context cancellation, we only cancel the compactions that have not yet
1801 1 : // started. The assumption is that it is relatively harmless to have the
1802 1 : // already started compactions run to completion. We don't wait for the
1803 1 : // ongoing compactions to finish, since the assumption is that the caller
1804 1 : // has already given up on the operation (and the cancellation error is
1805 1 : // going to be returned anyway).
1806 1 : //
1807 1 : // An alternative would be to store the context in each *manualCompaction,
1808 1 : // and have the goroutine that retrieves the *manualCompaction for running
1809 1 : // notice the cancellation and write the cancellation error to
1810 1 : // manualCompaction.done. That approach would require this method to wait
1811 1 : // for all the *manualCompactions it has enqueued to finish before returning
1812 1 : // (to not leak a context). Since there is no timeliness guarantee on when a
1813 1 : // *manualCompaction will be retrieved for running, the wait until a
1814 1 : // cancelled context causes this method to return is not bounded. Hence, we
1815 1 : // don't adopt that approach.
1816 1 : cancelPendingCompactions := func() {
1817 1 : d.mu.Lock()
1818 1 : for i := 0; i < len(d.mu.compact.manual); {
1819 1 : if d.mu.compact.manual[i].id >= manualIDStart && d.mu.compact.manual[i].id <= manualIDEnd {
1820 1 : d.mu.compact.manual = slices.Delete(d.mu.compact.manual, i, i+1)
1821 1 : d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual)))
1822 1 : } else {
1823 0 : i++
1824 0 : }
1825 : }
1826 1 : d.mu.Unlock()
1827 : }
1828 : // Each of the channels is guaranteed to be eventually sent to once. After a
1829 : // compaction is possibly picked in d.maybeScheduleCompaction(), either the
1830 : // compaction is dropped, executed after being scheduled, or retried later.
1831 : // Assuming eventual progress when a compaction is retried, all outcomes send
1832 : // a value to the done channel. Since the channels are buffered, it is not
1833 : // necessary to read from each channel, and so we can exit early in the event
1834 : // of an error.
1835 1 : for _, compaction := range compactions {
1836 1 : select {
1837 1 : case <-ctx.Done():
1838 1 : cancelPendingCompactions()
1839 1 : return ctx.Err()
1840 1 : case err := <-compaction.done:
1841 1 : if err != nil {
1842 1 : cancelPendingCompactions()
1843 1 : return err
1844 1 : }
1845 : }
1846 : }
1847 1 : return nil
1848 : }
1849 :
1850 : // splitManualCompaction splits a manual compaction over [start,end] on level
1851 : // such that the resulting compactions have no key overlap.
1852 : func (d *DB) splitManualCompaction(
1853 : start, end []byte, level int,
1854 1 : ) (splitCompactions []*manualCompaction) {
1855 1 : curr := d.mu.versions.currentVersion()
1856 1 : endLevel := level + 1
1857 1 : baseLevel := d.mu.versions.picker.getBaseLevel()
1858 1 : if level == 0 {
1859 1 : endLevel = baseLevel
1860 1 : }
1861 1 : keyRanges := curr.CalculateInuseKeyRanges(d.mu.versions.latest.l0Organizer, level, endLevel, start, end)
1862 1 : for _, keyRange := range keyRanges {
1863 1 : splitCompactions = append(splitCompactions, &manualCompaction{
1864 1 : level: level,
1865 1 : done: make(chan error, 1),
1866 1 : start: keyRange.Start,
1867 1 : end: keyRange.End.Key,
1868 1 : split: true,
1869 1 : })
1870 1 : }
1871 1 : return splitCompactions
1872 : }
1873 :
1874 : // Flush the memtable to stable storage.
1875 1 : func (d *DB) Flush() error {
1876 1 : flushDone, err := d.AsyncFlush()
1877 1 : if err != nil {
1878 1 : return err
1879 1 : }
1880 1 : <-flushDone
1881 1 : return nil
1882 : }
1883 :
1884 : // AsyncFlush asynchronously flushes the memtable to stable storage.
1885 : //
1886 : // If no error is returned, the caller can receive from the returned channel in
1887 : // order to wait for the flush to complete.
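//
// Example (an illustrative sketch): start a flush and wait for it with a
// caller-chosen timeout.
//
//	done, err := db.AsyncFlush()
//	if err != nil {
//		return err
//	}
//	select {
//	case <-done:
//		// Flush completed.
//	case <-time.After(time.Minute):
//		// Still flushing; stop waiting but let the flush finish on its own.
//	}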
1888 1 : func (d *DB) AsyncFlush() (<-chan struct{}, error) {
1889 1 : if err := d.closed.Load(); err != nil {
1890 1 : panic(err)
1891 : }
1892 1 : if d.opts.ReadOnly {
1893 1 : return nil, ErrReadOnly
1894 1 : }
1895 :
1896 1 : d.commit.mu.Lock()
1897 1 : defer d.commit.mu.Unlock()
1898 1 : d.mu.Lock()
1899 1 : defer d.mu.Unlock()
1900 1 : flushed := d.mu.mem.queue[len(d.mu.mem.queue)-1].flushed
1901 1 : err := d.makeRoomForWrite(nil)
1902 1 : if err != nil {
1903 0 : return nil, err
1904 0 : }
1905 1 : return flushed, nil
1906 : }
1907 :
1908 : // Metrics returns metrics about the database.
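//
// Example (an illustrative sketch): log a few of the returned fields.
//
//	m := db.Metrics()
//	log.Printf("compaction debt=%d memtables=%d wal size=%d",
//		m.Compact.EstimatedDebt, m.MemTable.Count, m.WAL.Size)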
1909 1 : func (d *DB) Metrics() *Metrics {
1910 1 : metrics := &Metrics{}
1911 1 : walStats := d.mu.log.manager.Stats()
1912 1 :
1913 1 : d.mu.Lock()
1914 1 : vers := d.mu.versions.currentVersion()
1915 1 : vers.Ref()
1916 1 : defer vers.Unref()
1917 1 :
1918 1 : metrics.Levels = d.mu.versions.metrics.Levels
1919 1 : metrics.Compact = d.mu.versions.metrics.Compact
1920 1 : metrics.Ingest = d.mu.versions.metrics.Ingest
1921 1 : metrics.Flush = d.mu.versions.metrics.Flush
1922 1 : metrics.Keys = d.mu.versions.metrics.Keys
1923 1 :
1924 1 : metrics.Compact.EstimatedDebt = d.mu.versions.picker.estimatedCompactionDebt()
1925 1 : metrics.Compact.InProgressBytes = d.mu.versions.atomicInProgressBytes.Load()
1926 1 : // TODO(radu): split this to separate the download compactions.
1927 1 : metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount + d.mu.compact.downloadingCount)
1928 1 : metrics.Compact.MarkedFiles = vers.MarkedForCompaction.Count()
1929 1 : metrics.Compact.Duration = d.mu.compact.duration
1930 1 : for c := range d.mu.compact.inProgress {
1931 1 : if !c.IsFlush() {
1932 1 : metrics.Compact.Duration += d.opts.private.timeNow().Sub(c.BeganAt())
1933 1 : }
1934 : }
1935 1 : metrics.Compact.NumProblemSpans = d.problemSpans.Len()
1936 1 :
1937 1 : for _, m := range d.mu.mem.queue {
1938 1 : metrics.MemTable.Size += m.totalBytes()
1939 1 : }
1940 1 : metrics.Snapshots.Count = d.mu.snapshots.count()
1941 1 : if metrics.Snapshots.Count > 0 {
1942 0 : metrics.Snapshots.EarliestSeqNum = d.mu.snapshots.earliest()
1943 0 : }
1944 1 : metrics.Snapshots.PinnedKeys = d.mu.snapshots.cumulativePinnedCount
1945 1 : metrics.Snapshots.PinnedSize = d.mu.snapshots.cumulativePinnedSize
1946 1 : metrics.MemTable.Count = int64(len(d.mu.mem.queue))
1947 1 : metrics.MemTable.ZombieCount = d.memTableCount.Load() - metrics.MemTable.Count
1948 1 : metrics.MemTable.ZombieSize = uint64(d.memTableReserved.Load()) - metrics.MemTable.Size
1949 1 : metrics.WAL.ObsoleteFiles = int64(walStats.ObsoleteFileCount)
1950 1 : metrics.WAL.ObsoletePhysicalSize = walStats.ObsoleteFileSize
1951 1 : metrics.WAL.Files = int64(walStats.LiveFileCount)
1952 1 : // The current WAL's size (d.logSize) is the logical size, which may be less
1953 1 : // than the WAL's physical size if it was recycled. walStats.LiveFileSize
1954 1 : // includes the physical size of all live WALs, but for the current WAL it
1955 1 : // reflects the physical size when it was opened. So it is possible that
1956 1 : // d.logSize has exceeded that physical size. We allow for this
1957 1 : // anomaly.
1958 1 : metrics.WAL.PhysicalSize = walStats.LiveFileSize
1959 1 : metrics.WAL.BytesIn = d.logBytesIn.Load()
1960 1 : metrics.WAL.Size = d.logSize.Load()
1961 1 : for i, n := 0, len(d.mu.mem.queue)-1; i < n; i++ {
1962 1 : metrics.WAL.Size += d.mu.mem.queue[i].logSize
1963 1 : }
1964 1 : metrics.WAL.BytesWritten = metrics.Levels[0].TableBytesIn + metrics.WAL.Size
1965 1 : metrics.WAL.Failover = walStats.Failover
1966 1 :
1967 1 : if p := d.mu.versions.picker; p != nil {
1968 1 : compactions := d.getInProgressCompactionInfoLocked(nil)
1969 1 : m := p.getMetrics(compactions)
1970 1 : for level, lm := range m.levels {
1971 1 : metrics.Levels[level].Score = lm.score
1972 1 : metrics.Levels[level].FillFactor = lm.fillFactor
1973 1 : metrics.Levels[level].CompensatedFillFactor = lm.compensatedFillFactor
1974 1 : }
1975 : }
1976 1 : metrics.Table.Physical.Zombie = d.mu.versions.zombieTables.Metrics()
1977 1 : metrics.BlobFiles.Zombie = d.mu.versions.zombieBlobs.Metrics()
1978 1 :
1979 1 : // Populate obsolete blob/table metrics from both the not-yet-enqueued lists
1980 1 : // in the versionSet, and what is already in the delete pacer queue.
1981 1 : deletePacerMetrics := d.deletePacer.Metrics()
1982 1 : metrics.Table.Physical.Obsolete = deletePacerMetrics.InQueue.Tables
1983 1 : for _, fi := range d.mu.versions.obsoleteTables {
1984 1 : metrics.Table.Physical.Obsolete.Inc(fi.FileSize, fi.Placement)
1985 1 : }
1986 1 : metrics.BlobFiles.Obsolete = deletePacerMetrics.InQueue.BlobFiles
1987 1 : for _, fi := range d.mu.versions.obsoleteBlobs {
1988 0 : metrics.BlobFiles.Obsolete.Inc(fi.FileSize, fi.Placement)
1989 0 : }
1990 :
1991 1 : metrics.private.optionsFileSize = d.optionsFileSize
1992 1 :
1993 1 : d.mu.versions.logLock()
1994 1 : metrics.private.manifestFileSize = uint64(d.mu.versions.manifest.Size())
1995 1 : backingStats := d.mu.versions.latest.virtualBackings.Stats()
1996 1 : blobStats, _ := d.mu.versions.latest.blobFiles.Stats()
1997 1 : d.mu.versions.logUnlock()
1998 1 :
1999 1 : metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
2000 1 : if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
2001 0 : d.opts.Logger.Errorf("metrics error: %s", err)
2002 0 : }
2003 1 : metrics.Flush.WriteThroughput = d.mu.compact.flushWriteThroughput
2004 1 : if d.mu.compact.flushing {
2005 1 : metrics.Flush.NumInProgress = 1
2006 1 : }
2007 :
2008 1 : metrics.Table.PendingStatsCollectionCount = int64(len(d.mu.tableStats.pending))
2009 1 : metrics.Table.InitialStatsCollectionComplete = d.mu.tableStats.loadedInitial
2010 1 :
2011 1 : d.mu.Unlock()
2012 1 :
2013 1 : // The table disk usage is due to physical tables plus backings for virtual tables.
2014 1 : tableDiskUsage := d.tableDiskUsageAnnotator.MultiLevelAnnotation(vers.Levels[:])
2015 1 : metrics.Table.Physical.Live.Local = tableDiskUsage.Local.Physical
2016 1 : metrics.Table.Physical.Live.Shared = tableDiskUsage.Shared.Physical
2017 1 : metrics.Table.Physical.Live.External = tableDiskUsage.External.Physical
2018 1 : metrics.Table.Physical.Live.Accumulate(backingStats)
2019 1 :
2020 1 : // TODO(jackson): Consider making these metrics optional.
2021 1 : aggProps := tablePropsAnnotator.MultiLevelAnnotation(vers.Levels[:])
2022 1 : metrics.Keys.RangeKeySetsCount = aggProps.NumRangeKeySets
2023 1 : metrics.Keys.TombstoneCount = aggProps.NumDeletions
2024 1 :
2025 1 : delBytes := deletionBytesAnnotator.MultiLevelAnnotation(vers.Levels[:])
2026 1 : metrics.Table.Garbage.PointDeletionsBytesEstimate = delBytes.PointDels
2027 1 : metrics.Table.Garbage.RangeDeletionsBytesEstimate = delBytes.RangeDels
2028 1 :
2029 1 : for i := 0; i < numLevels; i++ {
2030 1 : aggProps := tablePropsAnnotator.LevelAnnotation(vers.Levels[i])
2031 1 : metrics.Levels[i].Additional.ValueBlocksSize = aggProps.ValueBlocksSize
2032 1 : metrics.Table.Compression.MergeWith(&aggProps.CompressionMetrics)
2033 1 : }
2034 :
2035 1 : metrics.BlobFiles.Live = d.blobFileDiskUsageAnnotator.Annotation(&vers.BlobFiles)
2036 1 :
2037 1 : metrics.BlobFiles.ValueSize = blobStats.ValueSize
2038 1 : metrics.BlobFiles.ReferencedValueSize = blobStats.ReferencedValueSize
2039 1 : metrics.BlobFiles.ReferencedBackingValueSize = blobStats.ReferencedBackingValueSize
2040 1 :
2041 1 : blobCompressionMetrics := blobCompressionStatsAnnotator.Annotation(&vers.BlobFiles)
2042 1 : metrics.BlobFiles.Compression.MergeWith(&blobCompressionMetrics)
2043 1 :
2044 1 : metrics.CompressionCounters.LogicalBytesCompressed = d.compressionCounters.LoadCompressed()
2045 1 : metrics.CompressionCounters.LogicalBytesDecompressed = d.compressionCounters.LoadDecompressed()
2046 1 :
2047 1 : metrics.BlockCache = d.opts.Cache.Metrics()
2048 1 : metrics.FileCache, metrics.Filter = d.fileCache.Metrics()
2049 1 : metrics.TableIters = d.fileCache.IterCount()
2050 1 : metrics.CategoryStats = d.fileCache.SSTStatsCollector().GetStats()
2051 1 :
2052 1 : metrics.DeletePacer = deletePacerMetrics
2053 1 :
2054 1 : metrics.SecondaryCacheMetrics = d.objProvider.Metrics()
2055 1 :
2056 1 : metrics.Uptime = d.opts.private.timeNow().Sub(d.openedAt)
2057 1 :
2058 1 : metrics.manualMemory = manual.GetMetrics()
2059 1 :
2060 1 : return metrics
2061 : }
2062 :
2063 : // sstablesOptions hold the optional parameters to retrieve TableInfo for all sstables.
2064 : type sstablesOptions struct {
2065 : // set to true to return the sstable properties in TableInfo
2066 : withProperties bool
2067 :
2068 : // if set, return sstables that overlap the key range (end-exclusive)
2069 : start []byte
2070 : end []byte
2071 :
2072 : withApproximateSpanBytes bool
2073 : }
2074 :
2075 : // SSTablesOption sets an optional parameter used by `DB.SSTables`.
2076 : type SSTablesOption func(*sstablesOptions)
2077 :
2078 : // WithProperties enables returning sstable properties in each TableInfo.
2079 : //
2080 : // NOTE: if most of the sstable properties need to be read from disk,
2081 : // this option may make the `SSTables` method quite slow.
2082 1 : func WithProperties() SSTablesOption {
2083 1 : return func(opt *sstablesOptions) {
2084 1 : opt.withProperties = true
2085 1 : }
2086 : }
2087 :
2088 : // WithKeyRangeFilter ensures that returned sstables overlap the key range [start, end).
2089 : // If start and end are both nil, this option has no effect.
2090 1 : func WithKeyRangeFilter(start, end []byte) SSTablesOption {
2091 1 : return func(opt *sstablesOptions) {
2092 1 : opt.end = end
2093 1 : opt.start = start
2094 1 : }
2095 : }
2096 :
2097 : // WithApproximateSpanBytes enables capturing the approximate number of bytes that
2098 : // overlap the provided key span for each sstable.
2099 : // NOTE: This option requires WithKeyRangeFilter.
2100 1 : func WithApproximateSpanBytes() SSTablesOption {
2101 1 : return func(opt *sstablesOptions) {
2102 1 : opt.withApproximateSpanBytes = true
2103 1 : }
2104 : }
2105 :
2106 : // BackingType denotes the type of storage backing a given sstable.
2107 : type BackingType int
2108 :
2109 : const (
2110 : // BackingTypeLocal denotes an sstable stored on local disk according to the
2111 : // objprovider. This file is completely owned by us.
2112 : BackingTypeLocal BackingType = iota
2113 : // BackingTypeShared denotes an sstable stored on shared storage, created
2114 : // by this Pebble instance and possibly shared by other Pebble instances.
2115 : // These types of files have lifecycle managed by Pebble.
2116 : BackingTypeShared
2117 : // BackingTypeSharedForeign denotes an sstable stored on shared storage,
2118 : // created by a Pebble instance other than this one. These types of files have
2119 : // lifecycle managed by Pebble.
2120 : BackingTypeSharedForeign
2121 : // BackingTypeExternal denotes an sstable stored on external storage,
2122 : // not owned by any Pebble instance and with no refcounting/cleanup methods
2123 : // or lifecycle management. An example of an external file is a file restored
2124 : // from a backup.
2125 : BackingTypeExternal
2126 : backingTypeCount
2127 : )
2128 :
2129 : var backingTypeToString = [backingTypeCount]string{
2130 : BackingTypeLocal: "local",
2131 : BackingTypeShared: "shared",
2132 : BackingTypeSharedForeign: "shared-foreign",
2133 : BackingTypeExternal: "external",
2134 : }
2135 :
2136 : // String implements fmt.Stringer.
2137 0 : func (b BackingType) String() string {
2138 0 : return backingTypeToString[b]
2139 0 : }
2140 :
2141 : // SSTableInfo exports manifest.TableInfo with sstable.Properties alongside
2142 : // other file backing info.
2143 : type SSTableInfo struct {
2144 : manifest.TableInfo
2145 : TableStats manifest.TableStats
2146 : // Virtual indicates whether the sstable is virtual.
2147 : Virtual bool
2148 : // BackingSSTNum is the disk file number associated with the backing sstable.
2149 : // If Virtual is false, BackingSSTNum == PhysicalTableDiskFileNum(TableNum).
2150 : BackingSSTNum base.DiskFileNum
2151 : // BackingSize is the size of the backing sstable. This is the same as
2152 : // TableInfo.Size when the table is not virtual.
2153 : BackingSize uint64
2154 : // BackingType is the type of storage backing this sstable.
2155 : BackingType BackingType
2156 : // Locator is the remote.Locator backing this sstable, if the backing type is
2157 : // not BackingTypeLocal.
2158 : Locator remote.Locator
2159 : // ApproximateSpanBytes describes the approximate number of bytes within the
2160 : // sstable that fall within a particular span. It's populated only when the
2161 : // WithApproximateSpanBytes option is passed into DB.SSTables.
2162 : ApproximateSpanBytes uint64 `json:"ApproximateSpanBytes,omitempty"`
2163 :
2164 : // Properties is the sstable properties of this table. If Virtual is true,
2165 : // then the Properties are associated with the backing sst.
2166 : Properties *sstable.Properties
2167 : }
2168 :
2169 : // SSTables retrieves the current sstables. The returned slice is indexed by
2170 : // level and each level is indexed by the position of the sstable within the
2171 : // level. Note that this information may be out of date due to concurrent
2172 : // flushes and compactions.
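//
// Example (an illustrative sketch): list the sstables overlapping
// [start, end), including properties and approximate overlapping bytes.
//
//	levels, err := db.SSTables(
//		pebble.WithKeyRangeFilter(start, end),
//		pebble.WithProperties(),
//		pebble.WithApproximateSpanBytes(),
//	)
//	if err != nil {
//		return err
//	}
//	for level, tables := range levels {
//		for _, t := range tables {
//			log.Printf("L%d: %d bytes total, ~%d bytes in span",
//				level, t.Size, t.ApproximateSpanBytes)
//		}
//	}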
2173 1 : func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
2174 1 : opt := &sstablesOptions{}
2175 1 : for _, fn := range opts {
2176 1 : fn(opt)
2177 1 : }
2178 :
2179 1 : if opt.withApproximateSpanBytes && (opt.start == nil || opt.end == nil) {
2180 1 : return nil, errors.Errorf("cannot use WithApproximateSpanBytes without WithKeyRangeFilter option")
2181 1 : }
2182 :
2183 : // Grab and reference the current readState.
2184 1 : readState := d.loadReadState()
2185 1 : defer readState.unref()
2186 1 :
2187 1 : // TODO(peter): This is somewhat expensive, especially on a large
2188 1 : // database. It might be worthwhile to unify TableInfo and TableMetadata and
2189 1 : // then we could simply return current.Files. Note that RocksDB is doing
2190 1 : // something similar to the current code, so perhaps it isn't too bad.
2191 1 : srcLevels := readState.current.Levels
2192 1 : var totalTables int
2193 1 : for i := range srcLevels {
2194 1 : totalTables += srcLevels[i].Len()
2195 1 : }
2196 :
2197 1 : destTables := make([]SSTableInfo, totalTables)
2198 1 : destLevels := make([][]SSTableInfo, len(srcLevels))
2199 1 : for i := range destLevels {
2200 1 : j := 0
2201 1 : for m := range srcLevels[i].All() {
2202 1 : if opt.start != nil && opt.end != nil {
2203 1 : b := base.UserKeyBoundsEndExclusive(opt.start, opt.end)
2204 1 : if !m.Overlaps(d.opts.Comparer.Compare, &b) {
2205 1 : continue
2206 : }
2207 : }
2208 1 : destTables[j] = SSTableInfo{
2209 1 : TableInfo: m.TableInfo(),
2210 1 : }
2211 1 : if stats, ok := m.Stats(); ok {
2212 1 : destTables[j].TableStats = *stats
2213 1 : }
2214 1 : if opt.withProperties {
2215 1 : p, err := d.fileCache.getTableProperties(
2216 1 : m,
2217 1 : )
2218 1 : if err != nil {
2219 0 : return nil, err
2220 0 : }
2221 1 : destTables[j].Properties = p
2222 : }
2223 1 : destTables[j].Virtual = m.Virtual
2224 1 : destTables[j].BackingSSTNum = m.TableBacking.DiskFileNum
2225 1 : destTables[j].BackingSize = m.TableBacking.Size
2226 1 : objMeta, err := d.objProvider.Lookup(base.FileTypeTable, m.TableBacking.DiskFileNum)
2227 1 : if err != nil {
2228 0 : return nil, err
2229 0 : }
2230 1 : if objMeta.IsRemote() {
2231 0 : if objMeta.IsShared() {
2232 0 : if d.objProvider.IsSharedForeign(objMeta) {
2233 0 : destTables[j].BackingType = BackingTypeSharedForeign
2234 0 : } else {
2235 0 : destTables[j].BackingType = BackingTypeShared
2236 0 : }
2237 0 : } else {
2238 0 : destTables[j].BackingType = BackingTypeExternal
2239 0 : }
2240 0 : destTables[j].Locator = objMeta.Remote.Locator
2241 1 : } else {
2242 1 : destTables[j].BackingType = BackingTypeLocal
2243 1 : }
2244 :
2245 1 : if opt.withApproximateSpanBytes {
2246 1 : if m.ContainedWithinSpan(d.opts.Comparer.Compare, opt.start, opt.end) {
2247 1 : destTables[j].ApproximateSpanBytes = m.Size
2248 1 : } else {
2249 1 : size, err := d.fileCache.estimateSize(m, opt.start, opt.end)
2250 1 : if err != nil {
2251 0 : return nil, err
2252 0 : }
2253 1 : destTables[j].ApproximateSpanBytes = size
2254 : }
2255 : }
2256 1 : j++
2257 : }
2258 1 : destLevels[i] = destTables[:j]
2259 1 : destTables = destTables[j:]
2260 : }
2261 :
2262 1 : return destLevels, nil
2263 : }
2264 :
2265 1 : func (d *DB) walPreallocateSize() int {
2266 1 : // Set the WAL preallocate size to 110% of the memtable size. Note that there
2267 1 : // is a bit of apples and oranges in units here, as the memtable size
2268 1 : // corresponds to the memory usage of the memtable while the WAL size is the
2269 1 : // size of the batches (plus overhead) stored in the WAL.
2270 1 : //
2271 1 : // TODO(peter): 110% of the memtable size is quite hefty for a block
2272 1 : // size. This logic is taken from GetWalPreallocateBlockSize in
2273 1 : // RocksDB. Could a smaller preallocation block size be used?
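// For example (an editorial illustration of the arithmetic below): a
// 64 MiB memtable yields a preallocation of 64 MiB + 6.4 MiB ≈ 70 MiB.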
2274 1 : size := d.opts.MemTableSize
2275 1 : size = (size / 10) + size
2276 1 : return int(size)
2277 1 : }
2278 :
2279 : func (d *DB) newMemTable(
2280 : logNum base.DiskFileNum, logSeqNum base.SeqNum, minSize uint64,
2281 1 : ) (*memTable, *flushableEntry) {
2282 1 : targetSize := minSize + uint64(memTableEmptySize)
2283 1 : // The targetSize should be less than MemTableSize, because any batch >=
2284 1 : // MemTableSize/2 should be treated as a large flushable batch.
2285 1 : if targetSize > d.opts.MemTableSize {
2286 0 : panic(errors.AssertionFailedf("attempting to allocate memtable larger than MemTableSize"))
2287 : }
2288 : // Double until the next memtable size is at least large enough to fit
2289 : // minSize.
2290 1 : for d.mu.mem.nextSize < targetSize {
2291 1 : d.mu.mem.nextSize = min(2*d.mu.mem.nextSize, d.opts.MemTableSize)
2292 1 : }
2293 1 : size := d.mu.mem.nextSize
2294 1 : // The next memtable should be double the size, up to Options.MemTableSize.
2295 1 : if d.mu.mem.nextSize < d.opts.MemTableSize {
2296 1 : d.mu.mem.nextSize = min(2*d.mu.mem.nextSize, d.opts.MemTableSize)
2297 1 : }
2298 :
2299 1 : memtblOpts := memTableOptions{
2300 1 : Options: d.opts,
2301 1 : logSeqNum: logSeqNum,
2302 1 : }
2303 1 :
2304 1 : // Before attempting to allocate a new memtable, check if there's one
2305 1 : // available for recycling in memTableRecycle. Large contiguous allocations
2306 1 : // can be costly as fragmentation makes it more difficult to find a large
2307 1 : // contiguous free space. We've observed 64MB allocations taking 10ms+.
2308 1 : //
2309 1 : // To reduce these costly allocations, up to 1 obsolete memtable is stashed
2310 1 : // in `d.memTableRecycle` to allow a future memtable rotation to reuse
2311 1 : // existing memory.
2312 1 : var mem *memTable
2313 1 : mem = d.memTableRecycle.Swap(nil)
2314 1 : if mem != nil && uint64(mem.arenaBuf.Len()) != size {
2315 1 : d.freeMemTable(mem)
2316 1 : mem = nil
2317 1 : }
2318 1 : if mem != nil {
2319 1 : // Carry through the existing buffer and memory reservation.
2320 1 : memtblOpts.arenaBuf = mem.arenaBuf
2321 1 : memtblOpts.releaseAccountingReservation = mem.releaseAccountingReservation
2322 1 : } else {
2323 1 : mem = new(memTable)
2324 1 : memtblOpts.arenaBuf = manual.New(manual.MemTable, uintptr(size))
2325 1 : memtblOpts.releaseAccountingReservation = d.opts.Cache.Reserve(int(size))
2326 1 : d.memTableCount.Add(1)
2327 1 : d.memTableReserved.Add(int64(size))
2328 1 :
2329 1 : // Note: this is a no-op if invariants are disabled or race is enabled.
2330 1 : invariants.SetFinalizer(mem, checkMemTable)
2331 1 : }
2332 1 : mem.init(memtblOpts)
2333 1 :
2334 1 : entry := d.newFlushableEntry(mem, logNum, logSeqNum)
2335 1 : entry.releaseMemAccounting = func() {
2336 1 : // If the user leaks iterators, we may be releasing the memtable after
2337 1 : // the DB is already closed. In this case, we want to just release the
2338 1 : // memory because DB.Close won't come along to free it for us.
2339 1 : if err := d.closed.Load(); err != nil {
2340 1 : d.freeMemTable(mem)
2341 1 : return
2342 1 : }
2343 :
2344 : // The next memtable allocation might be able to reuse this memtable.
2345 : // Stash it on d.memTableRecycle.
2346 1 : if unusedMem := d.memTableRecycle.Swap(mem); unusedMem != nil {
2347 1 : // There was already a memtable waiting to be recycled. We're now
2348 1 : // responsible for freeing it.
2349 1 : d.freeMemTable(unusedMem)
2350 1 : }
2351 : }
2352 1 : return mem, entry
2353 : }
2354 :
2355 1 : func (d *DB) freeMemTable(m *memTable) {
2356 1 : d.memTableCount.Add(-1)
2357 1 : d.memTableReserved.Add(-int64(m.arenaBuf.Len()))
2358 1 : m.free()
2359 1 : }
2360 :
2361 : func (d *DB) newFlushableEntry(
2362 : f flushable, logNum base.DiskFileNum, logSeqNum base.SeqNum,
2363 1 : ) *flushableEntry {
2364 1 : fe := &flushableEntry{
2365 1 : flushable: f,
2366 1 : flushed: make(chan struct{}),
2367 1 : logNum: logNum,
2368 1 : logSeqNum: logSeqNum,
2369 1 : deleteFn: d.mu.versions.addObsolete,
2370 1 : deleteFnLocked: d.mu.versions.addObsoleteLocked,
2371 1 : }
2372 1 : fe.readerRefs.Store(1)
2373 1 : return fe
2374 1 : }
2375 :
2376 : // maybeInduceWriteStall is called before performing a memtable rotation in
2377 : // makeRoomForWrite. In some conditions, we prefer to stall the user's write
2378 : // workload rather than continuing to accept writes that may result in resource
2379 : // exhaustion or prohibitively slow reads.
2380 : //
2381 : // There are a couple reasons we might wait to rotate the memtable and
2382 : // instead induce a write stall:
2383 : // 1. If too many memtables have queued, we wait for a flush to finish before
2384 : // creating another memtable.
2385 : // 2. If L0 read amplification has grown too high, we wait for compactions
2386 : // to reduce the read amplification before accepting more writes that will
2387 : // increase write pressure.
2388 : //
2389 : // maybeInduceWriteStall checks these stall conditions, and if present, waits
2390 : // for them to abate.
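//
// The thresholds involved come from Options. For example (an illustrative
// sketch, not a tuning recommendation):
//
//	opts := &pebble.Options{
//		MemTableSize:                64 << 20, // 64 MiB memtables
//		MemTableStopWritesThreshold: 4,        // stall when queued memtable bytes reach 4*MemTableSize
//		L0StopWritesThreshold:       1000,     // stall when L0 read amplification reaches 1000
//	}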
2391 1 : func (d *DB) maybeInduceWriteStall(b *Batch) {
2392 1 : stalled := false
2393 1 : // This function will call EventListener.WriteStallBegin at most once. If
2394 1 : // it does call it, it will call EventListener.WriteStallEnd once before
2395 1 : // returning.
2396 1 : var timer *time.Timer
2397 1 : for {
2398 1 : var size uint64
2399 1 : for i := range d.mu.mem.queue {
2400 1 : size += d.mu.mem.queue[i].totalBytes()
2401 1 : }
2402 1 : if size >= uint64(d.opts.MemTableStopWritesThreshold)*d.opts.MemTableSize &&
2403 1 : !d.mu.log.manager.ElevateWriteStallThresholdForFailover() {
2404 1 : // We have filled up the current memtable, but already queued memtables
2405 1 : // are still flushing, so we wait.
2406 1 : if !stalled {
2407 1 : stalled = true
2408 1 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2409 1 : Reason: "memtable count limit reached",
2410 1 : })
2411 1 : }
2412 1 : beforeWait := crtime.NowMono()
2413 1 : // NB: In a rare case, we can start a write stall, and then the system
2414 1 : // may detect WAL failover, resulting in
2415 1 : // ElevateWriteStallThresholdForFailover returning true. So we want to
2416 1 : // recheck the predicate periodically, which we do by signaling the
2417 1 : // condition variable.
2418 1 : if timer == nil {
2419 1 : timer = time.AfterFunc(time.Second, d.mu.compact.cond.Broadcast)
2420 1 : } else {
2421 1 : timer.Reset(time.Second)
2422 1 : }
2423 1 : d.mu.compact.cond.Wait()
2424 1 : timer.Stop()
2425 1 : if b != nil {
2426 1 : b.commitStats.MemTableWriteStallDuration += beforeWait.Elapsed()
2427 1 : }
2428 1 : continue
2429 : }
2430 1 : l0ReadAmp := d.mu.versions.latest.l0Organizer.ReadAmplification()
2431 1 : if l0ReadAmp >= d.opts.L0StopWritesThreshold {
2432 1 : // There are too many level-0 files, so we wait.
2433 1 : if !stalled {
2434 1 : stalled = true
2435 1 : d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
2436 1 : Reason: "L0 file count limit exceeded",
2437 1 : })
2438 1 : }
2439 1 : beforeWait := crtime.NowMono()
2440 1 : d.mu.compact.cond.Wait()
2441 1 : if b != nil {
2442 1 : b.commitStats.L0ReadAmpWriteStallDuration += beforeWait.Elapsed()
2443 1 : }
2444 1 : continue
2445 : }
2446 : // Not stalled.
2447 1 : if stalled {
2448 1 : d.opts.EventListener.WriteStallEnd()
2449 1 : }
2450 1 : return
2451 : }
2452 : }
2453 :
2454 : // makeRoomForWrite rotates the current mutable memtable, ensuring that the
2455 : // resulting mutable memtable has room to hold the contents of the provided
2456 : // Batch. The current memtable is rotated (marked as immutable) and a new
2457 : // mutable memtable is allocated. It reserves space in the new memtable and adds
2458 : // a reference to the memtable. The caller must later ensure that the memtable
2459 : // is unreferenced. This memtable rotation also causes a log rotation.
2460 : //
2461 : // If the current memtable is not full but the caller wishes to trigger a
2462 : // rotation regardless, the caller may pass a nil Batch, and no space in the
2463 : // resulting mutable memtable will be reserved.
2464 : //
2465 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2466 : // may be released and reacquired.
2467 1 : func (d *DB) makeRoomForWrite(b *Batch) error {
2468 1 : if b != nil && b.ingestedSSTBatch {
2469 0 : panic("pebble: invalid function call")
2470 : }
2471 1 : d.maybeInduceWriteStall(b)
2472 1 :
2473 1 : var newLogNum base.DiskFileNum
2474 1 : var prevLogSize uint64
2475 1 : if !d.opts.DisableWAL {
2476 1 : beforeRotate := crtime.NowMono()
2477 1 : newLogNum, prevLogSize = d.rotateWAL()
2478 1 : if b != nil {
2479 1 : b.commitStats.WALRotationDuration += beforeRotate.Elapsed()
2480 1 : }
2481 : }
2482 1 : immMem := d.mu.mem.mutable
2483 1 : imm := d.mu.mem.queue[len(d.mu.mem.queue)-1]
2484 1 : imm.logSize = prevLogSize
2485 1 :
2486 1 : var logSeqNum base.SeqNum
2487 1 : var minSize uint64
2488 1 : if b != nil {
2489 1 : logSeqNum = b.SeqNum()
2490 1 : if b.flushable != nil {
2491 1 : logSeqNum += base.SeqNum(b.Count())
2492 1 : // The batch is too large to fit in the memtable so add it directly to
2493 1 : // the immutable queue. The flushable batch is associated with the same
2494 1 : // log as the immutable memtable, but logically occurs after it in
2495 1 : // seqnum space. We ensure while flushing that the flushable batch
2496 1 : // is flushed along with the previous memtable in the flushable
2497 1 : // queue. See the top level comment in DB.flush1 to learn how this
2498 1 : // is ensured.
2499 1 : //
2500 1 : // See DB.commitWrite for the special handling of log writes for large
2501 1 : // batches. In particular, the large batch has already written to
2502 1 : // imm.logNum.
2503 1 : entry := d.newFlushableEntry(b.flushable, imm.logNum, b.SeqNum())
2504 1 : // The large batch is by definition large. Reserve space from the cache
2505 1 : // for it until it is flushed.
2506 1 : entry.releaseMemAccounting = d.opts.Cache.Reserve(int(b.flushable.totalBytes()))
2507 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2508 1 : } else {
2509 1 : minSize = b.memTableSize
2510 1 : }
2511 1 : } else {
2512 1 : // b == nil
2513 1 : //
2514 1 : // This is a manual forced flush.
2515 1 : logSeqNum = base.SeqNum(d.mu.versions.logSeqNum.Load())
2516 1 : imm.flushForced = true
2517 1 : // If we are manually flushing and we used less than half of the bytes in
2518 1 : // the memtable, don't increase the size for the next memtable. This
2519 1 : // reduces memtable memory pressure when an application is frequently
2520 1 : // manually flushing.
2521 1 : if uint64(immMem.availBytes()) > immMem.totalBytes()/2 {
2522 1 : d.mu.mem.nextSize = immMem.totalBytes()
2523 1 : }
2524 : }
2525 1 : d.rotateMemtable(newLogNum, logSeqNum, immMem, minSize)
2526 1 : if b != nil && b.flushable == nil {
2527 1 : err := d.mu.mem.mutable.prepare(b)
2528 1 : // Reserving enough space for the batch after rotation must never fail.
2529 1 : // We pass in a minSize that's equal to b.memtableSize to ensure that
2530 1 : // memtable rotation allocates a memtable sufficiently large. We also
2531 1 : // held d.commit.mu for the entirety of this function, ensuring that no
2532 1 : // other committers may have reserved memory in the new memtable yet.
2533 1 : if err == arenaskl.ErrArenaFull {
2534 0 : panic(errors.AssertionFailedf("memtable still full after rotation"))
2535 : }
2536 1 : return err
2537 : }
2538 1 : return nil
2539 : }
2540 :
2541 : // Both DB.mu and commitPipeline.mu must be held by the caller.
2542 : func (d *DB) rotateMemtable(
2543 : newLogNum base.DiskFileNum, logSeqNum base.SeqNum, prev *memTable, minSize uint64,
2544 1 : ) {
2545 1 : // Create a new memtable, scheduling the previous one for flushing. We do
2546 1 : // this even if the previous memtable was empty because the DB.Flush
2547 1 : // mechanism is dependent on being able to wait for the empty memtable to
2548 1 : // flush. We can't just mark the empty memtable as flushed here because we
2549 1 : // also have to wait for all previous immutable tables to
2550 1 : // flush. Additionally, the memtable is tied to a particular WAL file and we
2551 1 : // want to go through the flush path in order to recycle that WAL file.
2552 1 : //
2553 1 : // NB: newLogNum corresponds to the WAL that contains mutations that are
2554 1 : // present in the new memtable. When immutable memtables are flushed to
2555 1 : // disk, a VersionEdit will be created telling the manifest the minimum
2556 1 : // unflushed log number (which will be the next one in d.mu.mem.mutable
2557 1 : // that was not flushed).
2558 1 : //
2559 1 : // NB: prev should be the current mutable memtable.
2560 1 : var entry *flushableEntry
2561 1 : d.mu.mem.mutable, entry = d.newMemTable(newLogNum, logSeqNum, minSize)
2562 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
2563 1 : // d.logSize tracks the log size of the WAL file corresponding to the most
2564 1 : // recent flushable. The log size of the previous mutable memtable no longer
2565 1 : // applies to the current mutable memtable.
2566 1 : //
2567 1 : // It's tempting to perform this update in rotateWAL, but that would not be
2568 1 : // atomic with the enqueue of the new flushable. A call to DB.Metrics()
2569 1 : // could acquire DB.mu after the WAL has been rotated but before the new
2570 1 : // memtable has been appended; this would result in omitting the log size of
2571 1 : // the most recent flushable.
2572 1 : d.logSize.Store(0)
2573 1 : d.updateReadStateLocked(nil)
2574 1 : if prev.writerUnref() {
2575 1 : d.maybeScheduleFlush()
2576 1 : }
2577 : }
2578 :
2579 : // rotateWAL creates a new write-ahead log, possibly recycling a previous WAL's
2580 : // files. It returns the file number assigned to the new WAL, and the size of
2581 : // the previous WAL file.
2582 : //
2583 : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
2584 : // may be released and reacquired.
2585 1 : func (d *DB) rotateWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
2586 1 : if d.opts.DisableWAL {
2587 0 : panic("pebble: invalid function call")
2588 : }
2589 1 : jobID := d.newJobIDLocked()
2590 1 : newLogNum = d.mu.versions.getNextDiskFileNum()
2591 1 :
2592 1 : d.mu.Unlock()
2593 1 : // Close the previous log first. This writes an EOF trailer
2594 1 : // signifying the end of the file and syncs it to disk. We must
2595 1 : // close the previous log before linking the new log file,
2596 1 : // otherwise a crash could leave both logs with unclean tails, and
2597 1 : // Open will treat the previous log as corrupt.
2598 1 : offset, err := d.mu.log.writer.Close()
2599 1 : if err != nil {
2600 0 : // What to do here? Stumbling on doesn't seem worthwhile. If we failed to
2601 0 : // close the previous log it is possible we lost a write.
2602 0 : d.opts.Logger.Fatalf("pebble: error closing WAL; data loss possible if we continue: %s", err)
2603 0 : }
2604 1 : prevLogSize = uint64(offset)
2605 1 : metrics := d.mu.log.writer.Metrics()
2606 1 :
2607 1 : d.mu.Lock()
2608 1 : if err := d.mu.log.metrics.LogWriterMetrics.Merge(&metrics); err != nil {
2609 0 : d.opts.Logger.Errorf("metrics error: %s", err)
2610 0 : }
2611 :
2612 1 : d.mu.Unlock()
2613 1 : writer, err := d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
2614 1 : if err != nil {
2615 0 : panic(err)
2616 : }
2617 :
2618 1 : d.mu.Lock()
2619 1 : d.mu.log.writer = writer
2620 1 : return newLogNum, prevLogSize
2621 : }
2622 :
2623 1 : func (d *DB) getEarliestUnflushedSeqNumLocked() base.SeqNum {
2624 1 : seqNum := base.SeqNumMax
2625 1 : for i := range d.mu.mem.queue {
2626 1 : logSeqNum := d.mu.mem.queue[i].logSeqNum
2627 1 : if seqNum > logSeqNum {
2628 1 : seqNum = logSeqNum
2629 1 : }
2630 : }
2631 1 : return seqNum
2632 : }
2633 :
2634 1 : func (d *DB) getInProgressCompactionInfoLocked(finishing compaction) (rv []compactionInfo) {
2635 1 : for c := range d.mu.compact.inProgress {
2636 1 : if !c.IsFlush() && (finishing == nil || c != finishing) {
2637 1 : rv = append(rv, c.Info())
2638 1 : }
2639 : }
2640 1 : return
2641 : }
2642 :
2643 1 : func inProgressL0Compactions(inProgress []compactionInfo) []manifest.L0Compaction {
2644 1 : var compactions []manifest.L0Compaction
2645 1 : for _, info := range inProgress {
2646 1 : // Skip in-progress compactions that have already committed; the L0
2647 1 : // sublevels initialization code requires the set of in-progress
2648 1 : // compactions to be consistent with the current version. Compactions
2649 1 : // with versionEditApplied=true are already applied to the current
2650 1 : // version but are performing cleanup without the database mutex.
2651 1 : if info.versionEditApplied {
2652 1 : continue
2653 : }
2654 1 : l0 := false
2655 1 : for _, cl := range info.inputs {
2656 1 : l0 = l0 || cl.level == 0
2657 1 : }
2658 1 : if !l0 {
2659 1 : continue
2660 : }
2661 1 : compactions = append(compactions, manifest.L0Compaction{
2662 1 : Bounds: *info.bounds,
2663 1 : IsIntraL0: info.outputLevel == 0,
2664 1 : })
2665 : }
2666 1 : return compactions
2667 : }
2668 :
2669 : // firstError returns the first non-nil error of err0 and err1, or nil if both
2670 : // are nil.
2671 1 : func firstError(err0, err1 error) error {
2672 1 : if err0 != nil {
2673 1 : return err0
2674 1 : }
2675 1 : return err1
2676 : }
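// A typical use of firstError is preserving an earlier error while still
// attempting cleanup. closeBoth below is a hypothetical helper illustrating
// the idiom; it is not part of this file's API.
func closeBoth(a, b io.Closer) error {
	err := a.Close()
	// Close b regardless, but report whichever error occurred first.
	return firstError(err, b.Close())
}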
2677 :
2678 : // SetCreatorID sets the CreatorID which is needed in order to use shared objects.
2679 : // Remote object usage is disabled until this method is called the first time.
2680 : // Once set, the Creator ID is persisted and cannot change.
2681 : //
2682 : // Does nothing if remote storage (Options.Experimental.RemoteStorage) was not
2683 : // configured when the DB was opened, or if the DB is in read-only mode.
2684 1 : func (d *DB) SetCreatorID(creatorID uint64) error {
2685 1 : if d.opts.Experimental.RemoteStorage == nil || d.opts.ReadOnly {
2686 0 : return nil
2687 0 : }
2688 1 : return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
2689 : }
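// External-caller sketch (not part of this file) showing how SetCreatorID
// fits into opening a DB. It assumes a remote.StorageFactory value is
// available from elsewhere; with a nil factory the SetCreatorID call is a
// no-op, per the documentation above. Imports assumed:
// "github.com/cockroachdb/pebble", ".../objstorage/remote", ".../vfs".
func openWithCreatorID(factory remote.StorageFactory) (*pebble.DB, error) {
	opts := &pebble.Options{FS: vfs.NewMem()}
	opts.Experimental.RemoteStorage = factory
	db, err := pebble.Open("demo", opts)
	if err != nil {
		return nil, err
	}
	// The creator ID must be set once before remote/shared objects can be
	// created; the object provider persists it.
	if err := db.SetCreatorID(1); err != nil {
		_ = db.Close()
		return nil, err
	}
	return db, nil
}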
2690 :
2691 : // KeyStatistics keeps track of the number of keys that have been pinned by a
2692 : // snapshot as well as counts of the different key kinds in the lsm.
2693 : //
2694 : // One way of using the accumulated stats, when the data contains only sets and
2695 : // dels, with the counts represented as del_count, set_count, del_latest_count,
2696 : // set_latest_count, and snapshot_pinned_count (see the worked sketch after this type):
2697 : //
2698 : // - del_latest_count + set_latest_count is the number of unique user keys
2699 : //   (unique).
2700 : //
2701 : // - set_latest_count is the number of live unique user keys (live_unique).
2702 : //
2703 : // - Garbage is del_count + set_count - live_unique.
2704 : //
2705 : // - If everything were in the LSM, del_count+set_count-snapshot_pinned_count
2706 : //   would also be the number of unique user keys (note that
2707 : //   snapshot_pinned_count counts something different -- see the comment below).
2708 : //   But snapshot_pinned_count only counts keys in the LSM, so the excess here
2709 : //   must be keys in memtables.
2710 : type KeyStatistics struct {
2711 : // TODO(sumeer): the SnapshotPinned* are incorrect in that these older
2712 : // versions can be in a different level. Either fix the accounting or
2713 : // rename these fields.
2714 :
2715 : // SnapshotPinnedKeys represents obsolete keys that cannot be elided during
2716 : // a compaction, because they are required by an open snapshot.
2717 : SnapshotPinnedKeys int
2718 : // SnapshotPinnedKeysBytes is the total number of bytes of all snapshot
2719 : // pinned keys.
2720 : SnapshotPinnedKeysBytes uint64
2721 : // KindsCount is the count for each kind of key. It includes point keys,
2722 : // range deletes and range keys.
2723 : KindsCount [InternalKeyKindMax + 1]int
2724 : // LatestKindsCount is the count for each kind of key when it is the latest
2725 : // kind for a user key. It is only populated for point keys.
2726 : LatestKindsCount [InternalKeyKindMax + 1]int
2727 : }
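// A worked numeric sketch of the bookkeeping described in the comment above
// KeyStatistics, using hypothetical counts for a span containing only SETs
// and DELs; the *Example names exist purely for illustration.
const (
	delCountExample       = 4  // all DEL versions in the span
	setCountExample       = 10 // all SET versions in the span
	delLatestCountExample = 2  // user keys whose newest version is a DEL
	setLatestCountExample = 6  // user keys whose newest version is a SET
)

// unique user keys  = del_latest_count + set_latest_count = 2 + 6      = 8
// live unique keys  = set_latest_count                    =              6
// garbage versions  = del_count + set_count - live_unique = 4 + 10 - 6 = 8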
2728 :
2729 : // LSMKeyStatistics is used by DB.ScanStatistics.
2730 : type LSMKeyStatistics struct {
2731 : Accumulated KeyStatistics
2732 : // Levels contains statistics only for point keys. Range deletions and range keys will
2733 : // appear in Accumulated but not Levels.
2734 : Levels [numLevels]KeyStatistics
2735 : // BytesRead represents the logical, pre-compression size of keys and values read.
2736 : BytesRead uint64
2737 : }
2738 :
2739 : // ScanStatisticsOptions is used by DB.ScanStatistics.
2740 : type ScanStatisticsOptions struct {
2741 : // LimitBytesPerSecond indicates the number of bytes that are able to be read
2742 : // per second using ScanInternal.
2743 : // A value of 0 indicates that there is no limit set.
2744 : LimitBytesPerSecond int64
2745 : }
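// Illustrative arithmetic only (the values below are hypothetical): each byte
// read consumes one token, so the limit translates directly into scan time.
const (
	exampleLimitBytesPerSecond = 1 << 20 // 1 MiB/s
	examplePairSize            = 4 << 10 // a 4 KiB key+value pair
)

// Reading one such pair consumes examplePairSize/exampleLimitBytesPerSecond
// seconds of budget: 4096/1048576 ≈ 0.0039s, i.e. roughly 3.9ms per pair.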
2746 :
2747 : // ScanStatistics returns the count of different key kinds within the lsm for a
2748 : // key span [lower, upper) as well as the number of snapshot keys.
2749 : func (d *DB) ScanStatistics(
2750 : ctx context.Context, lower, upper []byte, opts ScanStatisticsOptions,
2751 1 : ) (stats LSMKeyStatistics, err error) {
2752 1 : var prevKey InternalKey
2753 1 : var rateLimitFunc func(key *InternalKey, val LazyValue) error
2754 1 : tb := tokenbucket.TokenBucket{}
2755 1 :
2756 1 : if opts.LimitBytesPerSecond != 0 {
2757 0 : const minBytesPerSec = 100 * 1024
2758 0 : if opts.LimitBytesPerSecond < minBytesPerSec {
2759 0 : return stats, errors.Newf("pebble: ScanStatistics read bandwidth limit %d is below minimum %d", opts.LimitBytesPerSecond, minBytesPerSec)
2760 0 : }
2761 : // Each "token" roughly corresponds to a byte that was read.
2762 0 : tb.Init(tokenbucket.TokensPerSecond(opts.LimitBytesPerSecond), tokenbucket.Tokens(1024))
2763 0 : rateLimitFunc = func(key *InternalKey, val LazyValue) error {
2764 0 : return tb.WaitCtx(ctx, tokenbucket.Tokens(key.Size()+val.Len()))
2765 0 : }
2766 : }
2767 :
2768 1 : scanInternalOpts := &ScanInternalOptions{
2769 1 : VisitPointKey: func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error {
2770 1 : // If the previous key is equal to the current point key, the current key was
2771 1 : // pinned by a snapshot.
2772 1 : size := uint64(key.Size())
2773 1 : kind := key.Kind()
2774 1 : sameKey := d.equal(prevKey.UserKey, key.UserKey)
2775 1 : if iterInfo.Kind == IteratorLevelLSM && sameKey {
2776 1 : stats.Levels[iterInfo.Level].SnapshotPinnedKeys++
2777 1 : stats.Levels[iterInfo.Level].SnapshotPinnedKeysBytes += size
2778 1 : stats.Accumulated.SnapshotPinnedKeys++
2779 1 : stats.Accumulated.SnapshotPinnedKeysBytes += size
2780 1 : }
2781 1 : if iterInfo.Kind == IteratorLevelLSM {
2782 1 : stats.Levels[iterInfo.Level].KindsCount[kind]++
2783 1 : }
2784 1 : if !sameKey {
2785 1 : if iterInfo.Kind == IteratorLevelLSM {
2786 1 : stats.Levels[iterInfo.Level].LatestKindsCount[kind]++
2787 1 : }
2788 1 : stats.Accumulated.LatestKindsCount[kind]++
2789 : }
2790 :
2791 1 : stats.Accumulated.KindsCount[kind]++
2792 1 : prevKey.CopyFrom(*key)
2793 1 : stats.BytesRead += uint64(key.Size() + value.Len())
2794 1 : return nil
2795 : },
2796 0 : VisitRangeDel: func(start, end []byte, seqNum base.SeqNum) error {
2797 0 : stats.Accumulated.KindsCount[InternalKeyKindRangeDelete]++
2798 0 : stats.BytesRead += uint64(len(start) + len(end))
2799 0 : return nil
2800 0 : },
2801 0 : VisitRangeKey: func(start, end []byte, keys []rangekey.Key) error {
2802 0 : stats.BytesRead += uint64(len(start) + len(end))
2803 0 : for _, key := range keys {
2804 0 : stats.Accumulated.KindsCount[key.Kind()]++
2805 0 : stats.BytesRead += uint64(len(key.Value) + len(key.Suffix))
2806 0 : }
2807 0 : return nil
2808 : },
2809 : IncludeObsoleteKeys: true,
2810 : IterOptions: IterOptions{
2811 : KeyTypes: IterKeyTypePointsAndRanges,
2812 : LowerBound: lower,
2813 : UpperBound: upper,
2814 : },
2815 : RateLimitFunc: rateLimitFunc,
2816 : }
2817 1 : iter, err := d.newInternalIter(ctx, snapshotIterOpts{}, scanInternalOpts)
2818 1 : if err != nil {
2819 0 : return LSMKeyStatistics{}, err
2820 0 : }
2821 1 : defer func() { err = errors.CombineErrors(err, iter.Close()) }()
2822 :
2823 1 : err = scanInternalImpl(ctx, iter, scanInternalOpts)
2824 1 : if err != nil {
2825 0 : return LSMKeyStatistics{}, err
2826 0 : }
2827 1 : return stats, nil
2828 : }
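// External-caller sketch of ScanStatistics (not part of this file); the
// bounds, rate limit, and helper name are arbitrary examples. Imports
// assumed: "context", "github.com/cockroachdb/pebble".
func countLiveSets(db *pebble.DB, lower, upper []byte) (int, error) {
	stats, err := db.ScanStatistics(context.Background(), lower, upper,
		pebble.ScanStatisticsOptions{LimitBytesPerSecond: 4 << 20 /* 4 MiB/s */})
	if err != nil {
		return 0, err
	}
	// LatestKindsCount counts a kind only when it is the newest version of a
	// user key, so this approximates the number of live point keys.
	return stats.Accumulated.LatestKindsCount[pebble.InternalKeyKindSet], nil
}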
2829 :
2830 : // ObjProvider returns the objstorage.Provider for this database. Meant to be
2831 : // used for internal purposes only.
2832 1 : func (d *DB) ObjProvider() objstorage.Provider {
2833 1 : return d.objProvider
2834 1 : }
2835 :
2836 : // DebugString returns a debugging string describing the LSM.
2837 0 : func (d *DB) DebugString() string {
2838 0 : return d.DebugCurrentVersion().DebugString()
2839 0 : }
2840 :
2841 : // DebugCurrentVersion returns the current LSM tree metadata. Should only be
2842 : // used for testing/debugging.
2843 1 : func (d *DB) DebugCurrentVersion() *manifest.Version {
2844 1 : d.mu.Lock()
2845 1 : defer d.mu.Unlock()
2846 1 : return d.mu.versions.currentVersion()
2847 1 : }
2848 :
2849 1 : func (d *DB) removeFromOngoingExcises(seqNum base.SeqNum) {
2850 1 : d.mu.Lock()
2851 1 : defer d.mu.Unlock()
2852 1 : _, ok := d.mu.snapshots.ongoingExcises[seqNum]
2853 1 : if !ok {
2854 0 : panic(errors.AssertionFailedf("pebble: no ongoing excise for seqnum %d", seqNum))
2855 : }
2856 1 : delete(d.mu.snapshots.ongoingExcises, seqNum)
2857 1 : d.mu.snapshots.ongoingExcisesRemovedCond.Broadcast()
2858 : }
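// The Broadcast above wakes goroutines that wait for a particular excise to
// be removed. Below is a generic sketch of the waiting side (the function
// name, map value type, and parameters are hypothetical): the standard loop
// re-checks the condition after every wakeup, and cond.L must be the mutex
// guarding the map.
func waitForExciseRemoval(cond *sync.Cond, ongoing map[base.SeqNum]struct{}, seqNum base.SeqNum) {
	cond.L.Lock()
	defer cond.L.Unlock()
	for {
		if _, ok := ongoing[seqNum]; !ok {
			return // the excise we were waiting on is gone
		}
		cond.Wait() // releases cond.L while blocked; reacquires before returning
	}
}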
|