LCOV - code coverage report
Current view: top level - pebble - open.go (source / functions)
Test:      2025-10-06 08:19Z 3214797c - tests + meta.lcov
Test Date: 2025-10-06 08:22:01
Coverage:  Lines: 90.0 % (834 of 927 hit)   Functions: - (0 of 0)

            Line data    Source code
       1              : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "context"
      10              :         "encoding/binary"
      11              :         "fmt"
      12              :         "io"
      13              :         "math"
      14              :         "os"
      15              :         "slices"
      16              :         "sync/atomic"
      17              :         "time"
      18              : 
      19              :         "github.com/cockroachdb/crlib/crtime"
      20              :         "github.com/cockroachdb/errors"
      21              :         "github.com/cockroachdb/errors/oserror"
      22              :         "github.com/cockroachdb/pebble/batchrepr"
      23              :         "github.com/cockroachdb/pebble/internal/arenaskl"
      24              :         "github.com/cockroachdb/pebble/internal/base"
      25              :         "github.com/cockroachdb/pebble/internal/cache"
      26              :         "github.com/cockroachdb/pebble/internal/invariants"
      27              :         "github.com/cockroachdb/pebble/internal/keyspan"
      28              :         "github.com/cockroachdb/pebble/internal/manifest"
      29              :         "github.com/cockroachdb/pebble/internal/manual"
      30              :         "github.com/cockroachdb/pebble/objstorage"
      31              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      32              :         "github.com/cockroachdb/pebble/objstorage/remote"
      33              :         "github.com/cockroachdb/pebble/record"
      34              :         "github.com/cockroachdb/pebble/vfs"
      35              :         "github.com/cockroachdb/pebble/wal"
      36              :         "github.com/prometheus/client_golang/prometheus"
      37              : )
      38              : 
      39              : const (
      40              :         initialMemTableSize = 256 << 10 // 256 KB
      41              : 
      42              :         // The max batch size is limited by the uint32 offsets stored in
      43              :         // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
      44              :         //
      45              :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      46              :         // end of an allocation fits in uint32.
      47              :         //
      48              :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      49              :         // 2GB).
      50              :         maxBatchSize = min(math.MaxUint32, math.MaxInt)
      51              : 
      52              :         // The max memtable size is limited by the uint32 offsets stored in
      53              :         // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
      54              :         //
      55              :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      56              :         // end of an allocation fits in uint32.
      57              :         //
      58              :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      59              :         // 2GB).
      60              :         maxMemTableSize = min(math.MaxUint32, math.MaxInt)
      61              : )
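
The min(math.MaxUint32, math.MaxInt) expressions above rely on Go's built-in min (Go 1.21+) applied to untyped constants, so each limit is resolved at compile time to whichever bound is tighter on the target platform. A standalone sketch of the same clamp:

        package main

        import (
                "fmt"
                "math"
        )

        // min over untyped constants is itself a constant, so this resolves to
        // math.MaxUint32 (4294967295) on 64-bit targets and math.MaxInt on 32-bit.
        const maxAllocSize = min(math.MaxUint32, math.MaxInt)

        func main() {
                fmt.Println(maxAllocSize)
        }
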
      62              : 
       63              : // FileCacheSize returns the file cache size to use
       64              : // for a single DB, given the maximum number of open
       65              : // files available to a file cache that serves only
       66              : // that single DB.
      67            2 : func FileCacheSize(maxOpenFiles int) int {
      68            2 :         fileCacheSize := maxOpenFiles - numNonFileCacheFiles
      69            2 :         if fileCacheSize < minFileCacheSize {
      70            2 :                 fileCacheSize = minFileCacheSize
      71            2 :         }
      72            2 :         return fileCacheSize
      73              : }
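
FileCacheSize subtracts the descriptors Pebble reserves for other uses (numNonFileCacheFiles) and clamps the result to a floor (minFileCacheSize); both are internal constants, so the arithmetic below is illustrative only. A sketch of sizing a file cache shared by two stores, each configured with MaxOpenFiles = 1000 (shard count arbitrary):

        perDB := pebble.FileCacheSize(1000)
        fc := pebble.NewFileCache(8 /* shards */, perDB*2)
        defer fc.Unref() // release our reference; each DB holds its own

        opts1 := &pebble.Options{FileCache: fc, MaxOpenFiles: 1000}
        opts2 := &pebble.Options{FileCache: fc, MaxOpenFiles: 1000}
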
      74              : 
      75              : // Open opens a DB whose files live in the given directory.
      76              : //
       77              : // IsCorruptionError() can be used to determine if the error is caused by on-disk
      78              : // corruption.
      79            2 : func Open(dirname string, opts *Options) (db *DB, err error) {
      80            2 :         // Make a copy of the options so that we don't mutate the passed in options.
      81            2 :         opts = opts.Clone()
      82            2 :         opts.EnsureDefaults()
      83            2 :         if err := opts.Validate(); err != nil {
      84            0 :                 return nil, err
      85            0 :         }
      86            2 :         if opts.LoggerAndTracer == nil {
      87            2 :                 opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
      88            2 :         } else {
      89            1 :                 opts.Logger = opts.LoggerAndTracer
      90            1 :         }
      91              : 
      92            2 :         if invariants.Sometimes(5) {
      93            2 :                 assertComparer := base.MakeAssertComparer(*opts.Comparer)
      94            2 :                 opts.Comparer = &assertComparer
      95            2 :         }
      96              : 
      97              :         // In all error cases, we return db = nil; this is used by various
      98              :         // deferred cleanups.
      99            2 :         maybeCleanUp := func(fn func() error) {
     100            2 :                 if db == nil {
     101            1 :                         err = errors.CombineErrors(err, fn())
     102            1 :                 }
     103              :         }
     104              : 
     105              :         // Open the database and WAL directories first.
     106            2 :         walDirname, secondaryWalDirName, dataDir, err := prepareAndOpenDirs(dirname, opts)
     107            2 :         if err != nil {
     108            1 :                 return nil, errors.Wrapf(err, "error opening database at %q", dirname)
     109            1 :         }
     110            2 :         defer maybeCleanUp(dataDir.Close)
     111            2 : 
     112            2 :         // Lock the database directory.
     113            2 :         fileLock, err := base.AcquireOrValidateDirectoryLock(opts.Lock, dirname, opts.FS)
     114            2 :         if err != nil {
     115            1 :                 return nil, err
     116            1 :         }
     117            2 :         defer maybeCleanUp(fileLock.Close)
     118            2 : 
     119            2 :         // List the directory contents. This also happens to include WAL log files, if
     120            2 :         // they are in the same dir, but we will ignore those below. The provider is
      121            2 :         // also given this list, but it ignores non-sstable files.
     122            2 :         ls, err := opts.FS.List(dirname)
     123            2 :         if err != nil {
     124            1 :                 return nil, err
     125            1 :         }
     126              : 
     127              :         // Establish the format major version.
     128            2 :         formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
     129            2 :         if err != nil {
     130            1 :                 return nil, err
     131            1 :         }
     132            2 :         defer maybeCleanUp(formatVersionMarker.Close)
     133            2 : 
     134            2 :         noFormatVersionMarker := formatVersion == FormatDefault
     135            2 :         if noFormatVersionMarker {
     136            2 :                 // We will initialize the store at the minimum possible format, then upgrade
     137            2 :                 // the format to the desired one. This helps test the format upgrade code.
     138            2 :                 formatVersion = FormatMinSupported
     139            2 :                 if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
     140            2 :                         formatVersion = FormatMinForSharedObjects
     141            2 :                 }
     142              :                 // There is no format version marker file. There are three cases:
     143              :                 //  - we are trying to open an existing store that was created at
      144              :                 //    FormatMostCompatible (the only one without a version marker file);
     145              :                 //  - we are creating a new store;
     146              :                 //  - we are retrying a failed creation.
     147              :                 //
     148              :                 // To error in the first case, we set ErrorIfNotPristine.
     149            2 :                 opts.ErrorIfNotPristine = true
     150            2 :                 defer func() {
     151            2 :                         if err != nil && errors.Is(err, ErrDBNotPristine) {
     152            0 :                                 // We must be trying to open an existing store at FormatMostCompatible.
      153            0 :                         // Correct the error in this case.
     154            0 :                                 err = errors.Newf(
     155            0 :                                         "pebble: database %q written in format major version 1 which is no longer supported",
     156            0 :                                         dirname)
     157            0 :                         }
     158              :                 }()
     159              :         }
     160              : 
     161              :         // Find the currently active manifest, if there is one.
     162            2 :         manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
     163            2 :         if err != nil {
     164            1 :                 return nil, errors.Wrapf(err, "pebble: database %q", dirname)
     165            1 :         }
     166            2 :         defer maybeCleanUp(manifestMarker.Close)
     167            2 : 
     168            2 :         // Atomic markers may leave behind obsolete files if there's a crash
     169            2 :         // mid-update. Clean these up if we're not in read-only mode.
     170            2 :         if !opts.ReadOnly {
     171            2 :                 if err := formatVersionMarker.RemoveObsolete(); err != nil {
     172            0 :                         return nil, err
     173            0 :                 }
     174            2 :                 if err := manifestMarker.RemoveObsolete(); err != nil {
     175            0 :                         return nil, err
     176            0 :                 }
     177              :         }
     178              : 
     179            2 :         if opts.Cache == nil {
     180            2 :                 opts.Cache = cache.New(opts.CacheSize)
     181            2 :                 defer opts.Cache.Unref()
     182            2 :         }
     183              : 
     184            2 :         d := &DB{
     185            2 :                 cacheHandle:         opts.Cache.NewHandle(),
     186            2 :                 dirname:             dirname,
     187            2 :                 opts:                opts,
     188            2 :                 cmp:                 opts.Comparer.Compare,
     189            2 :                 equal:               opts.Comparer.Equal,
     190            2 :                 merge:               opts.Merger.Merge,
     191            2 :                 split:               opts.Comparer.Split,
     192            2 :                 abbreviatedKey:      opts.Comparer.AbbreviatedKey,
     193            2 :                 largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
     194            2 :                 dataDirLock:         fileLock,
     195            2 :                 dataDir:             dataDir,
     196            2 :                 closed:              new(atomic.Value),
     197            2 :                 closedCh:            make(chan struct{}),
     198            2 :         }
     199            2 :         d.mu.versions = &versionSet{}
     200            2 :         d.diskAvailBytes.Store(math.MaxUint64)
     201            2 :         d.problemSpans.Init(manifest.NumLevels, opts.Comparer.Compare)
     202            2 :         if opts.Experimental.CompactionScheduler != nil {
     203            2 :                 d.compactionScheduler = opts.Experimental.CompactionScheduler()
     204            2 :         } else {
     205            1 :                 d.compactionScheduler = newConcurrencyLimitScheduler(defaultTimeSource{})
     206            1 :         }
     207              : 
     208            2 :         defer func() {
     209            2 :                 // If an error or panic occurs during open, attempt to release the manually
     210            2 :                 // allocated memory resources. Note that rather than look for an error, we
     211            2 :                 // look for the return of a nil DB pointer.
     212            2 :                 if r := recover(); db == nil {
     213            1 :                         // If there's an unused, recycled memtable, we need to release its memory.
     214            1 :                         if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
     215            1 :                                 d.freeMemTable(obsoleteMemTable)
     216            1 :                         }
     217              : 
     218            1 :                         if d.fileCache != nil {
     219            1 :                                 _ = d.fileCache.Close()
     220            1 :                         }
     221            1 :                         d.cacheHandle.Close()
     222            1 : 
     223            1 :                         for _, mem := range d.mu.mem.queue {
     224            1 :                                 switch t := mem.flushable.(type) {
     225            1 :                                 case *memTable:
     226            1 :                                         manual.Free(manual.MemTable, t.arenaBuf)
     227            1 :                                         t.arenaBuf = manual.Buf{}
     228              :                                 }
     229              :                         }
     230            1 :                         if d.cleanupManager != nil {
     231            1 :                                 d.cleanupManager.Close()
     232            1 :                         }
     233            1 :                         if d.objProvider != nil {
     234            1 :                                 _ = d.objProvider.Close()
     235            1 :                         }
     236            1 :                         if d.mu.versions.manifestFile != nil {
     237            1 :                                 _ = d.mu.versions.manifestFile.Close()
     238            1 :                         }
     239            1 :                         if r != nil {
     240            1 :                                 panic(r)
     241              :                         }
     242              :                 }
     243              :         }()
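
The deferred closure above is the recover-then-re-panic cleanup idiom: recover() is called unconditionally (it only takes effect when invoked directly by a deferred function), db == nil identifies every failure path, and a captured panic is re-raised only after resources are freed. The idiom in isolation (openThing, thing, and releaseResources are hypothetical):

        type thing struct{}

        func releaseResources() { /* free partially initialized state */ }

        func openThing() (t *thing, err error) {
                defer func() {
                        // t == nil covers both error returns and panics, since
                        // every failure path returns a nil *thing.
                        if r := recover(); t == nil {
                                releaseResources()
                                if r != nil {
                                        panic(r) // re-raise once cleanup has run
                                }
                        }
                }()
                // ... construction that may fail or panic ...
                return &thing{}, nil
        }
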
     244              : 
     245            2 :         d.commit = newCommitPipeline(commitEnv{
     246            2 :                 logSeqNum:     &d.mu.versions.logSeqNum,
     247            2 :                 visibleSeqNum: &d.mu.versions.visibleSeqNum,
     248            2 :                 apply:         d.commitApply,
     249            2 :                 write:         d.commitWrite,
     250            2 :         })
     251            2 :         d.mu.nextJobID = 1
     252            2 :         d.mu.mem.nextSize = opts.MemTableSize
     253            2 :         if d.mu.mem.nextSize > initialMemTableSize {
     254            2 :                 d.mu.mem.nextSize = initialMemTableSize
     255            2 :         }
     256            2 :         d.mu.compact.cond.L = &d.mu.Mutex
     257            2 :         d.mu.compact.inProgress = make(map[compaction]struct{})
     258            2 :         d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
     259            2 :         d.mu.snapshots.init()
     260            2 :         // logSeqNum is the next sequence number that will be assigned.
     261            2 :         // Start assigning sequence numbers from base.SeqNumStart to leave
     262            2 :         // room for reserved sequence numbers (see comments around
     263            2 :         // SeqNumStart).
     264            2 :         d.mu.versions.logSeqNum.Store(base.SeqNumStart)
     265            2 :         d.mu.formatVers.vers.Store(uint64(formatVersion))
     266            2 :         d.mu.formatVers.marker = formatVersionMarker
     267            2 : 
     268            2 :         d.timeNow = time.Now
     269            2 :         d.openedAt = d.timeNow()
     270            2 : 
     271            2 :         d.mu.Lock()
     272            2 :         defer d.mu.Unlock()
     273            2 : 
     274            2 :         jobID := d.newJobIDLocked()
     275            2 : 
     276            2 :         providerSettings := opts.MakeObjStorageProviderSettings(dirname)
     277            2 :         providerSettings.FSDirInitialListing = ls
     278            2 :         d.objProvider, err = objstorageprovider.Open(providerSettings)
     279            2 :         if err != nil {
     280            1 :                 return nil, err
     281            1 :         }
     282              : 
     283            2 :         blobRewriteHeuristic := manifest.BlobRewriteHeuristic{
     284            2 :                 CurrentTime: d.timeNow,
     285            2 :                 MinimumAge:  opts.Experimental.ValueSeparationPolicy().RewriteMinimumAge,
     286            2 :         }
     287            2 : 
     288            2 :         if !manifestExists {
     289            2 :                 // DB does not exist.
     290            2 :                 if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
     291            1 :                         return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
     292            1 :                 }
     293              : 
     294              :                 // Create the DB.
     295            2 :                 if err := d.mu.versions.create(
     296            2 :                         jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, blobRewriteHeuristic, &d.mu.Mutex); err != nil {
     297            1 :                         return nil, err
     298            1 :                 }
     299            2 :         } else {
     300            2 :                 if opts.ErrorIfExists {
     301            1 :                         return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
     302            1 :                 }
     303              :                 // Load the version set.
     304            2 :                 if err := d.mu.versions.load(
     305            2 :                         dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, blobRewriteHeuristic, &d.mu.Mutex); err != nil {
     306            1 :                         return nil, err
     307            1 :                 }
     308            2 :                 if opts.ErrorIfNotPristine {
     309            1 :                         liveFileNums := make(map[base.DiskFileNum]struct{})
     310            1 :                         d.mu.versions.addLiveFileNums(liveFileNums)
     311            1 :                         if len(liveFileNums) != 0 {
     312            1 :                                 return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
     313            1 :                         }
     314              :                 }
     315              :         }
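
A sketch of the Options fields this branch consults, paired with the exported sentinels they produce (directory name hypothetical; errors is github.com/cockroachdb/errors, as imported above):

        // Refuse to create a store when none exists:
        _, err := pebble.Open("demo", &pebble.Options{ErrorIfNotExists: true})
        if errors.Is(err, pebble.ErrDBDoesNotExist) {
                // no store at "demo"
        }

        // Refuse to open a store that already exists:
        _, err = pebble.Open("demo", &pebble.Options{ErrorIfExists: true})
        if errors.Is(err, pebble.ErrDBAlreadyExists) {
                // a store is already present at "demo"
        }
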
     316              : 
     317              :         // In read-only mode, we replay directly into the mutable memtable but never
     318              :         // flush it. We need to delay creation of the memtable until we know the
     319              :         // sequence number of the first batch that will be inserted.
     320            2 :         if !d.opts.ReadOnly {
     321            2 :                 var entry *flushableEntry
     322            2 :                 d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
     323            2 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     324            2 :         }
     325              : 
     326            2 :         d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     327            2 :                 Buckets: FsyncLatencyBuckets,
     328            2 :         })
     329            2 : 
     330            2 :         walOpts := wal.Options{
     331            2 :                 Primary:              wal.Dir{FS: opts.FS, Dirname: walDirname},
     332            2 :                 Secondary:            wal.Dir{},
     333            2 :                 MinUnflushedWALNum:   wal.NumWAL(d.mu.versions.minUnflushedLogNum),
     334            2 :                 MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
     335            2 :                 NoSyncOnClose:        opts.NoSyncOnClose,
     336            2 :                 BytesPerSync:         opts.WALBytesPerSync,
     337            2 :                 PreallocateSize:      d.walPreallocateSize,
     338            2 :                 MinSyncInterval:      opts.WALMinSyncInterval,
     339            2 :                 FsyncLatency:         d.mu.log.metrics.fsyncLatency,
     340            2 :                 QueueSemChan:         d.commit.logSyncQSem,
     341            2 :                 Logger:               opts.Logger,
     342            2 :                 EventListener:        walEventListenerAdaptor{l: opts.EventListener},
     343            2 :                 WriteWALSyncOffsets:  func() bool { return d.FormatMajorVersion() >= FormatWALSyncChunks },
     344              :         }
     345              :         // Ensure we release the WAL directory locks if we fail to open the
     346              :         // database. If we fail before initializing the WAL manager, this defer is
     347              :         // responsible for releasing the locks. If we fail after initializing the
     348              :         // WAL manager, closing the WAL manager will release the locks.
     349              :         //
     350              :         // TODO(jackson): Open's cleanup error handling logic is convoluted; can we
     351              :         // simplify it?
     352            2 :         defer maybeCleanUp(func() (err error) {
     353            1 :                 if d.mu.log.manager == nil {
     354            1 :                         if walOpts.Primary.Lock != nil {
     355            0 :                                 err = errors.CombineErrors(err, walOpts.Primary.Lock.Close())
     356            0 :                         }
     357            1 :                         if walOpts.Secondary.Lock != nil {
     358            0 :                                 err = errors.CombineErrors(err, walOpts.Secondary.Lock.Close())
     359            0 :                         }
     360            1 :                         return err
     361              :                 }
     362            1 :                 return nil
     363              :         })
     364              : 
     365              :         // Lock the dedicated WAL directory, if configured.
     366            2 :         if walDirname != dirname {
     367            2 :                 walOpts.Primary.Lock, err = base.AcquireOrValidateDirectoryLock(opts.WALDirLock, walDirname, opts.FS)
     368            2 :                 if err != nil {
     369            1 :                         return nil, err
     370            1 :                 }
     371              :         }
     372            2 :         if opts.WALFailover != nil {
     373            2 :                 walOpts.Secondary = opts.WALFailover.Secondary
     374            2 :                 // Lock the secondary WAL directory, if distinct from the data directory
     375            2 :                 // and primary WAL directory.
     376            2 :                 if secondaryWalDirName != dirname && secondaryWalDirName != walDirname {
     377            2 :                         walOpts.Secondary.Lock, err = base.AcquireOrValidateDirectoryLock(
     378            2 :                                 opts.WALFailover.Secondary.Lock, secondaryWalDirName, opts.WALFailover.Secondary.FS)
     379            2 :                         if err != nil {
     380            1 :                                 return nil, err
     381            1 :                         }
     382              :                 }
     383            2 :                 walOpts.Secondary.Dirname = secondaryWalDirName
     384            2 :                 walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
     385            2 :                 walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     386            2 :                         Buckets: FsyncLatencyBuckets,
     387            2 :                 })
     388              :         }
     389            2 :         walDirs := walOpts.Dirs()
     390            2 :         walRecoveryLocks := make([]*base.DirLock, len(opts.WALRecoveryDirs))
     391            2 :         defer func() {
     392            2 :                 // We only need the recovery WALs during Open, so we can release
     393            2 :                 // the locks after the WALs have been scanned.
     394            2 :                 for _, l := range walRecoveryLocks {
     395            1 :                         if l != nil {
     396            1 :                                 _ = l.Close()
     397            1 :                         }
     398              :                 }
     399              :         }()
     400            2 :         for i, dir := range opts.WALRecoveryDirs {
     401            1 :                 dir.Dirname = resolveStorePath(dirname, dir.Dirname)
     402            1 :                 if dir.Dirname != dirname {
     403            1 :                         // Acquire a lock on the WAL recovery directory.
     404            1 :                         walRecoveryLocks[i], err = base.AcquireOrValidateDirectoryLock(dir.Lock, dir.Dirname, dir.FS)
     405            1 :                         if err != nil {
     406            0 :                                 return nil, errors.Wrapf(err, "error acquiring lock on WAL recovery directory %q", dir.Dirname)
     407            0 :                         }
     408              :                 }
     409            1 :                 walDirs = append(walDirs, dir)
     410              :         }
     411            2 :         wals, err := wal.Scan(walDirs...)
     412            2 :         if err != nil {
     413            1 :                 return nil, err
     414            1 :         }
     415              : 
     416              :         // Remove obsolete WAL files now (as opposed to relying on asynchronous cleanup)
     417              :         // to prevent crash loops due to no disk space (ENOSPC).
     418            2 :         var retainedWALs wal.Logs
     419            2 :         for _, w := range wals {
     420            2 :                 if base.DiskFileNum(w.Num) < d.mu.versions.minUnflushedLogNum {
     421            2 :                         // Log obsolete WALs that will be removed.
     422            2 :                         for i := range w.NumSegments() {
     423            2 :                                 fs, path := w.SegmentLocation(i)
     424            2 :                                 if err := fs.Remove(path); err != nil {
     425            1 :                                         // It's not a big deal if we can't delete the file now.
     426            1 :                                         // We'll try to remove it later in the cleanup process.
     427            1 :                                         d.opts.EventListener.WALDeleted(WALDeleteInfo{
     428            1 :                                                 JobID:   0,
     429            1 :                                                 Path:    path,
     430            1 :                                                 FileNum: base.DiskFileNum(w.Num),
     431            1 :                                                 Err:     err,
     432            1 :                                         })
     433            1 :                                         retainedWALs = append(retainedWALs, w)
     434            2 :                                 } else {
     435            2 :                                         d.opts.EventListener.WALDeleted(WALDeleteInfo{
     436            2 :                                                 JobID:   0,
     437            2 :                                                 Path:    path,
     438            2 :                                                 FileNum: base.DiskFileNum(w.Num),
     439            2 :                                                 Err:     nil,
     440            2 :                                         })
     441            2 :                                 }
     442              :                         }
     443            2 :                 } else {
     444            2 :                         retainedWALs = append(retainedWALs, w)
     445            2 :                 }
     446              :         }
     447              : 
     448            2 :         walManager, err := wal.Init(walOpts, retainedWALs)
     449            2 :         if err != nil {
     450            1 :                 return nil, err
     451            1 :         }
     452            2 :         defer maybeCleanUp(walManager.Close)
     453            2 :         d.mu.log.manager = walManager
     454            2 : 
     455            2 :         d.cleanupManager = openCleanupManager(opts, d.objProvider, d.getDeletionPacerInfo)
     456            2 : 
     457            2 :         if manifestExists && !opts.DisableConsistencyCheck {
     458            2 :                 curVersion := d.mu.versions.currentVersion()
     459            2 :                 if err := checkConsistency(curVersion, d.objProvider); err != nil {
     460            1 :                         return nil, err
     461            1 :                 }
     462              :         }
     463              : 
     464            2 :         fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
     465            2 :         if opts.FileCache == nil {
     466            2 :                 opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
     467            2 :                 defer opts.FileCache.Unref()
     468            2 :         }
     469            2 :         d.fileCache = opts.FileCache.newHandle(d.cacheHandle, d.objProvider, d.opts.LoggerAndTracer, d.opts.MakeReaderOptions(), d.reportCorruption)
     470            2 :         d.newIters = d.fileCache.newIters
     471            2 :         d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
     472            2 : 
     473            2 :         d.fileSizeAnnotator = d.makeFileSizeAnnotator()
     474            2 : 
     475            2 :         var previousOptionsFileNum base.DiskFileNum
     476            2 :         var previousOptionsFilename string
     477            2 :         for _, filename := range ls {
     478            2 :                 ft, fn, ok := base.ParseFilename(opts.FS, filename)
     479            2 :                 if !ok {
     480            2 :                         continue
     481              :                 }
     482              : 
     483              :                 // Don't reuse any obsolete file numbers to avoid modifying an
     484              :                 // ingested sstable's original external file.
     485            2 :                 d.mu.versions.markFileNumUsed(fn)
     486            2 : 
     487            2 :                 switch ft {
     488            0 :                 case base.FileTypeLog:
     489              :                         // Ignore.
     490            2 :                 case base.FileTypeOptions:
     491            2 :                         if previousOptionsFileNum < fn {
     492            2 :                                 previousOptionsFileNum = fn
     493            2 :                                 previousOptionsFilename = filename
     494            2 :                         }
     495            1 :                 case base.FileTypeTemp, base.FileTypeOldTemp:
     496            1 :                         if !d.opts.ReadOnly {
     497            1 :                                 // Some codepaths write to a temporary file and then
     498            1 :                                 // rename it to its final location when complete.  A
     499            1 :                                 // temp file is leftover if a process exits before the
     500            1 :                                 // rename.  Remove it.
     501            1 :                                 err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
     502            1 :                                 if err != nil {
     503            0 :                                         return nil, err
     504            0 :                                 }
     505              :                         }
     506              :                 }
     507              :         }
     508            2 :         if n := len(wals); n > 0 {
     509            2 :                 // Don't reuse any obsolete file numbers to avoid modifying an
     510            2 :                 // ingested sstable's original external file.
     511            2 :                 d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
     512            2 :         }
     513              : 
     514              :         // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
     515              :         // objProvider. This avoids FileNum collisions with obsolete sstables.
     516            2 :         objects := d.objProvider.List()
     517            2 :         for _, obj := range objects {
     518            2 :                 d.mu.versions.markFileNumUsed(obj.DiskFileNum)
     519            2 :         }
     520              : 
     521              :         // Validate the most-recent OPTIONS file, if there is one.
     522            2 :         if previousOptionsFilename != "" {
     523            2 :                 path := opts.FS.PathJoin(dirname, previousOptionsFilename)
     524            2 :                 previousOptions, err := readOptionsFile(opts, path)
     525            2 :                 if err != nil {
     526            0 :                         return nil, err
     527            0 :                 }
     528            2 :                 if err := opts.CheckCompatibility(dirname, previousOptions); err != nil {
     529            1 :                         return nil, err
     530            1 :                 }
     531              :         }
     532              : 
      533              :         // Replay any log files newer than the ones named in the manifest.
     534            2 :         var replayWALs wal.Logs
     535            2 :         for i, w := range wals {
     536            2 :                 if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
     537            2 :                         replayWALs = wals[i:]
     538            2 :                         break
     539              :                 }
     540              :         }
     541            2 :         var flushableIngests []*ingestedFlushable
     542            2 :         for i, lf := range replayWALs {
     543            2 :                 // WALs other than the last one would have been closed cleanly.
     544            2 :                 //
     545            2 :                 // Note: we used to never require strict WAL tails when reading from older
     546            2 :                 // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
     547            2 :                 // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
     548            2 :                 // compatible Pebble format is newer and guarantees a clean EOF.
     549            2 :                 strictWALTail := i < len(replayWALs)-1
     550            2 :                 fi, maxSeqNum, err := d.replayWAL(jobID, lf, strictWALTail)
     551            2 :                 if err != nil {
     552            1 :                         return nil, err
     553            1 :                 }
     554            2 :                 if len(fi) > 0 {
     555            2 :                         flushableIngests = append(flushableIngests, fi...)
     556            2 :                 }
     557            2 :                 if d.mu.versions.logSeqNum.Load() < maxSeqNum {
     558            2 :                         d.mu.versions.logSeqNum.Store(maxSeqNum)
     559            2 :                 }
     560              :         }
     561            2 :         if d.mu.mem.mutable == nil {
     562            2 :                 // Recreate the mutable memtable if replayWAL got rid of it.
     563            2 :                 var entry *flushableEntry
     564            2 :                 d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
     565            2 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     566            2 :         }
     567            2 :         d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
     568            2 : 
     569            2 :         // Register with the CompactionScheduler before calling
     570            2 :         // d.maybeScheduleFlush, since completion of the flush can trigger
     571            2 :         // compactions.
     572            2 :         d.compactionScheduler.Register(2, d)
     573            2 :         if !d.opts.ReadOnly {
     574            2 :                 d.maybeScheduleFlush()
     575            2 :                 for d.mu.compact.flushing {
     576            2 :                         d.mu.compact.cond.Wait()
     577            2 :                 }
     578              : 
     579              :                 // Create an empty .log file for the mutable memtable.
     580            2 :                 newLogNum := d.mu.versions.getNextDiskFileNum()
     581            2 :                 d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
     582            2 :                 if err != nil {
     583            1 :                         return nil, err
     584            1 :                 }
     585              : 
     586              :                 // This isn't strictly necessary as we don't use the log number for
     587              :                 // memtables being flushed, only for the next unflushed memtable.
     588            2 :                 d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
     589              :         }
     590            2 :         d.updateReadStateLocked(d.opts.DebugCheck)
     591            2 : 
     592            2 :         if !d.opts.ReadOnly {
     593            2 :                 // If the Options specify a format major version higher than the
     594            2 :                 // loaded database's, upgrade it. If this is a new database, this
     595            2 :                 // code path also performs an initial upgrade from the starting
     596            2 :                 // implicit MinSupported version.
     597            2 :                 //
     598            2 :                 // We ratchet the version this far into Open so that migrations have a read
     599            2 :                 // state available. Note that this also results in creating/updating the
     600            2 :                 // format version marker file.
     601            2 :                 if opts.FormatMajorVersion > d.FormatMajorVersion() {
     602            2 :                         if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
     603            0 :                                 return nil, err
     604            0 :                         }
     605            2 :                 } else if noFormatVersionMarker {
     606            2 :                         // We are creating a new store. Create the format version marker file.
     607            2 :                         if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
     608            1 :                                 return nil, err
     609            1 :                         }
     610              :                 }
     611              : 
     612              :                 // Write the current options to disk.
     613            2 :                 d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
     614            2 :                 tmpPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeTemp, d.optionsFileNum)
     615            2 :                 optionsPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeOptions, d.optionsFileNum)
     616            2 : 
     617            2 :                 // Write them to a temporary file first, in case we crash before
     618            2 :                 // we're done. A corrupt options file prevents opening the
     619            2 :                 // database.
     620            2 :                 optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
     621            2 :                 if err != nil {
     622            1 :                         return nil, err
     623            1 :                 }
     624            2 :                 serializedOpts := []byte(opts.String())
     625            2 :                 if _, err := optionsFile.Write(serializedOpts); err != nil {
     626            1 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     627            1 :                 }
     628            2 :                 d.optionsFileSize = uint64(len(serializedOpts))
     629            2 :                 if err := optionsFile.Sync(); err != nil {
     630            1 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     631            1 :                 }
     632            2 :                 if err := optionsFile.Close(); err != nil {
     633            0 :                         return nil, err
     634            0 :                 }
     635              :                 // Atomically rename to the OPTIONS-XXXXXX path. This rename is
     636              :                 // guaranteed to be atomic because the destination path does not
     637              :                 // exist.
     638            2 :                 if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
     639            1 :                         return nil, err
     640            1 :                 }
     641            2 :                 if err := d.dataDir.Sync(); err != nil {
     642            1 :                         return nil, err
     643            1 :                 }
     644              :         }
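
The OPTIONS write above follows the write-temp, sync, rename, sync-directory recipe that makes file creation crash-safe: the contents become durable before the atomic rename publishes them, and the directory sync makes the rename durable. The same recipe as a standalone helper (name hypothetical), using the vfs calls from the listing:

        func writeFileAtomic(fs vfs.FS, dir vfs.File, tmpPath, finalPath string, data []byte) error {
                f, err := fs.Create(tmpPath, vfs.WriteCategoryUnspecified)
                if err != nil {
                        return err
                }
                if _, err := f.Write(data); err != nil {
                        return errors.CombineErrors(err, f.Close())
                }
                if err := f.Sync(); err != nil { // contents durable before publishing
                        return errors.CombineErrors(err, f.Close())
                }
                if err := f.Close(); err != nil {
                        return err
                }
                // Atomic because finalPath does not already exist.
                if err := fs.Rename(tmpPath, finalPath); err != nil {
                        return err
                }
                return dir.Sync() // make the rename itself durable
        }
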
     645              : 
     646            2 :         if !d.opts.ReadOnly {
     647            2 :                 // Get a fresh list of files, in case some of the earlier flushes/compactions
     648            2 :                 // have deleted some files.
     649            2 :                 ls, err := opts.FS.List(dirname)
     650            2 :                 if err != nil {
     651            1 :                         return nil, err
     652            1 :                 }
     653            2 :                 d.scanObsoleteFiles(ls, flushableIngests)
     654            2 :                 d.deleteObsoleteFiles(jobID)
     655              :         }
     656              :         // Else, nothing is obsolete.
     657              : 
     658            2 :         d.mu.tableStats.cond.L = &d.mu.Mutex
     659            2 :         d.mu.tableValidation.cond.L = &d.mu.Mutex
     660            2 :         if !d.opts.ReadOnly {
     661            2 :                 d.maybeCollectTableStatsLocked()
     662            2 :         }
     663            2 :         d.calculateDiskAvailableBytes()
     664            2 : 
     665            2 :         d.maybeScheduleFlush()
     666            2 :         d.maybeScheduleCompaction()
     667            2 : 
     668            2 :         // Note: this is a no-op if invariants are disabled or race is enabled.
     669            2 :         //
     670            2 :         // Setting a finalizer on *DB causes *DB to never be reclaimed and the
     671            2 :         // finalizer to never be run. The problem is due to this limitation of
      672            2 :         // finalizers mentioned in the SetFinalizer docs:
     673            2 :         //
     674            2 :         //   If a cyclic structure includes a block with a finalizer, that cycle is
     675            2 :         //   not guaranteed to be garbage collected and the finalizer is not
     676            2 :         //   guaranteed to run, because there is no ordering that respects the
     677            2 :         //   dependencies.
     678            2 :         //
     679            2 :         // DB has cycles with several of its internal structures: readState,
      680            2 :         // newIters, fileCache, versions, etc. Each of these individually causes a
      681            2 :         // cycle and prevents the finalizer from being run. But we can work around
      682            2 :         // this finalizer limitation by setting a finalizer on another object that is
     683            2 :         // tied to the lifetime of DB: the DB.closed atomic.Value.
     684            2 :         dPtr := fmt.Sprintf("%p", d)
     685            2 :         invariants.SetFinalizer(d.closed, func(obj interface{}) {
     686            0 :                 v := obj.(*atomic.Value)
     687            0 :                 if err := v.Load(); err == nil {
     688            0 :                         fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
     689            0 :                         os.Exit(1)
     690            0 :                 }
     691              :         })
     692              : 
     693            2 :         return d, nil
     694              : }
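
A minimal usage sketch for Open, exercising the corruption predicate its doc comment mentions (store path hypothetical):

        db, err := pebble.Open("demo", &pebble.Options{})
        if err != nil {
                if pebble.IsCorruptionError(err) {
                        log.Fatalf("on-disk corruption: %v", err)
                }
                log.Fatal(err)
        }
        defer db.Close()
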
     695              : 
     696              : // prepareAndOpenDirs opens the directories for the store (and creates them if
     697              : // necessary).
     698              : //
     699              : // Returns an error if ReadOnly is set and the directories don't exist.
     700              : func prepareAndOpenDirs(
     701              :         dirname string, opts *Options,
     702            2 : ) (walDirname string, secondaryWalDirName string, dataDir vfs.File, err error) {
     703            2 :         walDirname = dirname
     704            2 :         if opts.WALDir != "" {
     705            2 :                 walDirname = resolveStorePath(dirname, opts.WALDir)
     706            2 :         }
     707            2 :         if opts.WALFailover != nil {
     708            2 :                 secondaryWalDirName = resolveStorePath(dirname, opts.WALFailover.Secondary.Dirname)
     709            2 :         }
     710              : 
     711              :         // Create directories if needed.
     712            2 :         if !opts.ReadOnly {
     713            2 :                 f, err := mkdirAllAndSyncParents(opts.FS, dirname)
     714            2 :                 if err != nil {
     715            1 :                         return "", "", nil, err
     716            1 :                 }
     717            2 :                 f.Close()
     718            2 :                 if walDirname != dirname {
     719            2 :                         f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
     720            2 :                         if err != nil {
     721            0 :                                 return "", "", nil, err
     722            0 :                         }
     723            2 :                         f.Close()
     724              :                 }
     725            2 :                 if opts.WALFailover != nil {
     726            2 :                         f, err := mkdirAllAndSyncParents(opts.WALFailover.Secondary.FS, secondaryWalDirName)
     727            2 :                         if err != nil {
     728            0 :                                 return "", "", nil, err
     729            0 :                         }
     730            2 :                         f.Close()
     731              :                 }
     732              :         }
     733              : 
     734            2 :         dataDir, err = opts.FS.OpenDir(dirname)
     735            2 :         if err != nil {
     736            1 :                 if opts.ReadOnly && oserror.IsNotExist(err) {
     737            1 :                         return "", "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
     738            1 :                 }
     739            1 :                 return "", "", nil, err
     740              :         }
     741            2 :         if opts.ReadOnly && walDirname != dirname {
     742            1 :                 // Check that the wal dir exists.
     743            1 :                 walDir, err := opts.FS.OpenDir(walDirname)
     744            1 :                 if err != nil {
     745            1 :                         dataDir.Close()
     746            1 :                         return "", "", nil, err
     747            1 :                 }
     748            1 :                 walDir.Close()
     749              :         }
     750              : 
     751            2 :         return walDirname, secondaryWalDirName, dataDir, nil
     752              : }
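
prepareAndOpenDirs derives up to three locations from the Options: the data directory, an optional dedicated WAL directory (WALDir), and an optional failover secondary (WALFailover.Secondary). A hedged configuration sketch (paths hypothetical; WALFailoverOptions is the wrapper type consulted above):

        opts := &pebble.Options{
                WALDir: "/mnt/fast/wal", // primary WALs live apart from the data dir
                WALFailover: &pebble.WALFailoverOptions{
                        Secondary: wal.Dir{FS: vfs.Default, Dirname: "/mnt/backup/wal"},
                },
        }
        db, err := pebble.Open("/mnt/data/store", opts)
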
     753              : 
     754              : // GetVersion returns the engine version string from the latest options
     755              : // file present in dir. Used to check what Pebble or RocksDB version was last
     756              : // used to write to the database stored in this directory. An empty string is
     757              : // returned if no valid OPTIONS file with a version key was found.
     758            1 : func GetVersion(dir string, fs vfs.FS) (string, error) {
     759            1 :         ls, err := fs.List(dir)
     760            1 :         if err != nil {
     761            0 :                 return "", err
     762            0 :         }
     763            1 :         var version string
     764            1 :         lastOptionsSeen := base.DiskFileNum(0)
     765            1 :         for _, filename := range ls {
     766            1 :                 ft, fn, ok := base.ParseFilename(fs, filename)
     767            1 :                 if !ok {
     768            1 :                         continue
     769              :                 }
     770            1 :                 switch ft {
     771            1 :                 case base.FileTypeOptions:
     772            1 :                         // If this file has a higher number than the last options file
     773            1 :                         // processed, reset version. This is because rocksdb often
     774            1 :                         // writes multiple options files without deleting previous ones.
     775            1 :                         // Otherwise, skip parsing this options file.
     776            1 :                         if fn > lastOptionsSeen {
     777            1 :                                 version = ""
     778            1 :                                 lastOptionsSeen = fn
     779            1 :                         } else {
     780            1 :                                 continue
     781              :                         }
     782            1 :                         f, err := fs.Open(fs.PathJoin(dir, filename))
     783            1 :                         if err != nil {
     784            0 :                                 return "", err
     785            0 :                         }
     786            1 :                         data, err := io.ReadAll(f)
     787            1 :                         f.Close()
     788            1 : 
     789            1 :                         if err != nil {
     790            0 :                                 return "", err
     791            0 :                         }
     792            1 :                         err = parseOptions(string(data), parseOptionsFuncs{
     793            1 :                                 visitKeyValue: func(i, j int, section, key, value string) error {
     794            1 :                                         switch {
     795            1 :                                         case section == "Version":
     796            1 :                                                 switch key {
     797            1 :                                                 case "pebble_version":
     798            1 :                                                         version = value
     799            1 :                                                 case "rocksdb_version":
     800            1 :                                                         version = fmt.Sprintf("rocksdb v%s", value)
     801              :                                                 }
     802              :                                         }
     803            1 :                                         return nil
     804              :                                 },
     805              :                         })
     806            1 :                         if err != nil {
     807            0 :                                 return "", err
     808            0 :                         }
     809              :                 }
     810              :         }
     811            1 :         return version, nil
     812              : }
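
A usage sketch for GetVersion (path hypothetical):

        version, err := pebble.GetVersion("demo", vfs.Default)
        switch {
        case err != nil:
                log.Fatal(err)
        case version == "":
                fmt.Println("no OPTIONS file with a version key was found")
        default:
                fmt.Println("last written by:", version)
        }
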
     813              : 
     814              : func (d *DB) replayIngestedFlushable(
     815              :         b *Batch, logNum base.DiskFileNum,
     816            2 : ) (entry *flushableEntry, err error) {
     817            2 :         br := b.Reader()
     818            2 :         seqNum := b.SeqNum()
     819            2 : 
     820            2 :         fileNums := make([]base.DiskFileNum, 0, b.Count())
     821            2 :         var exciseSpan KeyRange
     822            2 :         addFileNum := func(encodedFileNum []byte) {
     823            2 :                 fileNum, n := binary.Uvarint(encodedFileNum)
     824            2 :                 if n <= 0 {
     825            0 :                         panic("pebble: ingest sstable file num is invalid")
     826              :                 }
     827            2 :                 fileNums = append(fileNums, base.DiskFileNum(fileNum))
     828              :         }
     829              : 
     830            2 :         for i := 0; i < int(b.Count()); i++ {
     831            2 :                 kind, key, val, ok, err := br.Next()
     832            2 :                 if err != nil {
     833            0 :                         return nil, err
     834            0 :                 }
     835            2 :                 if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise {
     836            0 :                         panic("pebble: invalid batch key kind")
     837              :                 }
     838            2 :                 if !ok {
     839            0 :                         panic("pebble: invalid batch count")
     840              :                 }
     841            2 :                 if kind == base.InternalKeyKindExcise {
     842            1 :                         if exciseSpan.Valid() {
     843            0 :                                 panic("pebble: multiple excise spans in a single batch")
     844              :                         }
     845            1 :                         exciseSpan.Start = slices.Clone(key)
     846            1 :                         exciseSpan.End = slices.Clone(val)
     847            1 :                         continue
     848              :                 }
     849            2 :                 addFileNum(key)
     850              :         }
     851              : 
     852            2 :         if _, _, _, ok, err := br.Next(); err != nil {
     853            0 :                 return nil, err
     854            2 :         } else if ok {
     855            0 :                 panic("pebble: invalid number of entries in batch")
     856              :         }
     857              : 
     858            2 :         meta := make([]*manifest.TableMetadata, len(fileNums))
     859            2 :         var lastRangeKey keyspan.Span
     860            2 :         for i, n := range fileNums {
     861            2 :                 readable, err := d.objProvider.OpenForReading(context.TODO(), base.FileTypeTable, n,
     862            2 :                         objstorage.OpenOptions{MustExist: true})
     863            2 :                 if err != nil {
     864            0 :                         return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files")
     865            0 :                 }
     866              :                 // NB: ingestLoad1 will close readable.
     867            2 :                 meta[i], lastRangeKey, _, err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(),
     868            2 :                         readable, d.cacheHandle, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
     869            2 :                 if err != nil {
     870            0 :                         return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
     871            0 :                 }
     872              :         }
     873            2 :         if lastRangeKey.Valid() && d.opts.Comparer.Split.HasSuffix(lastRangeKey.End) {
     874            0 :                 return nil, errors.AssertionFailedf("pebble: last ingest sstable has suffixed range key end %s",
     875            0 :                         d.opts.Comparer.FormatKey(lastRangeKey.End))
     876            0 :         }
     877              : 
     878            2 :         numFiles := len(meta)
     879            2 :         if exciseSpan.Valid() {
     880            1 :                 numFiles++
     881            1 :         }
     882            2 :         if uint32(numFiles) != b.Count() {
     883            0 :                 panic("pebble: couldn't load all files in WAL entry")
     884              :         }
     885              : 
     886            2 :         return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
     887              : }
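
The sstable file numbers above travel through the batch as unsigned varints in the record keys. A minimal, self-contained sketch of the decode step performed by addFileNum, using a made-up encoding of file number 150:

        package main

        import (
                "encoding/binary"
                "fmt"
        )

        func main() {
                // File number 150 encodes as two varint bytes: 0x96 (the low
                // seven bits with the continuation bit set) followed by 0x01.
                encoded := []byte{0x96, 0x01}
                fileNum, n := binary.Uvarint(encoded)
                if n <= 0 {
                        // Mirrors the panic in addFileNum: a non-positive n
                        // means the varint was truncated or overflowed.
                        panic("invalid varint")
                }
                fmt.Println(fileNum, n) // prints: 150 2
        }
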
     888              : 
      889              : // replayWAL replays the edits in the specified WAL. If the DB is in
      890              : // read-only mode, the WALs are replayed into memtables and not flushed. If
      891              : // the DB is not in read-only mode, the contents of the WAL are guaranteed
      892              : // to be flushed when a flush is scheduled after this method is run.
      893              : //
      894              : // This flushing is very important for durability: the application may have
      895              : // had a number of pending fsyncs to the WAL before the process crashed, and
      896              : // those fsyncs may not have happened, yet the corresponding data may now be
      897              : // readable from the WAL (while sitting in write-back caches in the kernel
      898              : // or the storage device). By reading the WAL (including the non-fsynced
      899              : // data) and then flushing all these changes (flush does fsyncs), we are
      900              : // able to guarantee that the initial state of the DB is durable.
     901              : //
      902              : // This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable, and it
      903              : // replays WALs into the flushable queue. Callers are expected to flush the
      904              : // queue. It returns the flushable ingests (but not memtables) that were replayed.
     905              : //
     906              : // d.mu must be held when calling this, but the mutex may be dropped and
     907              : // re-acquired during the course of this method.
     908              : func (d *DB) replayWAL(
     909              :         jobID JobID, ll wal.LogicalLog, strictWALTail bool,
     910            2 : ) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
     911            2 :         rr := ll.OpenForRead()
     912            2 :         defer func() { _ = rr.Close() }()
     913            2 :         var (
     914            2 :                 b               Batch
     915            2 :                 buf             bytes.Buffer
     916            2 :                 mem             *memTable
     917            2 :                 entry           *flushableEntry
     918            2 :                 offset          wal.Offset
     919            2 :                 lastFlushOffset int64
     920            2 :                 keysReplayed    int64 // number of keys replayed
     921            2 :                 batchesReplayed int64 // number of batches replayed
     922            2 :         )
     923            2 : 
     924            2 :         // TODO(jackson): This function is interspersed with panics, in addition to
     925            2 :         // corruption error propagation. Audit them to ensure we're truly only
     926            2 :         // panicking where the error points to Pebble bug and not user or
     927            2 :         // hardware-induced corruption.
     928            2 : 
      929            2 :         // "Flushes" (i.e., closes off) the current memtable, if not nil.
     930            2 :         flushMem := func() {
     931            2 :                 if mem == nil {
     932            2 :                         return
     933            2 :                 }
     934            2 :                 mem.writerUnref()
     935            2 :                 if d.mu.mem.mutable == mem {
     936            2 :                         d.mu.mem.mutable = nil
     937            2 :                 }
     938            2 :                 entry.flushForced = !d.opts.ReadOnly
     939            2 :                 var logSize uint64
     940            2 :                 mergedOffset := offset.Physical + offset.PreviousFilesBytes
     941            2 :                 if mergedOffset >= lastFlushOffset {
     942            2 :                         logSize = uint64(mergedOffset - lastFlushOffset)
     943            2 :                 }
      944              :                 // Else, this was the initial memtable in the read-only case, which must
      945              :                 // have been empty; we still flush it since we don't want to add to it later.
     946            2 :                 lastFlushOffset = mergedOffset
     947            2 :                 entry.logSize = logSize
     948            2 :                 mem, entry = nil, nil
     949              :         }
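
                // Illustrative arithmetic for flushMem above (made-up numbers,
                // not from the source): with offset.Physical=4096,
                // offset.PreviousFilesBytes=0, and lastFlushOffset=1024,
                // mergedOffset is 4096 and the entry is credited with
                // logSize=3072 bytes of WAL.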
     950              : 
     951            2 :         mem = d.mu.mem.mutable
     952            2 :         if mem != nil {
     953            2 :                 entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
     954            2 :                 if !d.opts.ReadOnly {
     955            2 :                         flushMem()
     956            2 :                 }
     957              :         }
     958              : 
     959              :         // Creates a new memtable if there is no current memtable.
     960            2 :         ensureMem := func(seqNum base.SeqNum) {
     961            2 :                 if mem != nil {
     962            2 :                         return
     963            2 :                 }
     964            2 :                 mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
     965            2 :                 d.mu.mem.mutable = mem
     966            2 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     967              :         }
     968              : 
     969            2 :         defer func() {
     970            2 :                 if err != nil {
     971            1 :                         err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
     972            1 :                 }
     973              :         }()
     974              : 
     975            2 :         for {
     976            2 :                 var r io.Reader
     977            2 :                 var err error
     978            2 :                 r, offset, err = rr.NextRecord()
     979            2 :                 if err == nil {
     980            2 :                         _, err = io.Copy(&buf, r)
     981            2 :                 }
     982            2 :                 if err != nil {
      983            2 :                         // It is common to encounter a zeroed or invalid chunk due to WAL
      984            2 :                         // preallocation and WAL recycling. However, zeroed or invalid chunks
      985            2 :                         // can also be a consequence of corruption / disk rot. When the log
      986            2 :                         // reader encounters one of these cases, it attempts to disambiguate
      987            2 :                         // by reading ahead, looking for a future record. If a future chunk
      988            2 :                         // indicates the chunk at the original offset should have been valid,
      989            2 :                         // it surfaces record.ErrInvalidChunk or record.ErrZeroedChunk. These
      990            2 :                         // errors are always indicative of corruption and data loss.
      991            2 :                         //
      992            2 :                         // Otherwise, the reader surfaces record.ErrUnexpectedEOF, indicating
      993            2 :                         // that the WAL terminated uncleanly and ambiguously. If the WAL is
      994            2 :                         // the most recent logical WAL, the caller passes strictWALTail=false,
      995            2 :                         // indicating that we should tolerate the unclean ending. If the WAL
      996            2 :                         // is an older WAL, the caller passes strictWALTail=true, indicating
      997            2 :                         // that the WAL should have been closed cleanly, and we should
      998            2 :                         // interpret record.ErrUnexpectedEOF as corruption and stop recovery.
      999            2 :                         //
    1000            2 :                         if errors.Is(err, io.EOF) {
    1001            2 :                                 break
    1002            1 :                         } else if errors.Is(err, record.ErrUnexpectedEOF) && !strictWALTail {
    1003            1 :                                 break
    1004            1 :                         } else if (errors.Is(err, record.ErrUnexpectedEOF) && strictWALTail) ||
    1005            1 :                                 errors.Is(err, record.ErrInvalidChunk) || errors.Is(err, record.ErrZeroedChunk) {
    1006            1 :                                 // If a read-ahead returns record.ErrInvalidChunk or
    1007            1 :                                 // record.ErrZeroedChunk, then there's definitively corruption.
    1008            1 :                                 //
     1009            1 :                                 // If strictWALTail=true, then record.ErrUnexpectedEOF should
     1010            1 :                                 // also be considered corruption, because strictWALTail
     1011            1 :                                 // indicates that we expect a clean end to the WAL.
     1012            1 :                                 //
     1013            1 :                                 // Other I/O-related errors should not be marked as corruption
     1014            1 :                                 // and are simply returned.
    1015            1 :                                 err = errors.Mark(err, ErrCorruption)
    1016            1 :                         }
    1017              : 
    1018            1 :                         return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
    1019              :                 }
    1020              : 
    1021            2 :                 if buf.Len() < batchrepr.HeaderLen {
    1022            0 :                         return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
    1023            0 :                                 errors.Safe(base.DiskFileNum(ll.Num)), offset)
    1024            0 :                 }
    1025              : 
    1026            2 :                 if d.opts.ErrorIfNotPristine {
    1027            1 :                         return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
    1028            1 :                 }
    1029              : 
     1030              :                 // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize,
     1031              :                 // which is used below.
    1032            2 :                 b = Batch{}
    1033            2 :                 b.db = d
    1034            2 :                 if err := b.SetRepr(buf.Bytes()); err != nil {
    1035            0 :                         return nil, 0, err
    1036            0 :                 }
    1037            2 :                 seqNum := b.SeqNum()
    1038            2 :                 maxSeqNum = seqNum + base.SeqNum(b.Count())
    1039            2 :                 keysReplayed += int64(b.Count())
    1040            2 :                 batchesReplayed++
    1041            2 :                 {
    1042            2 :                         br := b.Reader()
    1043            2 :                         if kind, _, _, ok, err := br.Next(); err != nil {
    1044            0 :                                 return nil, 0, err
    1045            2 :                         } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
    1046            2 :                                 // We're in the flushable ingests (+ possibly excises) case.
    1047            2 :                                 //
    1048            2 :                                 // Ingests require an up-to-date view of the LSM to determine the target
    1049            2 :                                 // level of ingested sstables, and to accurately compute excises. Instead of
    1050            2 :                                 // doing an ingest in this function, we just enqueue a flushable ingest
    1051            2 :                                 // in the flushables queue and run a regular flush.
    1052            2 :                                 flushMem()
    1053            2 :                                 // mem is nil here.
    1054            2 :                                 entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
    1055            2 :                                 if err != nil {
    1056            0 :                                         return nil, 0, err
    1057            0 :                                 }
    1058            2 :                                 fi := entry.flushable.(*ingestedFlushable)
    1059            2 :                                 flushableIngests = append(flushableIngests, fi)
    1060            2 :                                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1061            2 :                                 // A flushable ingest is always followed by a WAL rotation.
    1062            2 :                                 break
    1063              :                         }
    1064              :                 }
    1065              : 
    1066            2 :                 if b.memTableSize >= uint64(d.largeBatchThreshold) {
    1067            2 :                         flushMem()
    1068            2 :                         // Make a copy of the data slice since it is currently owned by buf and will
    1069            2 :                         // be reused in the next iteration.
    1070            2 :                         b.data = slices.Clone(b.data)
    1071            2 :                         b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
    1072            2 :                         if err != nil {
    1073            0 :                                 return nil, 0, err
    1074            0 :                         }
    1075            2 :                         entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
    1076            2 :                         // Disable memory accounting by adding a reader ref that will never be
    1077            2 :                         // removed.
    1078            2 :                         entry.readerRefs.Add(1)
    1079            2 :                         d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1080            2 :                 } else {
    1081            2 :                         ensureMem(seqNum)
    1082            2 :                         if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
    1083            0 :                                 return nil, 0, err
    1084            0 :                         }
    1085              :                         // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
    1086              :                         // batch may not initially fit, but will eventually fit (since it is smaller than
    1087              :                         // largeBatchThreshold).
    1088            2 :                         for err == arenaskl.ErrArenaFull {
    1089            2 :                                 flushMem()
    1090            2 :                                 ensureMem(seqNum)
    1091            2 :                                 err = mem.prepare(&b)
    1092            2 :                                 if err != nil && err != arenaskl.ErrArenaFull {
    1093            0 :                                         return nil, 0, err
    1094            0 :                                 }
    1095              :                         }
    1096            2 :                         if err = mem.apply(&b, seqNum); err != nil {
    1097            0 :                                 return nil, 0, err
    1098            0 :                         }
    1099            2 :                         mem.writerUnref()
    1100              :                 }
    1101            2 :                 buf.Reset()
    1102              :         }
    1103              : 
    1104            2 :         d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
    1105            2 :                 jobID, ll.String(), offset, keysReplayed, batchesReplayed)
    1106            2 :         if !d.opts.ReadOnly {
    1107            2 :                 flushMem()
    1108            2 :         }
    1109              : 
     1110              :         // mem is nil here if !ReadOnly.
    1111            2 :         return flushableIngests, maxSeqNum, err
    1112              : }
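
The tail-error handling in the replay loop above can be restated as a small classifier. This is an illustrative paraphrase, not an API of the package; the function name and boolean results are hypothetical:

        package main

        import (
                "fmt"
                "io"

                "github.com/cockroachdb/errors"
                "github.com/cockroachdb/pebble/record"
        )

        // classifyWALTailErr mirrors the replay loop's decision: io.EOF ends
        // replay cleanly; record.ErrUnexpectedEOF is tolerated only for the
        // most recent WAL (strictWALTail=false); invalid or zeroed chunks are
        // always corruption. Any other error is neither tolerated nor marked,
        // and is returned by the caller as-is.
        func classifyWALTailErr(err error, strictWALTail bool) (tolerate, corrupt bool) {
                switch {
                case errors.Is(err, io.EOF):
                        return true, false
                case errors.Is(err, record.ErrUnexpectedEOF):
                        return !strictWALTail, strictWALTail
                case errors.Is(err, record.ErrInvalidChunk),
                        errors.Is(err, record.ErrZeroedChunk):
                        return false, true
                default:
                        return false, false
                }
        }

        func main() {
                // An unclean tail on the most recent WAL is tolerated...
                fmt.Println(classifyWALTailErr(record.ErrUnexpectedEOF, false)) // true false
                // ...but the same error on an older WAL means corruption.
                fmt.Println(classifyWALTailErr(record.ErrUnexpectedEOF, true)) // false true
        }
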
    1113              : 
    1114            2 : func readOptionsFile(opts *Options, path string) (string, error) {
    1115            2 :         f, err := opts.FS.Open(path)
    1116            2 :         if err != nil {
    1117            0 :                 return "", err
    1118            0 :         }
    1119            2 :         defer f.Close()
    1120            2 : 
    1121            2 :         data, err := io.ReadAll(f)
    1122            2 :         if err != nil {
    1123            0 :                 return "", err
    1124            0 :         }
    1125            2 :         return string(data), nil
    1126              : }
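
A minimal sketch tying this helper to the parseOptions visitor shown earlier (the function name is hypothetical, and it would have to live inside package pebble since both helpers are unexported; the OPTIONS file at path is assumed to exist):

        // exampleDBVersion reads an OPTIONS file and extracts the recorded
        // pebble_version, mirroring the [Version] handling shown earlier.
        func exampleDBVersion(fs vfs.FS, path string) (string, error) {
                data, err := readOptionsFile(&Options{FS: fs}, path)
                if err != nil {
                        return "", err
                }
                var version string
                err = parseOptions(data, parseOptionsFuncs{
                        visitKeyValue: func(i, j int, section, key, value string) error {
                                if section == "Version" && key == "pebble_version" {
                                        version = value
                                }
                                return nil
                        },
                })
                return version, err
        }
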
    1127              : 
    1128              : // DBDesc briefly describes high-level state about a database.
    1129              : type DBDesc struct {
    1130              :         // Exists is true if an existing database was found.
    1131              :         Exists bool
    1132              :         // FormatMajorVersion indicates the database's current format
    1133              :         // version.
    1134              :         FormatMajorVersion FormatMajorVersion
    1135              :         // ManifestFilename is the filename of the current active manifest,
    1136              :         // if the database exists.
    1137              :         ManifestFilename string
    1138              :         // OptionsFilename is the filename of the most recent OPTIONS file, if it
    1139              :         // exists.
    1140              :         OptionsFilename string
    1141              : }
    1142              : 
    1143              : // String implements fmt.Stringer.
    1144            1 : func (d *DBDesc) String() string {
    1145            1 :         if !d.Exists {
    1146            1 :                 return "uninitialized"
    1147            1 :         }
    1148            1 :         var buf bytes.Buffer
    1149            1 :         fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
    1150            1 :         fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
    1151            1 :         fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
    1152            1 :         return buf.String()
    1153              : }
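
For an initialized store, the output looks along these lines (the paths and version are illustrative, not taken from a real database):

        initialized at format major version 017
        manifest: /data/pebble/MANIFEST-000005
        options: /data/pebble/OPTIONS-000003
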
    1154              : 
    1155              : // Peek looks for an existing database in dirname on the provided FS. It
    1156              : // returns a brief description of the database. Peek is read-only and
     1157              : // does not open the database.
    1158            1 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
    1159            1 :         ls, err := fs.List(dirname)
    1160            1 :         if err != nil {
    1161            1 :                 return nil, err
    1162            1 :         }
    1163              : 
    1164            1 :         vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
    1165            1 :         if err != nil {
    1166            0 :                 return nil, err
    1167            0 :         }
    1168              :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1169              :         // PeekMarker variant that avoids opening the directory.
    1170            1 :         if err := versMarker.Close(); err != nil {
    1171            0 :                 return nil, err
    1172            0 :         }
    1173              : 
    1174              :         // Find the currently active manifest, if there is one.
    1175            1 :         manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
    1176            1 :         if err != nil {
    1177            0 :                 return nil, err
    1178            0 :         }
    1179              :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1180              :         // PeekMarker variant that avoids opening the directory.
    1181            1 :         if err := manifestMarker.Close(); err != nil {
    1182            0 :                 return nil, err
    1183            0 :         }
    1184              : 
    1185            1 :         desc := &DBDesc{
    1186            1 :                 Exists:             exists,
    1187            1 :                 FormatMajorVersion: vers,
    1188            1 :         }
    1189            1 : 
    1190            1 :         // Find the OPTIONS file with the highest file number within the list of
    1191            1 :         // directory entries.
    1192            1 :         var previousOptionsFileNum base.DiskFileNum
    1193            1 :         for _, filename := range ls {
    1194            1 :                 ft, fn, ok := base.ParseFilename(fs, filename)
    1195            1 :                 if !ok || ft != base.FileTypeOptions || fn < previousOptionsFileNum {
    1196            1 :                         continue
    1197              :                 }
    1198            1 :                 previousOptionsFileNum = fn
    1199            1 :                 desc.OptionsFilename = fs.PathJoin(dirname, filename)
    1200              :         }
    1201              : 
    1202            1 :         if exists {
    1203            1 :                 desc.ManifestFilename = base.MakeFilepath(fs, dirname, base.FileTypeManifest, manifestFileNum)
    1204            1 :         }
    1205            1 :         return desc, nil
    1206              : }
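
A minimal, self-contained usage sketch for Peek (the directory path is hypothetical):

        package main

        import (
                "fmt"
                "log"

                "github.com/cockroachdb/pebble"
                "github.com/cockroachdb/pebble/vfs"
        )

        func main() {
                // Peek inspects the directory without opening (or locking)
                // the database.
                desc, err := pebble.Peek("/data/pebble", vfs.Default)
                if err != nil {
                        log.Fatal(err)
                }
                fmt.Println(desc)
        }
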
    1207              : 
    1208              : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
    1209              : // does not exist.
    1210              : //
    1211              : // Note that errors can be wrapped with more details; use errors.Is().
    1212              : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
    1213              : 
    1214              : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
    1215              : // already exists.
    1216              : //
    1217              : // Note that errors can be wrapped with more details; use errors.Is().
    1218              : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
    1219              : 
    1220              : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
    1221              : // already exists and is not pristine.
    1222              : //
    1223              : // Note that errors can be wrapped with more details; use errors.Is().
    1224              : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
    1225              : 
    1226            2 : func checkConsistency(v *manifest.Version, objProvider objstorage.Provider) error {
    1227            2 :         var errs []error
    1228            2 :         dedup := make(map[base.DiskFileNum]struct{})
    1229            2 :         for level, files := range v.Levels {
    1230            2 :                 for f := range files.All() {
    1231            2 :                         backingState := f.TableBacking
    1232            2 :                         if _, ok := dedup[backingState.DiskFileNum]; ok {
    1233            2 :                                 continue
    1234              :                         }
    1235            2 :                         dedup[backingState.DiskFileNum] = struct{}{}
    1236            2 :                         fileNum := backingState.DiskFileNum
    1237            2 :                         fileSize := backingState.Size
    1238            2 :                         // We skip over remote objects; those are instead checked asynchronously
    1239            2 :                         // by the table stats loading job.
    1240            2 :                         meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
    1241            2 :                         var size int64
    1242            2 :                         if err == nil {
    1243            2 :                                 if meta.IsRemote() {
    1244            2 :                                         continue
    1245              :                                 }
    1246            2 :                                 size, err = objProvider.Size(meta)
    1247              :                         }
    1248            2 :                         if err != nil {
    1249            1 :                                 errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
    1250            1 :                                 continue
    1251              :                         }
    1252              : 
    1253            2 :                         if size != int64(fileSize) {
    1254            0 :                                 errs = append(errs, errors.Errorf(
    1255            0 :                                         "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
    1256            0 :                                         errors.Safe(level), fileNum, objProvider.Path(meta),
    1257            0 :                                         errors.Safe(size), errors.Safe(fileSize)))
    1258            0 :                                 continue
    1259              :                         }
    1260              :                 }
    1261              :         }
    1262            2 :         return errors.Join(errs...)
    1263              : }
    1264              : 
    1265              : type walEventListenerAdaptor struct {
    1266              :         l *EventListener
    1267              : }
    1268              : 
    1269            2 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
    1270            2 :         // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
    1271            2 :         // is insufficient to infer whether primary or secondary.
    1272            2 :         wci := WALCreateInfo{
    1273            2 :                 JobID:           ci.JobID,
    1274            2 :                 Path:            ci.Path,
    1275            2 :                 FileNum:         base.DiskFileNum(ci.Num),
    1276            2 :                 RecycledFileNum: ci.RecycledFileNum,
    1277            2 :                 Err:             ci.Err,
    1278            2 :         }
    1279            2 :         l.l.WALCreated(wci)
    1280            2 : }
        

Generated by: LCOV version 2.0-1