LCOV - code coverage report
Current view: top level - pebble - open.go (source / functions)
Test:      2025-07-28 08:19Z bb252436 - meta test only.lcov
Test Date: 2025-07-28 08:20:54

             Coverage   Total   Hit
  Lines:       64.6 %     916   592
  Functions:        -       0     0

            Line data    Source code
       1              : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "context"
      10              :         "encoding/binary"
      11              :         "fmt"
      12              :         "io"
      13              :         "math"
      14              :         "os"
      15              :         "slices"
      16              :         "sync/atomic"
      17              :         "time"
      18              : 
      19              :         "github.com/cockroachdb/crlib/crtime"
      20              :         "github.com/cockroachdb/errors"
      21              :         "github.com/cockroachdb/errors/oserror"
      22              :         "github.com/cockroachdb/pebble/batchrepr"
      23              :         "github.com/cockroachdb/pebble/internal/arenaskl"
      24              :         "github.com/cockroachdb/pebble/internal/base"
      25              :         "github.com/cockroachdb/pebble/internal/cache"
      26              :         "github.com/cockroachdb/pebble/internal/invariants"
      27              :         "github.com/cockroachdb/pebble/internal/keyspan"
      28              :         "github.com/cockroachdb/pebble/internal/manifest"
      29              :         "github.com/cockroachdb/pebble/internal/manual"
      30              :         "github.com/cockroachdb/pebble/objstorage"
      31              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      32              :         "github.com/cockroachdb/pebble/objstorage/remote"
      33              :         "github.com/cockroachdb/pebble/record"
      34              :         "github.com/cockroachdb/pebble/vfs"
      35              :         "github.com/cockroachdb/pebble/wal"
      36              :         "github.com/cockroachdb/redact"
      37              :         "github.com/prometheus/client_golang/prometheus"
      38              : )
      39              : 
      40              : const (
      41              :         initialMemTableSize = 256 << 10 // 256 KB
      42              : 
      43              :         // The max batch size is limited by the uint32 offsets stored in
      44              :         // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
      45              :         //
      46              :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      47              :         // end of an allocation fits in uint32.
      48              :         //
      49              :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      50              :         // 2GB).
      51              :         maxBatchSize = min(math.MaxUint32, math.MaxInt)
      52              : 
      53              :         // The max memtable size is limited by the uint32 offsets stored in
      54              :         // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
      55              :         //
      56              :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      57              :         // end of an allocation fits in uint32.
      58              :         //
      59              :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      60              :         // 2GB).
      61              :         maxMemTableSize = min(math.MaxUint32, math.MaxInt)
      62              : )
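
Editor's note: the min(...) pattern above works because, since Go 1.21, the min builtin applied to untyped constants is itself a constant expression, so the platform-dependent clamp costs nothing at runtime. A minimal sketch of how the bound resolves per platform (sizeLimit is a hypothetical name, not Pebble's):

    package main

    import (
        "fmt"
        "math"
    )

    // On 64-bit platforms math.MaxInt (2^63-1) exceeds math.MaxUint32, so
    // the uint32 bound wins; on 32-bit platforms math.MaxInt (2^31-1) is
    // the smaller value and becomes the effective limit.
    const sizeLimit = min(math.MaxUint32, math.MaxInt)

    func main() {
        fmt.Println(sizeLimit) // 4294967295 on 64-bit, 2147483647 on 32-bit
    }
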
      63              : 
       64              : // FileCacheSize returns the file cache size to use
       65              : // for a single DB, given the maximum number of open
       66              : // files, assuming that the file cache is used by
       67              : // only that one DB.
      68            1 : func FileCacheSize(maxOpenFiles int) int {
      69            1 :         fileCacheSize := maxOpenFiles - numNonFileCacheFiles
      70            1 :         if fileCacheSize < minFileCacheSize {
      71            1 :                 fileCacheSize = minFileCacheSize
      72            1 :         }
      73            1 :         return fileCacheSize
      74              : }
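
Editor's note: the function reserves a fixed number of file descriptors for uses outside the file cache and clamps the remainder to a floor; numNonFileCacheFiles and minFileCacheSize are unexported constants defined elsewhere in the package. A sketch of the same clamp with illustrative stand-in values (not Pebble's actual constants):

    package main

    import "fmt"

    // Stand-in values; the real numNonFileCacheFiles and minFileCacheSize
    // live elsewhere in the package.
    const (
        reservedFiles = 10
        cacheFloor    = 64
    )

    // Same clamp as FileCacheSize; with the Go 1.21 builtins it is just a max.
    func fileCacheSize(maxOpenFiles int) int {
        return max(maxOpenFiles-reservedFiles, cacheFloor)
    }

    func main() {
        fmt.Println(fileCacheSize(1000)) // 990
        fmt.Println(fileCacheSize(50))   // 64: clamped to the floor
    }
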
      75              : 
      76              : // Open opens a DB whose files live in the given directory.
      77              : //
       78              : // IsCorruptionError() can be used to determine if the error is caused by on-disk
      79              : // corruption.
      80            1 : func Open(dirname string, opts *Options) (db *DB, err error) {
      81            1 :         // Make a copy of the options so that we don't mutate the passed in options.
      82            1 :         opts = opts.Clone()
      83            1 :         opts.EnsureDefaults()
      84            1 :         if opts.Experimental.CompactionScheduler == nil {
      85            0 :                 opts.Experimental.CompactionScheduler = newConcurrencyLimitScheduler(defaultTimeSource{})
      86            0 :         }
      87            1 :         if err := opts.Validate(); err != nil {
      88            0 :                 return nil, err
      89            0 :         }
      90            1 :         if opts.LoggerAndTracer == nil {
      91            1 :                 opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
      92            1 :         } else {
      93            0 :                 opts.Logger = opts.LoggerAndTracer
      94            0 :         }
      95              : 
      96            1 :         if invariants.Sometimes(5) {
      97            1 :                 assertComparer := base.MakeAssertComparer(*opts.Comparer)
      98            1 :                 opts.Comparer = &assertComparer
      99            1 :         }
     100              : 
     101              :         // In all error cases, we return db = nil; this is used by various
     102              :         // deferred cleanups.
     103            1 :         maybeCleanUp := func(fn func() error) {
     104            1 :                 if db == nil {
     105            0 :                         err = errors.CombineErrors(err, fn())
     106            0 :                 }
     107              :         }
     108              : 
     109              :         // Open the database and WAL directories first.
     110            1 :         walDirname, secondaryWalDirName, dataDir, err := prepareAndOpenDirs(dirname, opts)
     111            1 :         if err != nil {
     112            0 :                 return nil, errors.Wrapf(err, "error opening database at %q", dirname)
     113            0 :         }
     114            1 :         defer maybeCleanUp(dataDir.Close)
     115            1 : 
     116            1 :         // Lock the database directory.
     117            1 :         fileLock, err := base.AcquireOrValidateDirectoryLock(opts.Lock, dirname, opts.FS)
     118            1 :         if err != nil {
     119            0 :                 return nil, err
     120            0 :         }
     121            1 :         defer maybeCleanUp(fileLock.Close)
     122            1 : 
     123            1 :         // List the directory contents. This also happens to include WAL log files, if
     124            1 :         // they are in the same dir, but we will ignore those below. The provider is
      125            1 :         // also given this list, but it ignores non-sstable files.
     126            1 :         ls, err := opts.FS.List(dirname)
     127            1 :         if err != nil {
     128            0 :                 return nil, err
     129            0 :         }
     130              : 
     131              :         // Establish the format major version.
     132            1 :         formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
     133            1 :         if err != nil {
     134            0 :                 return nil, err
     135            0 :         }
     136            1 :         defer maybeCleanUp(formatVersionMarker.Close)
     137            1 : 
     138            1 :         noFormatVersionMarker := formatVersion == FormatDefault
     139            1 :         if noFormatVersionMarker {
     140            1 :                 // We will initialize the store at the minimum possible format, then upgrade
     141            1 :                 // the format to the desired one. This helps test the format upgrade code.
     142            1 :                 formatVersion = FormatMinSupported
     143            1 :                 if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
     144            1 :                         formatVersion = FormatMinForSharedObjects
     145            1 :                 }
     146              :                 // There is no format version marker file. There are three cases:
     147              :                 //  - we are trying to open an existing store that was created at
      148              :                 //    FormatMostCompatible (the only one without a version marker file);
     149              :                 //  - we are creating a new store;
     150              :                 //  - we are retrying a failed creation.
     151              :                 //
      152              :                 // To return an error in the first case, we set ErrorIfNotPristine.
     153            1 :                 opts.ErrorIfNotPristine = true
     154            1 :                 defer func() {
     155            1 :                         if err != nil && errors.Is(err, ErrDBNotPristine) {
     156            0 :                                 // We must be trying to open an existing store at FormatMostCompatible.
      157            0 :                 // Correct the error in this case by reporting the unsupported format.
     158            0 :                                 err = errors.Newf(
     159            0 :                                         "pebble: database %q written in format major version 1 which is no longer supported",
     160            0 :                                         dirname)
     161            0 :                         }
     162              :                 }()
     163              :         }
     164              : 
     165              :         // Find the currently active manifest, if there is one.
     166            1 :         manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
     167            1 :         if err != nil {
     168            0 :                 return nil, errors.Wrapf(err, "pebble: database %q", dirname)
     169            0 :         }
     170            1 :         defer maybeCleanUp(manifestMarker.Close)
     171            1 : 
     172            1 :         // Atomic markers may leave behind obsolete files if there's a crash
     173            1 :         // mid-update. Clean these up if we're not in read-only mode.
     174            1 :         if !opts.ReadOnly {
     175            1 :                 if err := formatVersionMarker.RemoveObsolete(); err != nil {
     176            0 :                         return nil, err
     177            0 :                 }
     178            1 :                 if err := manifestMarker.RemoveObsolete(); err != nil {
     179            0 :                         return nil, err
     180            0 :                 }
     181              :         }
     182              : 
     183            1 :         if opts.Cache == nil {
     184            1 :                 opts.Cache = cache.New(opts.CacheSize)
     185            1 :                 defer opts.Cache.Unref()
     186            1 :         }
     187              : 
     188            1 :         d := &DB{
     189            1 :                 cacheHandle:         opts.Cache.NewHandle(),
     190            1 :                 dirname:             dirname,
     191            1 :                 opts:                opts,
     192            1 :                 cmp:                 opts.Comparer.Compare,
     193            1 :                 equal:               opts.Comparer.Equal,
     194            1 :                 merge:               opts.Merger.Merge,
     195            1 :                 split:               opts.Comparer.Split,
     196            1 :                 abbreviatedKey:      opts.Comparer.AbbreviatedKey,
     197            1 :                 largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
     198            1 :                 dataDirLock:         fileLock,
     199            1 :                 dataDir:             dataDir,
     200            1 :                 closed:              new(atomic.Value),
     201            1 :                 closedCh:            make(chan struct{}),
     202            1 :         }
     203            1 :         d.mu.versions = &versionSet{}
     204            1 :         d.diskAvailBytes.Store(math.MaxUint64)
     205            1 :         d.problemSpans.Init(manifest.NumLevels, opts.Comparer.Compare)
     206            1 : 
     207            1 :         defer func() {
     208            1 :                 // If an error or panic occurs during open, attempt to release the manually
     209            1 :                 // allocated memory resources. Note that rather than look for an error, we
     210            1 :                 // look for the return of a nil DB pointer.
     211            1 :                 if r := recover(); db == nil {
     212            0 :                         // If there's an unused, recycled memtable, we need to release its memory.
     213            0 :                         if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
     214            0 :                                 d.freeMemTable(obsoleteMemTable)
     215            0 :                         }
     216              : 
     217            0 :                         if d.fileCache != nil {
     218            0 :                                 _ = d.fileCache.Close()
     219            0 :                         }
     220            0 :                         d.cacheHandle.Close()
     221            0 : 
     222            0 :                         for _, mem := range d.mu.mem.queue {
     223            0 :                                 switch t := mem.flushable.(type) {
     224            0 :                                 case *memTable:
     225            0 :                                         manual.Free(manual.MemTable, t.arenaBuf)
     226            0 :                                         t.arenaBuf = manual.Buf{}
     227              :                                 }
     228              :                         }
     229            0 :                         if d.cleanupManager != nil {
     230            0 :                                 d.cleanupManager.Close()
     231            0 :                         }
     232            0 :                         if d.objProvider != nil {
     233            0 :                                 _ = d.objProvider.Close()
     234            0 :                         }
     235            0 :                         if d.mu.versions.manifestFile != nil {
     236            0 :                                 _ = d.mu.versions.manifestFile.Close()
     237            0 :                         }
     238            0 :                         if r != nil {
     239            0 :                                 panic(r)
     240              :                         }
     241              :                 }
     242              :         }()
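
Editor's note: this deferred block keys its cleanup off the nil DB pointer rather than err, which covers both failure modes at once: an early error return and a panic partway through Open. When a panic was the trigger, it is re-raised after the manually managed memory has been freed. A condensed sketch of the idiom with hypothetical allocate/release helpers:

    package main

    import "fmt"

    type resource struct{ buf []byte }

    func allocate() []byte { return make([]byte, 1<<10) }
    func release(b []byte) { fmt.Println("released", len(b), "bytes") }

    func newResource(fail bool) (res *resource, err error) {
        buf := allocate()
        defer func() {
            // res is nil on both an error return and a panic.
            if r := recover(); res == nil {
                release(buf)
                if r != nil {
                    panic(r) // propagate the original panic after cleanup
                }
            }
        }()
        if fail {
            return nil, fmt.Errorf("construction failed")
        }
        return &resource{buf: buf}, nil
    }

    func main() {
        _, err := newResource(true) // buf is released on this path
        fmt.Println(err)
    }
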
     243              : 
     244            1 :         d.commit = newCommitPipeline(commitEnv{
     245            1 :                 logSeqNum:     &d.mu.versions.logSeqNum,
     246            1 :                 visibleSeqNum: &d.mu.versions.visibleSeqNum,
     247            1 :                 apply:         d.commitApply,
     248            1 :                 write:         d.commitWrite,
     249            1 :         })
     250            1 :         d.mu.nextJobID = 1
     251            1 :         d.mu.mem.nextSize = opts.MemTableSize
     252            1 :         if d.mu.mem.nextSize > initialMemTableSize {
     253            1 :                 d.mu.mem.nextSize = initialMemTableSize
     254            1 :         }
     255            1 :         d.mu.compact.cond.L = &d.mu.Mutex
     256            1 :         d.mu.compact.inProgress = make(map[compaction]struct{})
     257            1 :         d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
     258            1 :         d.mu.snapshots.init()
     259            1 :         // logSeqNum is the next sequence number that will be assigned.
     260            1 :         // Start assigning sequence numbers from base.SeqNumStart to leave
     261            1 :         // room for reserved sequence numbers (see comments around
     262            1 :         // SeqNumStart).
     263            1 :         d.mu.versions.logSeqNum.Store(base.SeqNumStart)
     264            1 :         d.mu.formatVers.vers.Store(uint64(formatVersion))
     265            1 :         d.mu.formatVers.marker = formatVersionMarker
     266            1 : 
     267            1 :         d.timeNow = time.Now
     268            1 :         d.openedAt = d.timeNow()
     269            1 : 
     270            1 :         d.mu.Lock()
     271            1 :         defer d.mu.Unlock()
     272            1 : 
     273            1 :         jobID := d.newJobIDLocked()
     274            1 : 
     275            1 :         providerSettings := opts.MakeObjStorageProviderSettings(dirname)
     276            1 :         providerSettings.FSDirInitialListing = ls
     277            1 :         d.objProvider, err = objstorageprovider.Open(providerSettings)
     278            1 :         if err != nil {
     279            0 :                 return nil, err
     280            0 :         }
     281              : 
     282            1 :         blobRewriteHeuristic := manifest.BlobRewriteHeuristic{
     283            1 :                 CurrentTime: d.timeNow,
     284            1 :                 MinimumAge:  opts.Experimental.ValueSeparationPolicy().RewriteMinimumAge,
     285            1 :         }
     286            1 : 
     287            1 :         if !manifestExists {
     288            1 :                 // DB does not exist.
     289            1 :                 if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
     290            0 :                         return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
     291            0 :                 }
     292              : 
     293              :                 // Create the DB.
     294            1 :                 if err := d.mu.versions.create(
     295            1 :                         jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, blobRewriteHeuristic, &d.mu.Mutex); err != nil {
     296            0 :                         return nil, err
     297            0 :                 }
     298            1 :         } else {
     299            1 :                 if opts.ErrorIfExists {
     300            0 :                         return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
     301            0 :                 }
     302              :                 // Load the version set.
     303            1 :                 if err := d.mu.versions.load(
     304            1 :                         dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, blobRewriteHeuristic, &d.mu.Mutex); err != nil {
     305            0 :                         return nil, err
     306            0 :                 }
     307            1 :                 if opts.ErrorIfNotPristine {
     308            0 :                         liveFileNums := make(map[base.DiskFileNum]struct{})
     309            0 :                         d.mu.versions.addLiveFileNums(liveFileNums)
     310            0 :                         if len(liveFileNums) != 0 {
     311            0 :                                 return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
     312            0 :                         }
     313              :                 }
     314              :         }
     315              : 
     316              :         // In read-only mode, we replay directly into the mutable memtable but never
     317              :         // flush it. We need to delay creation of the memtable until we know the
     318              :         // sequence number of the first batch that will be inserted.
     319            1 :         if !d.opts.ReadOnly {
     320            1 :                 var entry *flushableEntry
     321            1 :                 d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
     322            1 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     323            1 :         }
     324              : 
     325            1 :         d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     326            1 :                 Buckets: FsyncLatencyBuckets,
     327            1 :         })
     328            1 : 
     329            1 :         walOpts := wal.Options{
     330            1 :                 Primary:              wal.Dir{FS: opts.FS, Dirname: walDirname},
     331            1 :                 Secondary:            wal.Dir{},
     332            1 :                 MinUnflushedWALNum:   wal.NumWAL(d.mu.versions.minUnflushedLogNum),
     333            1 :                 MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
     334            1 :                 NoSyncOnClose:        opts.NoSyncOnClose,
     335            1 :                 BytesPerSync:         opts.WALBytesPerSync,
     336            1 :                 PreallocateSize:      d.walPreallocateSize,
     337            1 :                 MinSyncInterval:      opts.WALMinSyncInterval,
     338            1 :                 FsyncLatency:         d.mu.log.metrics.fsyncLatency,
     339            1 :                 QueueSemChan:         d.commit.logSyncQSem,
     340            1 :                 Logger:               opts.Logger,
     341            1 :                 EventListener:        walEventListenerAdaptor{l: opts.EventListener},
     342            1 :                 WriteWALSyncOffsets:  func() bool { return d.FormatMajorVersion() >= FormatWALSyncChunks },
     343              :         }
     344              :         // Ensure we release the WAL directory locks if we fail to open the
     345              :         // database. If we fail before initializing the WAL manager, this defer is
     346              :         // responsible for releasing the locks. If we fail after initializing the
     347              :         // WAL manager, closing the WAL manager will release the locks.
     348              :         //
     349              :         // TODO(jackson): Open's cleanup error handling logic is convoluted; can we
     350              :         // simplify it?
     351            1 :         defer maybeCleanUp(func() (err error) {
     352            0 :                 if d.mu.log.manager == nil {
     353            0 :                         if walOpts.Primary.Lock != nil {
     354            0 :                                 err = errors.CombineErrors(err, walOpts.Primary.Lock.Close())
     355            0 :                         }
     356            0 :                         if walOpts.Secondary.Lock != nil {
     357            0 :                                 err = errors.CombineErrors(err, walOpts.Secondary.Lock.Close())
     358            0 :                         }
     359            0 :                         return err
     360              :                 }
     361            0 :                 return nil
     362              :         })
     363              : 
     364              :         // Lock the dedicated WAL directory, if configured.
     365            1 :         if walDirname != dirname {
     366            1 :                 walOpts.Primary.Lock, err = base.AcquireOrValidateDirectoryLock(opts.WALDirLock, walDirname, opts.FS)
     367            1 :                 if err != nil {
     368            0 :                         return nil, err
     369            0 :                 }
     370              :         }
     371            1 :         if opts.WALFailover != nil {
     372            1 :                 walOpts.Secondary = opts.WALFailover.Secondary
     373            1 :                 // Lock the secondary WAL directory, if distinct from the data directory
     374            1 :                 // and primary WAL directory.
     375            1 :                 if secondaryWalDirName != dirname && secondaryWalDirName != walDirname {
     376            1 :                         walOpts.Secondary.Lock, err = base.AcquireOrValidateDirectoryLock(
     377            1 :                                 opts.WALFailover.Secondary.Lock, secondaryWalDirName, opts.WALFailover.Secondary.FS)
     378            1 :                         if err != nil {
     379            0 :                                 return nil, err
     380            0 :                         }
     381              :                 }
     382            1 :                 walOpts.Secondary.Dirname = secondaryWalDirName
     383            1 :                 walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
     384            1 :                 walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     385            1 :                         Buckets: FsyncLatencyBuckets,
     386            1 :                 })
     387              :         }
     388            1 :         walDirs := walOpts.Dirs()
     389            1 :         walRecoveryLocks := make([]*base.DirLock, len(opts.WALRecoveryDirs))
     390            1 :         defer func() {
     391            1 :                 // We only need the recovery WALs during Open, so we can release
     392            1 :                 // the locks after the WALs have been scanned.
     393            1 :                 for _, l := range walRecoveryLocks {
     394            0 :                         if l != nil {
     395            0 :                                 _ = l.Close()
     396            0 :                         }
     397              :                 }
     398              :         }()
     399            1 :         for i, dir := range opts.WALRecoveryDirs {
     400            0 :                 dir.Dirname = resolveStorePath(dirname, dir.Dirname)
     401            0 :                 if dir.Dirname != dirname {
     402            0 :                         // Acquire a lock on the WAL recovery directory.
     403            0 :                         walRecoveryLocks[i], err = base.AcquireOrValidateDirectoryLock(dir.Lock, dir.Dirname, dir.FS)
     404            0 :                         if err != nil {
     405            0 :                                 return nil, errors.Wrapf(err, "error acquiring lock on WAL recovery directory %q", dir.Dirname)
     406            0 :                         }
     407              :                 }
     408            0 :                 walDirs = append(walDirs, dir)
     409              :         }
     410            1 :         wals, err := wal.Scan(walDirs...)
     411            1 :         if err != nil {
     412            0 :                 return nil, err
     413            0 :         }
     414            1 :         d.opts.Logger.Infof("Found %d WALs", redact.Safe(len(wals)))
     415            1 :         for i := range wals {
     416            1 :                 d.opts.Logger.Infof("  - %s", wals[i])
     417            1 :         }
     418            1 :         walManager, err := wal.Init(walOpts, wals)
     419            1 :         if err != nil {
     420            0 :                 return nil, err
     421            0 :         }
     422            1 :         defer maybeCleanUp(walManager.Close)
     423            1 :         d.mu.log.manager = walManager
     424            1 : 
     425            1 :         d.cleanupManager = openCleanupManager(opts, d.objProvider, d.getDeletionPacerInfo)
     426            1 : 
     427            1 :         if manifestExists && !opts.DisableConsistencyCheck {
     428            1 :                 curVersion := d.mu.versions.currentVersion()
     429            1 :                 if err := checkConsistency(curVersion, d.objProvider); err != nil {
     430            0 :                         return nil, err
     431            0 :                 }
     432              :         }
     433              : 
     434            1 :         fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
     435            1 :         if opts.FileCache == nil {
     436            1 :                 opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
     437            1 :                 defer opts.FileCache.Unref()
     438            1 :         }
     439            1 :         d.fileCache = opts.FileCache.newHandle(d.cacheHandle, d.objProvider, d.opts.LoggerAndTracer, d.opts.MakeReaderOptions(), d.reportCorruption)
     440            1 :         d.newIters = d.fileCache.newIters
     441            1 :         d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
     442            1 : 
     443            1 :         d.mu.annotators.totalFileSize = d.makeFileSizeAnnotator(func(f *manifest.TableMetadata) bool {
     444            0 :                 return true
     445            0 :         })
     446            1 :         d.mu.annotators.remoteSize = d.makeFileSizeAnnotator(func(f *manifest.TableMetadata) bool {
     447            0 :                 meta, err := d.objProvider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
     448            0 :                 if err != nil {
     449            0 :                         return false
     450            0 :                 }
     451            0 :                 return meta.IsRemote()
     452              :         })
     453            1 :         d.mu.annotators.externalSize = d.makeFileSizeAnnotator(func(f *manifest.TableMetadata) bool {
     454            0 :                 meta, err := d.objProvider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
     455            0 :                 if err != nil {
     456            0 :                         return false
     457            0 :                 }
     458            0 :                 return meta.IsRemote() && meta.Remote.CleanupMethod == objstorage.SharedNoCleanup
     459              :         })
     460              : 
     461            1 :         var previousOptionsFileNum base.DiskFileNum
     462            1 :         var previousOptionsFilename string
     463            1 :         for _, filename := range ls {
     464            1 :                 ft, fn, ok := base.ParseFilename(opts.FS, filename)
     465            1 :                 if !ok {
     466            1 :                         continue
     467              :                 }
     468              : 
     469              :                 // Don't reuse any obsolete file numbers to avoid modifying an
     470              :                 // ingested sstable's original external file.
     471            1 :                 d.mu.versions.markFileNumUsed(fn)
     472            1 : 
     473            1 :                 switch ft {
     474            0 :                 case base.FileTypeLog:
     475              :                         // Ignore.
     476            1 :                 case base.FileTypeOptions:
     477            1 :                         if previousOptionsFileNum < fn {
     478            1 :                                 previousOptionsFileNum = fn
     479            1 :                                 previousOptionsFilename = filename
     480            1 :                         }
     481            0 :                 case base.FileTypeTemp, base.FileTypeOldTemp:
     482            0 :                         if !d.opts.ReadOnly {
     483            0 :                                 // Some codepaths write to a temporary file and then
     484            0 :                                 // rename it to its final location when complete.  A
     485            0 :                                 // temp file is leftover if a process exits before the
     486            0 :                                 // rename.  Remove it.
     487            0 :                                 err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
     488            0 :                                 if err != nil {
     489            0 :                                         return nil, err
     490            0 :                                 }
     491              :                         }
     492              :                 }
     493              :         }
     494            1 :         if n := len(wals); n > 0 {
     495            1 :                 // Don't reuse any obsolete file numbers to avoid modifying an
     496            1 :                 // ingested sstable's original external file.
     497            1 :                 d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
     498            1 :         }
     499              : 
     500              :         // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
     501              :         // objProvider. This avoids FileNum collisions with obsolete sstables.
     502            1 :         objects := d.objProvider.List()
     503            1 :         for _, obj := range objects {
     504            1 :                 d.mu.versions.markFileNumUsed(obj.DiskFileNum)
     505            1 :         }
     506              : 
     507              :         // Validate the most-recent OPTIONS file, if there is one.
     508            1 :         if previousOptionsFilename != "" {
     509            1 :                 path := opts.FS.PathJoin(dirname, previousOptionsFilename)
     510            1 :                 previousOptions, err := readOptionsFile(opts, path)
     511            1 :                 if err != nil {
     512            0 :                         return nil, err
     513            0 :                 }
     514            1 :                 if err := opts.CheckCompatibility(dirname, previousOptions); err != nil {
     515            0 :                         return nil, err
     516            0 :                 }
     517              :         }
     518              : 
     519              :         // Replay any newer log files than the ones named in the manifest.
     520            1 :         var replayWALs wal.Logs
     521            1 :         for i, w := range wals {
     522            1 :                 if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
     523            1 :                         replayWALs = wals[i:]
     524            1 :                         break
     525              :                 }
     526              :         }
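
Editor's note: the logs returned by wal.Scan are ordered by number (the suffix-slicing above relies on this), so selecting the replay set reduces to finding the first log at or above minUnflushedLogNum and taking the suffix. The same idea over plain integers:

    package main

    import "fmt"

    // logs is sorted ascending by number; every log at or above the
    // manifest's minUnflushed watermark may hold unflushed writes and
    // must be replayed, so the replay set is a suffix of the slice.
    func replaySuffix(logs []uint64, minUnflushed uint64) []uint64 {
        for i, n := range logs {
            if n >= minUnflushed {
                return logs[i:]
            }
        }
        return nil // every log is already reflected in the manifest
    }

    func main() {
        fmt.Println(replaySuffix([]uint64{3, 5, 8, 9}, 8)) // [8 9]
    }
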
     527            1 :         var flushableIngests []*ingestedFlushable
     528            1 :         for i, lf := range replayWALs {
     529            1 :                 // WALs other than the last one would have been closed cleanly.
     530            1 :                 //
     531            1 :                 // Note: we used to never require strict WAL tails when reading from older
     532            1 :                 // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
     533            1 :                 // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
     534            1 :                 // compatible Pebble format is newer and guarantees a clean EOF.
     535            1 :                 strictWALTail := i < len(replayWALs)-1
     536            1 :                 fi, maxSeqNum, err := d.replayWAL(jobID, lf, strictWALTail)
     537            1 :                 if err != nil {
     538            0 :                         return nil, err
     539            0 :                 }
     540            1 :                 if len(fi) > 0 {
     541            1 :                         flushableIngests = append(flushableIngests, fi...)
     542            1 :                 }
     543            1 :                 if d.mu.versions.logSeqNum.Load() < maxSeqNum {
     544            1 :                         d.mu.versions.logSeqNum.Store(maxSeqNum)
     545            1 :                 }
     546              :         }
     547            1 :         if d.mu.mem.mutable == nil {
     548            1 :                 // Recreate the mutable memtable if replayWAL got rid of it.
     549            1 :                 var entry *flushableEntry
     550            1 :                 d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
     551            1 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     552            1 :         }
     553            1 :         d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
     554            1 : 
     555            1 :         // Register with the CompactionScheduler before calling
     556            1 :         // d.maybeScheduleFlush, since completion of the flush can trigger
     557            1 :         // compactions.
     558            1 :         d.opts.Experimental.CompactionScheduler.Register(2, d)
     559            1 :         if !d.opts.ReadOnly {
     560            1 :                 d.maybeScheduleFlush()
     561            1 :                 for d.mu.compact.flushing {
     562            1 :                         d.mu.compact.cond.Wait()
     563            1 :                 }
     564              : 
     565              :                 // Create an empty .log file for the mutable memtable.
     566            1 :                 newLogNum := d.mu.versions.getNextDiskFileNum()
     567            1 :                 d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
     568            1 :                 if err != nil {
     569            0 :                         return nil, err
     570            0 :                 }
     571              : 
     572              :                 // This isn't strictly necessary as we don't use the log number for
     573              :                 // memtables being flushed, only for the next unflushed memtable.
     574            1 :                 d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
     575              :         }
     576            1 :         d.updateReadStateLocked(d.opts.DebugCheck)
     577            1 : 
     578            1 :         if !d.opts.ReadOnly {
     579            1 :                 // If the Options specify a format major version higher than the
     580            1 :                 // loaded database's, upgrade it. If this is a new database, this
     581            1 :                 // code path also performs an initial upgrade from the starting
     582            1 :                 // implicit MinSupported version.
     583            1 :                 //
     584            1 :                 // We ratchet the version this far into Open so that migrations have a read
     585            1 :                 // state available. Note that this also results in creating/updating the
     586            1 :                 // format version marker file.
     587            1 :                 if opts.FormatMajorVersion > d.FormatMajorVersion() {
     588            1 :                         if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
     589            0 :                                 return nil, err
     590            0 :                         }
     591            1 :                 } else if noFormatVersionMarker {
     592            1 :                         // We are creating a new store. Create the format version marker file.
     593            1 :                         if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
     594            0 :                                 return nil, err
     595            0 :                         }
     596              :                 }
     597              : 
     598              :                 // Write the current options to disk.
     599            1 :                 d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
     600            1 :                 tmpPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeTemp, d.optionsFileNum)
     601            1 :                 optionsPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeOptions, d.optionsFileNum)
     602            1 : 
     603            1 :                 // Write them to a temporary file first, in case we crash before
     604            1 :                 // we're done. A corrupt options file prevents opening the
     605            1 :                 // database.
     606            1 :                 optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
     607            1 :                 if err != nil {
     608            0 :                         return nil, err
     609            0 :                 }
     610            1 :                 serializedOpts := []byte(opts.String())
     611            1 :                 if _, err := optionsFile.Write(serializedOpts); err != nil {
     612            0 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     613            0 :                 }
     614            1 :                 d.optionsFileSize = uint64(len(serializedOpts))
     615            1 :                 if err := optionsFile.Sync(); err != nil {
     616            0 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     617            0 :                 }
     618            1 :                 if err := optionsFile.Close(); err != nil {
     619            0 :                         return nil, err
     620            0 :                 }
     621              :                 // Atomically rename to the OPTIONS-XXXXXX path. This rename is
     622              :                 // guaranteed to be atomic because the destination path does not
     623              :                 // exist.
     624            1 :                 if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
     625            0 :                         return nil, err
     626            0 :                 }
     627            1 :                 if err := d.dataDir.Sync(); err != nil {
     628            0 :                         return nil, err
     629            0 :                 }
     630              :         }
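
Editor's note: the sequence above (write to a temp name, fsync the file, rename into place, fsync the directory) is the standard recipe for publishing a file atomically: readers either see no OPTIONS file or a complete one, never a torn write, and the final directory sync makes the rename itself durable. A stdlib sketch of the same recipe (writeFileAtomic is a hypothetical helper):

    package main

    import (
        "os"
        "path/filepath"
    )

    // writeFileAtomic never exposes a partially written file at `name`.
    func writeFileAtomic(dir, name string, data []byte) error {
        tmp := filepath.Join(dir, name+".tmp")
        f, err := os.Create(tmp)
        if err != nil {
            return err
        }
        if _, err := f.Write(data); err != nil {
            f.Close()
            return err
        }
        if err := f.Sync(); err != nil { // contents durable before rename
            f.Close()
            return err
        }
        if err := f.Close(); err != nil {
            return err
        }
        if err := os.Rename(tmp, filepath.Join(dir, name)); err != nil {
            return err
        }
        d, err := os.Open(dir)
        if err != nil {
            return err
        }
        defer d.Close()
        return d.Sync() // persist the directory entry for the new name
    }

    func main() {
        _ = writeFileAtomic(os.TempDir(), "OPTIONS-000001", []byte("example"))
    }
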
     631              : 
     632            1 :         if !d.opts.ReadOnly {
     633            1 :                 // Get a fresh list of files, in case some of the earlier flushes/compactions
     634            1 :                 // have deleted some files.
     635            1 :                 ls, err := opts.FS.List(dirname)
     636            1 :                 if err != nil {
     637            0 :                         return nil, err
     638            0 :                 }
     639            1 :                 d.scanObsoleteFiles(ls, flushableIngests)
     640            1 :                 d.deleteObsoleteFiles(jobID)
     641              :         }
     642              :         // Else, nothing is obsolete.
     643              : 
     644            1 :         d.mu.tableStats.cond.L = &d.mu.Mutex
     645            1 :         d.mu.tableValidation.cond.L = &d.mu.Mutex
     646            1 :         if !d.opts.ReadOnly {
     647            1 :                 d.maybeCollectTableStatsLocked()
     648            1 :         }
     649            1 :         d.calculateDiskAvailableBytes()
     650            1 : 
     651            1 :         d.maybeScheduleFlush()
     652            1 :         d.maybeScheduleCompaction()
     653            1 : 
     654            1 :         // Note: this is a no-op if invariants are disabled or race is enabled.
     655            1 :         //
     656            1 :         // Setting a finalizer on *DB causes *DB to never be reclaimed and the
     657            1 :         // finalizer to never be run. The problem is due to this limitation of
      658            1 :         // finalizers mentioned in the SetFinalizer docs:
     659            1 :         //
     660            1 :         //   If a cyclic structure includes a block with a finalizer, that cycle is
     661            1 :         //   not guaranteed to be garbage collected and the finalizer is not
     662            1 :         //   guaranteed to run, because there is no ordering that respects the
     663            1 :         //   dependencies.
     664            1 :         //
     665            1 :         // DB has cycles with several of its internal structures: readState,
      666            1 :         // newIters, fileCache, versions, etc. Each of these individually causes a
      667            1 :         // cycle and prevents the finalizer from being run. But we can work around
      668            1 :         // this finalizer limitation by setting a finalizer on another object that is
     669            1 :         // tied to the lifetime of DB: the DB.closed atomic.Value.
     670            1 :         dPtr := fmt.Sprintf("%p", d)
     671            1 :         invariants.SetFinalizer(d.closed, func(obj interface{}) {
     672            0 :                 v := obj.(*atomic.Value)
     673            0 :                 if err := v.Load(); err == nil {
     674            0 :                         fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
     675            0 :                         os.Exit(1)
     676            0 :                 }
     677              :         })
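
Editor's note: per the comment above, the trick is to attach the finalizer to a small, cycle-free field (d.closed) whose lifetime matches the DB, rather than to the cyclic *DB itself. The same leak check can be reproduced with runtime.SetFinalizer directly; a minimal sketch (leakChecker is hypothetical):

    package main

    import (
        "fmt"
        "runtime"
        "sync/atomic"
    )

    type leakChecker struct {
        closed *atomic.Value // shares its lifetime with the owning object
    }

    func newLeakChecker() *leakChecker {
        lc := &leakChecker{closed: new(atomic.Value)}
        // The finalizer is set on the small, cycle-free closed value, not
        // on lc itself, so reference cycles involving lc cannot prevent
        // the finalizer from ever running.
        runtime.SetFinalizer(lc.closed, func(v *atomic.Value) {
            if v.Load() == nil {
                fmt.Println("leak: object collected without Close")
            }
        })
        return lc
    }

    func (lc *leakChecker) Close() { lc.closed.Store(struct{}{}) }

    func main() {
        lc := newLeakChecker()
        lc.Close() // marking closed makes the finalizer a no-op
        runtime.GC()
    }
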
     678              : 
     679            1 :         return d, nil
     680              : }
     681              : 
     682              : // prepareAndOpenDirs opens the directories for the store (and creates them if
     683              : // necessary).
     684              : //
     685              : // Returns an error if ReadOnly is set and the directories don't exist.
     686              : func prepareAndOpenDirs(
     687              :         dirname string, opts *Options,
     688            1 : ) (walDirname string, secondaryWalDirName string, dataDir vfs.File, err error) {
     689            1 :         walDirname = dirname
     690            1 :         if opts.WALDir != "" {
     691            1 :                 walDirname = resolveStorePath(dirname, opts.WALDir)
     692            1 :         }
     693            1 :         if opts.WALFailover != nil {
     694            1 :                 secondaryWalDirName = resolveStorePath(dirname, opts.WALFailover.Secondary.Dirname)
     695            1 :         }
     696              : 
     697              :         // Create directories if needed.
     698            1 :         if !opts.ReadOnly {
     699            1 :                 f, err := mkdirAllAndSyncParents(opts.FS, dirname)
     700            1 :                 if err != nil {
     701            0 :                         return "", "", nil, err
     702            0 :                 }
     703            1 :                 f.Close()
     704            1 :                 if walDirname != dirname {
     705            1 :                         f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
     706            1 :                         if err != nil {
     707            0 :                                 return "", "", nil, err
     708            0 :                         }
     709            1 :                         f.Close()
     710              :                 }
     711            1 :                 if opts.WALFailover != nil {
     712            1 :                         f, err := mkdirAllAndSyncParents(opts.WALFailover.Secondary.FS, secondaryWalDirName)
     713            1 :                         if err != nil {
     714            0 :                                 return "", "", nil, err
     715            0 :                         }
     716            1 :                         f.Close()
     717              :                 }
     718              :         }
     719              : 
     720            1 :         dataDir, err = opts.FS.OpenDir(dirname)
     721            1 :         if err != nil {
     722            0 :                 if opts.ReadOnly && oserror.IsNotExist(err) {
     723            0 :                         return "", "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
     724            0 :                 }
     725            0 :                 return "", "", nil, err
     726              :         }
     727            1 :         if opts.ReadOnly && walDirname != dirname {
     728            0 :                 // Check that the wal dir exists.
     729            0 :                 walDir, err := opts.FS.OpenDir(walDirname)
     730            0 :                 if err != nil {
     731            0 :                         dataDir.Close()
     732            0 :                         return "", "", nil, err
     733            0 :                 }
     734            0 :                 walDir.Close()
     735              :         }
     736              : 
     737            1 :         return walDirname, secondaryWalDirName, dataDir, nil
     738              : }
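
Editor's note: mkdirAllAndSyncParents (defined elsewhere in the package) differs from a plain MkdirAll in that it fsyncs the parent directories it had to create, so a crash cannot lose the new directory entries. A rough stdlib approximation of the idea, which for simplicity syncs every ancestor rather than only the newly created ones (mkdirAllDurable is hypothetical):

    package main

    import (
        "os"
        "path/filepath"
    )

    // mkdirAllDurable creates dir, then fsyncs it and each ancestor so the
    // new directory entries are durable rather than just cached.
    func mkdirAllDurable(dir string) error {
        if err := os.MkdirAll(dir, 0o755); err != nil {
            return err
        }
        for p := dir; ; p = filepath.Dir(p) {
            d, err := os.Open(p)
            if err != nil {
                return err
            }
            err = d.Sync()
            d.Close()
            if err != nil {
                return err
            }
            if filepath.Dir(p) == p {
                return nil // reached the filesystem root
            }
        }
    }

    func main() {
        _ = mkdirAllDurable(filepath.Join(os.TempDir(), "store", "wal"))
    }
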
     739              : 
     740              : // GetVersion returns the engine version string from the latest options
     741              : // file present in dir. Used to check what Pebble or RocksDB version was last
     742              : // used to write to the database stored in this directory. An empty string is
     743              : // returned if no valid OPTIONS file with a version key was found.
     744            0 : func GetVersion(dir string, fs vfs.FS) (string, error) {
     745            0 :         ls, err := fs.List(dir)
     746            0 :         if err != nil {
     747            0 :                 return "", err
     748            0 :         }
     749            0 :         var version string
     750            0 :         lastOptionsSeen := base.DiskFileNum(0)
     751            0 :         for _, filename := range ls {
     752            0 :                 ft, fn, ok := base.ParseFilename(fs, filename)
     753            0 :                 if !ok {
     754            0 :                         continue
     755              :                 }
     756            0 :                 switch ft {
     757            0 :                 case base.FileTypeOptions:
     758            0 :                         // If this file has a higher number than the last options file
     759            0 :                         // processed, reset version. This is because rocksdb often
     760            0 :                         // writes multiple options files without deleting previous ones.
     761            0 :                         // Otherwise, skip parsing this options file.
     762            0 :                         if fn > lastOptionsSeen {
     763            0 :                                 version = ""
     764            0 :                                 lastOptionsSeen = fn
     765            0 :                         } else {
     766            0 :                                 continue
     767              :                         }
     768            0 :                         f, err := fs.Open(fs.PathJoin(dir, filename))
     769            0 :                         if err != nil {
     770            0 :                                 return "", err
     771            0 :                         }
     772            0 :                         data, err := io.ReadAll(f)
     773            0 :                         f.Close()
     774            0 : 
     775            0 :                         if err != nil {
     776            0 :                                 return "", err
     777            0 :                         }
     778            0 :                         err = parseOptions(string(data), parseOptionsFuncs{
     779            0 :                                 visitKeyValue: func(i, j int, section, key, value string) error {
     780            0 :                                         switch {
     781            0 :                                         case section == "Version":
     782            0 :                                                 switch key {
     783            0 :                                                 case "pebble_version":
     784            0 :                                                         version = value
     785            0 :                                                 case "rocksdb_version":
     786            0 :                                                         version = fmt.Sprintf("rocksdb v%s", value)
     787              :                                                 }
     788              :                                         }
     789            0 :                                         return nil
     790              :                                 },
     791              :                         })
     792            0 :                         if err != nil {
     793            0 :                                 return "", err
     794            0 :                         }
     795              :                 }
     796              :         }
     797            0 :         return version, nil
     798              : }
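
Editor's note: a hypothetical caller of this function; GetVersion only lists the directory and parses OPTIONS files, so it does not require opening the store:

    package main

    import (
        "fmt"
        "log"

        "github.com/cockroachdb/pebble"
        "github.com/cockroachdb/pebble/vfs"
    )

    func main() {
        // "/path/to/db" is a placeholder store directory.
        version, err := pebble.GetVersion("/path/to/db", vfs.Default)
        if err != nil {
            log.Fatal(err)
        }
        if version == "" {
            fmt.Println("no OPTIONS file with a version key found")
            return
        }
        fmt.Println("last written by:", version) // e.g. "rocksdb v6.2.1"
    }
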
     799              : 
     800              : func (d *DB) replayIngestedFlushable(
     801              :         b *Batch, logNum base.DiskFileNum,
     802            1 : ) (entry *flushableEntry, err error) {
     803            1 :         br := b.Reader()
     804            1 :         seqNum := b.SeqNum()
     805            1 : 
     806            1 :         fileNums := make([]base.DiskFileNum, 0, b.Count())
     807            1 :         var exciseSpan KeyRange
     808            1 :         addFileNum := func(encodedFileNum []byte) {
     809            1 :                 fileNum, n := binary.Uvarint(encodedFileNum)
     810            1 :                 if n <= 0 {
     811            0 :                         panic("pebble: ingest sstable file num is invalid")
     812              :                 }
     813            1 :                 fileNums = append(fileNums, base.DiskFileNum(fileNum))
     814              :         }
     815              : 
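
Editor's note: file numbers arrive as unsigned varints in the batch's key bytes, and binary.Uvarint signals truncated or invalid input by returning n <= 0, which is exactly what the panic in addFileNum guards against. A round-trip sketch:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func main() {
        // Encode a file number as an unsigned varint, as the ingest batch does.
        buf := binary.AppendUvarint(nil, 7126)

        // Decode it back; n <= 0 means the input was truncated or malformed,
        // which replayIngestedFlushable treats as corruption.
        fileNum, n := binary.Uvarint(buf)
        if n <= 0 {
            panic("invalid varint")
        }
        fmt.Println(fileNum) // 7126
    }
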
     816            1 :         for i := 0; i < int(b.Count()); i++ {
     817            1 :                 kind, key, val, ok, err := br.Next()
     818            1 :                 if err != nil {
     819            0 :                         return nil, err
     820            0 :                 }
     821            1 :                 if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise {
     822            0 :                         panic("pebble: invalid batch key kind")
     823              :                 }
     824            1 :                 if !ok {
     825            0 :                         panic("pebble: invalid batch count")
     826              :                 }
     827            1 :                 if kind == base.InternalKeyKindExcise {
     828            1 :                         if exciseSpan.Valid() {
     829            0 :                                 panic("pebble: multiple excise spans in a single batch")
     830              :                         }
     831            1 :                         exciseSpan.Start = slices.Clone(key)
     832            1 :                         exciseSpan.End = slices.Clone(val)
     833            1 :                         continue
     834              :                 }
     835            1 :                 addFileNum(key)
     836              :         }
     837              : 
     838            1 :         if _, _, _, ok, err := br.Next(); err != nil {
     839            0 :                 return nil, err
     840            1 :         } else if ok {
     841            0 :                 panic("pebble: invalid number of entries in batch")
     842              :         }
     843              : 
     844            1 :         meta := make([]*manifest.TableMetadata, len(fileNums))
     845            1 :         var lastRangeKey keyspan.Span
     846            1 :         for i, n := range fileNums {
     847            1 :                 readable, err := d.objProvider.OpenForReading(context.TODO(), base.FileTypeTable, n,
     848            1 :                         objstorage.OpenOptions{MustExist: true})
     849            1 :                 if err != nil {
     850            0 :                         return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files")
     851            0 :                 }
     852              :                 // NB: ingestLoad1 will close readable.
     853            1 :                 meta[i], lastRangeKey, err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(),
     854            1 :                         readable, d.cacheHandle, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
     855            1 :                 if err != nil {
     856            0 :                         return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
     857            0 :                 }
     858              :         }
     859            1 :         if lastRangeKey.Valid() && d.opts.Comparer.Split.HasSuffix(lastRangeKey.End) {
     860            0 :                 return nil, errors.AssertionFailedf("pebble: last ingest sstable has suffixed range key end %s",
     861            0 :                         d.opts.Comparer.FormatKey(lastRangeKey.End))
     862            0 :         }
     863              : 
     864            1 :         numFiles := len(meta)
     865            1 :         if exciseSpan.Valid() {
     866            1 :                 numFiles++
     867            1 :         }
     868            1 :         if uint32(numFiles) != b.Count() {
     869            0 :                 panic("pebble: couldn't load all files in WAL entry")
     870              :         }
     871              : 
     872            1 :         return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
     873              : }
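A minimal, standalone sketch of the varint round-trip that addFileNum relies
on above, using only the standard library (the batch framing itself is
omitted; the value 42 is an arbitrary example):

        package main

        import (
                "encoding/binary"
                "fmt"
        )

        func main() {
                // An ingest batch encodes each sstable file number as a single
                // unsigned varint; append one to an empty buffer.
                var buf []byte
                buf = binary.AppendUvarint(buf, 42)

                // Decode it back, mirroring addFileNum's use of binary.Uvarint.
                // n <= 0 indicates an empty or malformed varint.
                fileNum, n := binary.Uvarint(buf)
                if n <= 0 {
                        panic("invalid varint")
                }
                fmt.Println(fileNum) // prints 42
        }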
     874              : 
     875              : // replayWAL replays the edits in the specified WAL. If the DB is in
     876              : // read-only mode, the WALs are replayed into memtables and not flushed. If
     877              : // the DB is not in read-only mode, the contents of the WAL are
     878              : // guaranteed to be flushed when a flush is scheduled after this method is run.
     879              : // Note that this flushing is very important for guaranteeing durability:
     880              : // the application may have issued a number of fsyncs to the WAL before the
     881              : // process crashed, and those fsyncs may not have completed, even though the
     882              : // corresponding data may now be readable from the WAL (while sitting in
     883              : // write-back caches in the kernel or the storage device). By reading the
     884              : // WAL (including the non-fsynced data) and then flushing all these changes
     885              : // (flush does fsyncs), we are able to guarantee that the initial state of
     886              : // the DB is durable.
     887              : //
     888              : // This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable, replaying
     889              : // WALs into the flushable queue. Flushing of the queue is expected to be handled
     890              : // by callers. The flushable ingests (but not memtables) that were replayed are returned.
     891              : //
     892              : // d.mu must be held when calling this, but the mutex may be dropped and
     893              : // re-acquired during the course of this method.
     894              : func (d *DB) replayWAL(
     895              :         jobID JobID, ll wal.LogicalLog, strictWALTail bool,
     896            1 : ) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
     897            1 :         rr := ll.OpenForRead()
     898            1 :         defer func() { _ = rr.Close() }()
     899            1 :         var (
     900            1 :                 b               Batch
     901            1 :                 buf             bytes.Buffer
     902            1 :                 mem             *memTable
     903            1 :                 entry           *flushableEntry
     904            1 :                 offset          wal.Offset
     905            1 :                 lastFlushOffset int64
     906            1 :                 keysReplayed    int64 // number of keys replayed
     907            1 :                 batchesReplayed int64 // number of batches replayed
     908            1 :         )
     909            1 : 
     910            1 :         // TODO(jackson): This function is interspersed with panics, in addition to
     911            1 :         // corruption error propagation. Audit them to ensure we're truly only
     912            1 :         // panicking where the error points to Pebble bug and not user or
     913            1 :         // hardware-induced corruption.
     914            1 : 
     915            1 :         // "Flushes" (i.e., closes off) the current memtable, if not nil.
     916            1 :         flushMem := func() {
     917            1 :                 if mem == nil {
     918            1 :                         return
     919            1 :                 }
     920            1 :                 mem.writerUnref()
     921            1 :                 if d.mu.mem.mutable == mem {
     922            1 :                         d.mu.mem.mutable = nil
     923            1 :                 }
     924            1 :                 entry.flushForced = !d.opts.ReadOnly
     925            1 :                 var logSize uint64
     926            1 :                 mergedOffset := offset.Physical + offset.PreviousFilesBytes
     927            1 :                 if mergedOffset >= lastFlushOffset {
     928            1 :                         logSize = uint64(mergedOffset - lastFlushOffset)
     929            1 :                 }
     930              :                 // Else, this was the initial memtable in the read-only case, which must
     931              :                 // have been empty; we still flush it since we don't want to add to it later.
     932            1 :                 lastFlushOffset = mergedOffset
     933            1 :                 entry.logSize = logSize
     934            1 :                 mem, entry = nil, nil
     935              :         }
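        // Illustrative arithmetic for the logSize computation above (made-up
        // numbers): with offset.Physical=1000, offset.PreviousFilesBytes=5000,
        // and lastFlushOffset=5500, mergedOffset is 6000, the memtable is
        // charged logSize=500 bytes of WAL, and lastFlushOffset advances to 6000.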
     936              : 
     937            1 :         mem = d.mu.mem.mutable
     938            1 :         if mem != nil {
     939            1 :                 entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
     940            1 :                 if !d.opts.ReadOnly {
     941            1 :                         flushMem()
     942            1 :                 }
     943              :         }
     944              : 
     945              :         // Creates a new memtable if there is no current memtable.
     946            1 :         ensureMem := func(seqNum base.SeqNum) {
     947            1 :                 if mem != nil {
     948            1 :                         return
     949            1 :                 }
     950            1 :                 mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
     951            1 :                 d.mu.mem.mutable = mem
     952            1 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     953              :         }
     954              : 
     955            1 :         defer func() {
     956            1 :                 if err != nil {
     957            0 :                         err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
     958            0 :                 }
     959              :         }()
     960              : 
     961            1 :         for {
     962            1 :                 var r io.Reader
     963            1 :                 var err error
     964            1 :                 r, offset, err = rr.NextRecord()
     965            1 :                 if err == nil {
     966            1 :                         _, err = io.Copy(&buf, r)
     967            1 :                 }
     968            1 :                 if err != nil {
     969            1 :                         // It is common to encounter a zeroed or invalid chunk due to WAL
     970            1 :                         // preallocation and WAL recycling. However, zeroed or invalid chunks
     971            1 :                         // can also be a consequence of corruption / disk rot. When the log
     972            1 :                         // reader encounters one of these cases, it attempts to disambiguate
     973            1 :                         // by reading ahead, looking for a future record. If a future chunk
     974            1 :                         // indicates the chunk at the original offset should've been valid, it
     975            1 :                         // surfaces record.ErrInvalidChunk or record.ErrZeroedChunk. These
     976            1 :                         // errors are always indicative of corruption and data loss.
     977            1 :                         //
     978            1 :                         // Otherwise, the reader surfaces record.ErrUnexpectedEOF indicating
     979            1 :                         // that the WAL terminated uncleanly and ambiguously. If the WAL is
     980            1 :                         // the most recent logical WAL, the caller passes in
     981            1 :                         // (strictWALTail=false), indicating we should tolerate the unclean
     982            1 :                         // ending. If the WAL is an older WAL, the caller passes in
     983            1 :                         // (strictWALTail=true), indicating that the WAL should have been
     984            1 :                         // closed cleanly, and we should interpret the
     985            1 :                         // `record.ErrUnexpectedEOF` as corruption and stop recovery.
     986            1 :                         if errors.Is(err, io.EOF) {
     987            1 :                                 break
     988            0 :                         } else if errors.Is(err, record.ErrUnexpectedEOF) && !strictWALTail {
     989            0 :                                 break
     990            0 :                         } else if (errors.Is(err, record.ErrUnexpectedEOF) && strictWALTail) ||
     991            0 :                                 errors.Is(err, record.ErrInvalidChunk) || errors.Is(err, record.ErrZeroedChunk) {
     992            0 :                                 // If a read-ahead returns record.ErrInvalidChunk or
     993            0 :                                 // record.ErrZeroedChunk, then there's definitively corruption.
     994            0 :                                 //
     995            0 :                                 // If strictWALTail=true, then record.ErrUnexpectedEOF should
     996            0 :                                 // also be considered corruption because the strictWALTail
     997            0 :                                 // indicates we expect a clean end to the WAL.
     998            0 :                                 //
     999            0 :                                 // Other I/O related errors should not be marked with corruption
    1000            0 :                                 // and simply returned.
    1001            0 :                                 err = errors.Mark(err, ErrCorruption)
    1002            0 :                         }
    1003              : 
    1004            0 :                         return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
    1005              :                 }
    1006              : 
    1007            1 :                 if buf.Len() < batchrepr.HeaderLen {
    1008            0 :                         return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
    1009            0 :                                 errors.Safe(base.DiskFileNum(ll.Num)), offset)
    1010            0 :                 }
    1011              : 
    1012            1 :                 if d.opts.ErrorIfNotPristine {
    1013            0 :                         return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
    1014            0 :                 }
    1015              : 
    1016              :                 // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
    1017              :                 // which is used below.
    1018            1 :                 b = Batch{}
    1019            1 :                 b.db = d
    1020            1 :                 if err := b.SetRepr(buf.Bytes()); err != nil {
    1021            0 :                         return nil, 0, err
    1022            0 :                 }
    1023            1 :                 seqNum := b.SeqNum()
    1024            1 :                 maxSeqNum = seqNum + base.SeqNum(b.Count())
    1025            1 :                 keysReplayed += int64(b.Count())
    1026            1 :                 batchesReplayed++
    1027            1 :                 {
    1028            1 :                         br := b.Reader()
    1029            1 :                         if kind, _, _, ok, err := br.Next(); err != nil {
    1030            0 :                                 return nil, 0, err
    1031            1 :                         } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
    1032            1 :                                 // We're in the flushable ingests (+ possibly excises) case.
    1033            1 :                                 //
    1034            1 :                                 // Ingests require an up-to-date view of the LSM to determine the target
    1035            1 :                                 // level of ingested sstables, and to accurately compute excises. Instead of
    1036            1 :                                 // doing an ingest in this function, we just enqueue a flushable ingest
    1037            1 :                                 // in the flushables queue and run a regular flush.
    1038            1 :                                 flushMem()
    1039            1 :                                 // mem is nil here.
    1040            1 :                                 entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
    1041            1 :                                 if err != nil {
    1042            0 :                                         return nil, 0, err
    1043            0 :                                 }
    1044            1 :                                 fi := entry.flushable.(*ingestedFlushable)
    1045            1 :                                 flushableIngests = append(flushableIngests, fi)
    1046            1 :                                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1047            1 :                                 // A flushable ingest is always followed by a WAL rotation.
    1048            1 :                                 break
    1049              :                         }
    1050              :                 }
    1051              : 
    1052            1 :                 if b.memTableSize >= uint64(d.largeBatchThreshold) {
    1053            1 :                         flushMem()
    1054            1 :                         // Make a copy of the data slice since it is currently owned by buf and will
    1055            1 :                         // be reused in the next iteration.
    1056            1 :                         b.data = slices.Clone(b.data)
    1057            1 :                         b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
    1058            1 :                         if err != nil {
    1059            0 :                                 return nil, 0, err
    1060            0 :                         }
    1061            1 :                         entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
    1062            1 :                         // Disable memory accounting by adding a reader ref that will never be
    1063            1 :                         // removed.
    1064            1 :                         entry.readerRefs.Add(1)
    1065            1 :                         d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1066            1 :                 } else {
    1067            1 :                         ensureMem(seqNum)
    1068            1 :                         if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
    1069            0 :                                 return nil, 0, err
    1070            0 :                         }
    1071              :                         // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
    1072              :                         // batch may not initially fit, but will eventually fit (since it is smaller than
    1073              :                         // largeBatchThreshold).
    1074            1 :                         for err == arenaskl.ErrArenaFull {
    1075            1 :                                 flushMem()
    1076            1 :                                 ensureMem(seqNum)
    1077            1 :                                 err = mem.prepare(&b)
    1078            1 :                                 if err != nil && err != arenaskl.ErrArenaFull {
    1079            0 :                                         return nil, 0, err
    1080            0 :                                 }
    1081              :                         }
    1082            1 :                         if err = mem.apply(&b, seqNum); err != nil {
    1083            0 :                                 return nil, 0, err
    1084            0 :                         }
    1085            1 :                         mem.writerUnref()
    1086              :                 }
    1087            1 :                 buf.Reset()
    1088              :         }
    1089              : 
    1090            1 :         d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
    1091            1 :                 jobID, ll.String(), offset, keysReplayed, batchesReplayed)
    1092            1 :         if !d.opts.ReadOnly {
    1093            1 :                 flushMem()
    1094            1 :         }
    1095              : 
    1096              :         // mem is nil here, if !ReadOnly.
    1097            1 :         return flushableIngests, maxSeqNum, err
    1098              : }
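A condensed, standalone paraphrase of the error classification performed in
the replay loop above. Pebble inlines this logic rather than exposing a
helper; classifyWALReadErr is a hypothetical name used only for illustration:

        // classifyWALReadErr reports whether WAL replay may stop cleanly at
        // this error, and marks definitive corruption with ErrCorruption.
        func classifyWALReadErr(err error, strictWALTail bool) (tolerate bool, out error) {
                switch {
                case errors.Is(err, io.EOF):
                        // Clean end of the WAL.
                        return true, nil
                case errors.Is(err, record.ErrUnexpectedEOF) && !strictWALTail:
                        // Unclean tail of the most recent logical WAL: tolerated.
                        return true, nil
                case errors.Is(err, record.ErrUnexpectedEOF),
                        errors.Is(err, record.ErrInvalidChunk),
                        errors.Is(err, record.ErrZeroedChunk):
                        // Definitive corruption, or an unclean end where a clean
                        // one was required.
                        return false, errors.Mark(err, ErrCorruption)
                default:
                        // Other I/O errors pass through unmarked.
                        return false, err
                }
        }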
    1099              : 
    1100            1 : func readOptionsFile(opts *Options, path string) (string, error) {
    1101            1 :         f, err := opts.FS.Open(path)
    1102            1 :         if err != nil {
    1103            0 :                 return "", err
    1104            0 :         }
    1105            1 :         defer f.Close()
    1106            1 : 
    1107            1 :         data, err := io.ReadAll(f)
    1108            1 :         if err != nil {
    1109            0 :                 return "", err
    1110            0 :         }
    1111            1 :         return string(data), nil
    1112              : }
    1113              : 
    1114              : // DBDesc briefly describes high-level state about a database.
    1115              : type DBDesc struct {
    1116              :         // Exists is true if an existing database was found.
    1117              :         Exists bool
    1118              :         // FormatMajorVersion indicates the database's current format
    1119              :         // version.
    1120              :         FormatMajorVersion FormatMajorVersion
    1121              :         // ManifestFilename is the filename of the current active manifest,
    1122              :         // if the database exists.
    1123              :         ManifestFilename string
    1124              :         // OptionsFilename is the filename of the most recent OPTIONS file, if it
    1125              :         // exists.
    1126              :         OptionsFilename string
    1127              : }
    1128              : 
    1129              : // String implements fmt.Stringer.
    1130            0 : func (d *DBDesc) String() string {
    1131            0 :         if !d.Exists {
    1132            0 :                 return "uninitialized"
    1133            0 :         }
    1134            0 :         var buf bytes.Buffer
    1135            0 :         fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
    1136            0 :         fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
    1137            0 :         fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
    1138            0 :         return buf.String()
    1139              : }
    1140              : 
    1141              : // Peek looks for an existing database in dirname on the provided FS. It
    1142              : // returns a brief description of the database. Peek is read-only and
    1143              : // does not open the database.
    1144            0 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
    1145            0 :         ls, err := fs.List(dirname)
    1146            0 :         if err != nil {
    1147            0 :                 return nil, err
    1148            0 :         }
    1149              : 
    1150            0 :         vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
    1151            0 :         if err != nil {
    1152            0 :                 return nil, err
    1153            0 :         }
    1154              :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1155              :         // PeekMarker variant that avoids opening the directory.
    1156            0 :         if err := versMarker.Close(); err != nil {
    1157            0 :                 return nil, err
    1158            0 :         }
    1159              : 
    1160              :         // Find the currently active manifest, if there is one.
    1161            0 :         manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
    1162            0 :         if err != nil {
    1163            0 :                 return nil, err
    1164            0 :         }
    1165              :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1166              :         // PeekMarker variant that avoids opening the directory.
    1167            0 :         if err := manifestMarker.Close(); err != nil {
    1168            0 :                 return nil, err
    1169            0 :         }
    1170              : 
    1171            0 :         desc := &DBDesc{
    1172            0 :                 Exists:             exists,
    1173            0 :                 FormatMajorVersion: vers,
    1174            0 :         }
    1175            0 : 
    1176            0 :         // Find the OPTIONS file with the highest file number within the list of
    1177            0 :         // directory entries.
    1178            0 :         var previousOptionsFileNum base.DiskFileNum
    1179            0 :         for _, filename := range ls {
    1180            0 :                 ft, fn, ok := base.ParseFilename(fs, filename)
    1181            0 :                 if !ok || ft != base.FileTypeOptions || fn < previousOptionsFileNum {
    1182            0 :                         continue
    1183              :                 }
    1184            0 :                 previousOptionsFileNum = fn
    1185            0 :                 desc.OptionsFilename = fs.PathJoin(dirname, filename)
    1186              :         }
    1187              : 
    1188            0 :         if exists {
    1189            0 :                 desc.ManifestFilename = base.MakeFilepath(fs, dirname, base.FileTypeManifest, manifestFileNum)
    1190            0 :         }
    1191            0 :         return desc, nil
    1192              : }
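An illustrative use of Peek from client code; the directory path here is
hypothetical:

        package main

        import (
                "fmt"
                "log"

                "github.com/cockroachdb/pebble"
                "github.com/cockroachdb/pebble/vfs"
        )

        func main() {
                // Peek inspects the directory without opening the database.
                desc, err := pebble.Peek("/data/mydb", vfs.Default)
                if err != nil {
                        log.Fatal(err)
                }
                if !desc.Exists {
                        fmt.Println("no database found")
                        return
                }
                // DBDesc.String reports the format major version and the
                // manifest and OPTIONS filenames.
                fmt.Println(desc)
        }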
    1193              : 
    1194              : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
    1195              : // does not exist.
    1196              : //
    1197              : // Note that errors can be wrapped with more details; use errors.Is().
    1198              : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
    1199              : 
    1200              : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
    1201              : // already exists.
    1202              : //
    1203              : // Note that errors can be wrapped with more details; use errors.Is().
    1204              : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
    1205              : 
    1206              : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
    1207              : // already exists and is not pristine.
    1208              : //
    1209              : // Note that errors can be wrapped with more details; use errors.Is().
    1210              : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
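Because these sentinels may be wrapped with additional detail, callers should
test for them with errors.Is, as the comments above note. A hedged sketch
(the path is hypothetical):

        package main

        import (
                "errors"
                "log"

                "github.com/cockroachdb/pebble"
        )

        func main() {
                // ErrorIfNotPristine makes Open fail when the directory already
                // holds a database containing data.
                db, err := pebble.Open("/data/mydb", &pebble.Options{ErrorIfNotPristine: true})
                if errors.Is(err, pebble.ErrDBNotPristine) {
                        log.Fatal("database exists and is not pristine")
                } else if err != nil {
                        log.Fatal(err)
                }
                defer db.Close()
        }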
    1211              : 
    1212            1 : func checkConsistency(v *manifest.Version, objProvider objstorage.Provider) error {
    1213            1 :         var errs []error
    1214            1 :         dedup := make(map[base.DiskFileNum]struct{})
    1215            1 :         for level, files := range v.Levels {
    1216            1 :                 for f := range files.All() {
    1217            1 :                         backingState := f.TableBacking
    1218            1 :                         if _, ok := dedup[backingState.DiskFileNum]; ok {
    1219            1 :                                 continue
    1220              :                         }
    1221            1 :                         dedup[backingState.DiskFileNum] = struct{}{}
    1222            1 :                         fileNum := backingState.DiskFileNum
    1223            1 :                         fileSize := backingState.Size
    1224            1 :                         // We skip over remote objects; those are instead checked asynchronously
    1225            1 :                         // by the table stats loading job.
    1226            1 :                         meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
    1227            1 :                         var size int64
    1228            1 :                         if err == nil {
    1229            1 :                                 if meta.IsRemote() {
    1230            1 :                                         continue
    1231              :                                 }
    1232            1 :                                 size, err = objProvider.Size(meta)
    1233              :                         }
    1234            1 :                         if err != nil {
    1235            0 :                                 errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
    1236            0 :                                 continue
    1237              :                         }
    1238              : 
    1239            1 :                         if size != int64(fileSize) {
    1240            0 :                                 errs = append(errs, errors.Errorf(
    1241            0 :                                         "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
    1242            0 :                                         errors.Safe(level), fileNum, objProvider.Path(meta),
    1243            0 :                                         errors.Safe(size), errors.Safe(fileSize)))
    1244            0 :                                 continue
    1245              :                         }
    1246              :                 }
    1247              :         }
    1248            1 :         return errors.Join(errs...)
    1249              : }
    1250              : 
    1251              : type walEventListenerAdaptor struct {
    1252              :         l *EventListener
    1253              : }
    1254              : 
    1255            1 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
    1256            1 :         // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
    1257            1 :         // is insufficient to infer whether primary or secondary.
    1258            1 :         wci := WALCreateInfo{
    1259            1 :                 JobID:           ci.JobID,
    1260            1 :                 Path:            ci.Path,
    1261            1 :                 FileNum:         base.DiskFileNum(ci.Num),
    1262            1 :                 RecycledFileNum: ci.RecycledFileNum,
    1263            1 :                 Err:             ci.Err,
    1264            1 :         }
    1265            1 :         l.l.WALCreated(wci)
    1266            1 : }
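For context, a hedged sketch of the listener side this adaptor feeds. The
field names follow pebble.EventListener and the WALCreateInfo construction
shown above; the path is hypothetical:

        package main

        import (
                "log"

                "github.com/cockroachdb/pebble"
        )

        func main() {
                el := pebble.EventListener{
                        // WALCreated receives the WALCreateInfo constructed by
                        // walEventListenerAdaptor.LogCreated above.
                        WALCreated: func(info pebble.WALCreateInfo) {
                                log.Printf("WAL created: job=%d path=%s num=%s",
                                        info.JobID, info.Path, info.FileNum)
                        },
                }
                db, err := pebble.Open("/data/mydb", &pebble.Options{EventListener: &el})
                if err != nil {
                        log.Fatal(err)
                }
                defer db.Close()
        }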
        

Generated by: LCOV version 2.0-1