LCOV - code coverage report
Current view: top level - pebble - open.go (source / functions) Coverage Total Hit
Test: 2025-06-18 08:19Z c54cc1a1 - meta test only.lcov Lines: 61.4 % 917 563
Test Date: 2025-06-18 08:20:13 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "context"
      10              :         "encoding/binary"
      11              :         "fmt"
      12              :         "io"
      13              :         "math"
      14              :         "os"
      15              :         "slices"
      16              :         "sync/atomic"
      17              :         "time"
      18              : 
      19              :         "github.com/cockroachdb/crlib/crtime"
      20              :         "github.com/cockroachdb/errors"
      21              :         "github.com/cockroachdb/errors/oserror"
      22              :         "github.com/cockroachdb/pebble/batchrepr"
      23              :         "github.com/cockroachdb/pebble/internal/arenaskl"
      24              :         "github.com/cockroachdb/pebble/internal/base"
      25              :         "github.com/cockroachdb/pebble/internal/cache"
      26              :         "github.com/cockroachdb/pebble/internal/constants"
      27              :         "github.com/cockroachdb/pebble/internal/invariants"
      28              :         "github.com/cockroachdb/pebble/internal/keyspan"
      29              :         "github.com/cockroachdb/pebble/internal/manifest"
      30              :         "github.com/cockroachdb/pebble/internal/manual"
      31              :         "github.com/cockroachdb/pebble/objstorage"
      32              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      33              :         "github.com/cockroachdb/pebble/objstorage/remote"
      34              :         "github.com/cockroachdb/pebble/record"
      35              :         "github.com/cockroachdb/pebble/vfs"
      36              :         "github.com/cockroachdb/pebble/wal"
      37              :         "github.com/prometheus/client_golang/prometheus"
      38              : )
      39              : 
      40              : const (
      41              :         initialMemTableSize = 256 << 10 // 256 KB; Open caps the initial d.mu.mem.nextSize at this value
      42              : 
      43              :         // The max batch size is limited by the uint32 offsets stored in
      44              :         // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
      45              :         //
      46              :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      47              :         // end of an allocation fits in uint32.
      48              :         //
      49              :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      50              :         // 2GB).
      51              :         maxBatchSize = constants.MaxUint32OrInt
      52              : 
      53              :         // The max memtable size is limited by the uint32 offsets stored in
      54              :         // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
      55              :         //
      56              :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      57              :         // end of an allocation fits in uint32.
      58              :         //
      59              :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      60              :         // 2GB).
      61              :         maxMemTableSize = constants.MaxUint32OrInt
      62              : )
      63              : 
      64              : // FileCacheSize returns the file cache size to use for a file cache that is
      65              : // dedicated to a single DB, given the maximum number of open files. It is
      66              : // maxOpenFiles minus numNonFileCacheFiles, and is never smaller than
      67              : // minFileCacheSize.
      68            1 : func FileCacheSize(maxOpenFiles int) int {
      69            1 :         fileCacheSize := maxOpenFiles - numNonFileCacheFiles
      70            1 :         if fileCacheSize < minFileCacheSize {
      71            1 :                 fileCacheSize = minFileCacheSize
      72            1 :         }
      73            1 :         return fileCacheSize
      74              : }
      75              : 
      76              : // Open opens a DB whose files live in the given directory.
      77              : //
      78              : // IsCorruptionError() can be used to determine if the error is caused by on-disk
      79              : // corruption.
      80            1 : func Open(dirname string, opts *Options) (db *DB, err error) {
      81            1 :         // Make a copy of the options so that we don't mutate the passed in options.
      82            1 :         opts = opts.Clone()
      83            1 :         opts.EnsureDefaults()
      84            1 :         if opts.Experimental.CompactionScheduler == nil {
      85            0 :                 opts.Experimental.CompactionScheduler = newConcurrencyLimitScheduler(defaultTimeSource{})
      86            0 :         }
      87            1 :         if err := opts.Validate(); err != nil {
      88            0 :                 return nil, err
      89            0 :         }
      90            1 :         if opts.LoggerAndTracer == nil {
      91            1 :                 opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
      92            1 :         } else {
      93            0 :                 opts.Logger = opts.LoggerAndTracer
      94            0 :         }
      95              : 
      96            1 :         if invariants.Sometimes(5) {
      97            1 :                 assertComparer := base.MakeAssertComparer(*opts.Comparer)
      98            1 :                 opts.Comparer = &assertComparer
      99            1 :         }
     100              : 
     101              :         // In all error cases, we return db = nil; this is used by various
     102              :         // deferred cleanups.
     103              : 
     104              :         // Open the database and WAL directories first.
     105            1 :         walDirname, dataDir, err := prepareAndOpenDirs(dirname, opts)
     106            1 :         if err != nil {
     107            0 :                 return nil, errors.Wrapf(err, "error opening database at %q", dirname)
     108            0 :         }
     109            1 :         defer func() {
     110            1 :                 if db == nil {
     111            0 :                         dataDir.Close()
     112            0 :                 }
     113              :         }()
     114              : 
     115              :         // Lock the database directory.
     116            1 :         var fileLock *Lock
     117            1 :         if opts.Lock != nil {
     118            0 :                 // The caller already acquired the database lock. Ensure that the
     119            0 :                 // directory matches.
     120            0 :                 if err := opts.Lock.pathMatches(dirname); err != nil {
     121            0 :                         return nil, err
     122            0 :                 }
     123            0 :                 if err := opts.Lock.refForOpen(); err != nil {
     124            0 :                         return nil, err
     125            0 :                 }
     126            0 :                 fileLock = opts.Lock
     127            1 :         } else {
     128            1 :                 fileLock, err = LockDirectory(dirname, opts.FS)
     129            1 :                 if err != nil {
     130            0 :                         return nil, err
     131            0 :                 }
     132              :         }
     133            1 :         defer func() {
     134            1 :                 if db == nil {
     135            0 :                         _ = fileLock.Close()
     136            0 :                 }
     137              :         }()
     138              : 
     139              :         // List the directory contents. This also happens to include WAL log files, if
     140              :         // they are in the same dir, but we will ignore those below. The provider is
     141              :         // also given this list, but it ignores non sstable files.
     142            1 :         ls, err := opts.FS.List(dirname)
     143            1 :         if err != nil {
     144            0 :                 return nil, err
     145            0 :         }
     146              : 
     147              :         // Establish the format major version.
     148            1 :         formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
     149            1 :         if err != nil {
     150            0 :                 return nil, err
     151            0 :         }
     152            1 :         defer func() {
     153            1 :                 if db == nil {
     154            0 :                         _ = formatVersionMarker.Close()
     155            0 :                 }
     156              :         }()
     157              : 
     158            1 :         noFormatVersionMarker := formatVersion == FormatDefault
     159            1 :         if noFormatVersionMarker {
     160            1 :                 // We will initialize the store at the minimum possible format, then upgrade
     161            1 :                 // the format to the desired one. This helps test the format upgrade code.
     162            1 :                 formatVersion = FormatMinSupported
     163            1 :                 if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
     164            1 :                         formatVersion = FormatMinForSharedObjects
     165            1 :                 }
     166              :                 // There is no format version marker file. There are three cases:
     167              :                 //  - we are trying to open an existing store that was created at
     168              :                 //    FormatMostCompatible (the only one without a version marker file)
     169              :                 //  - we are creating a new store;
     170              :                 //  - we are retrying a failed creation.
     171              :                 //
     172              :                 // To error in the first case, we set ErrorIfNotPristine.
     173            1 :                 opts.ErrorIfNotPristine = true
     174            1 :                 defer func() {
     175            1 :                         if err != nil && errors.Is(err, ErrDBNotPristine) {
     176            0 :                                 // We must be trying to open an existing store at FormatMostCompatible.
     177            0 :                                 // Correct the error in this case: that format is no longer supported.
     178            0 :                                 err = errors.Newf(
     179            0 :                                         "pebble: database %q written in format major version 1 which is no longer supported",
     180            0 :                                         dirname)
     181            0 :                         }
     182              :                 }()
     183              :         }
     184              : 
     185              :         // Find the currently active manifest, if there is one.
     186            1 :         manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
     187            1 :         if err != nil {
     188            0 :                 return nil, errors.Wrapf(err, "pebble: database %q", dirname)
     189            0 :         }
     190            1 :         defer func() {
     191            1 :                 if db == nil {
     192            0 :                         _ = manifestMarker.Close()
     193            0 :                 }
     194              :         }()
     195              : 
     196              :         // Atomic markers may leave behind obsolete files if there's a crash
     197              :         // mid-update. Clean these up if we're not in read-only mode.
     198            1 :         if !opts.ReadOnly {
     199            1 :                 if err := formatVersionMarker.RemoveObsolete(); err != nil {
     200            0 :                         return nil, err
     201            0 :                 }
     202            1 :                 if err := manifestMarker.RemoveObsolete(); err != nil {
     203            0 :                         return nil, err
     204            0 :                 }
     205              :         }
     206              : 
     207            1 :         if opts.Cache == nil {
     208            1 :                 opts.Cache = cache.New(opts.CacheSize)
     209            1 :                 defer opts.Cache.Unref()
     210            1 :         }
     211              : 
     212            1 :         d := &DB{
     213            1 :                 cacheHandle:         opts.Cache.NewHandle(),
     214            1 :                 dirname:             dirname,
     215            1 :                 opts:                opts,
     216            1 :                 cmp:                 opts.Comparer.Compare,
     217            1 :                 equal:               opts.Comparer.Equal,
     218            1 :                 merge:               opts.Merger.Merge,
     219            1 :                 split:               opts.Comparer.Split,
     220            1 :                 abbreviatedKey:      opts.Comparer.AbbreviatedKey,
     221            1 :                 largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
     222            1 :                 fileLock:            fileLock,
     223            1 :                 dataDir:             dataDir,
     224            1 :                 closed:              new(atomic.Value),
     225            1 :                 closedCh:            make(chan struct{}),
     226            1 :         }
     227            1 :         d.mu.versions = &versionSet{}
     228            1 :         d.diskAvailBytes.Store(math.MaxUint64)
     229            1 :         d.problemSpans.Init(manifest.NumLevels, opts.Comparer.Compare)
     230            1 : 
     231            1 :         defer func() {
     232            1 :                 // If an error or panic occurs during open, attempt to release the manually
     233            1 :                 // allocated memory resources. Note that rather than look for an error, we
     234            1 :                 // look for the return of a nil DB pointer.
     235            1 :                 if r := recover(); db == nil {
     236            0 :                         // If there's an unused, recycled memtable, we need to release its memory.
     237            0 :                         if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
     238            0 :                                 d.freeMemTable(obsoleteMemTable)
     239            0 :                         }
     240              : 
     241            0 :                         if d.fileCache != nil {
     242            0 :                                 _ = d.fileCache.Close()
     243            0 :                         }
     244            0 :                         d.cacheHandle.Close()
     245            0 : 
     246            0 :                         for _, mem := range d.mu.mem.queue {
     247            0 :                                 switch t := mem.flushable.(type) {
     248            0 :                                 case *memTable:
     249            0 :                                         manual.Free(manual.MemTable, t.arenaBuf)
     250            0 :                                         t.arenaBuf = manual.Buf{}
     251              :                                 }
     252              :                         }
     253            0 :                         if d.cleanupManager != nil {
     254            0 :                                 d.cleanupManager.Close()
     255            0 :                         }
     256            0 :                         if d.objProvider != nil {
     257            0 :                                 _ = d.objProvider.Close()
     258            0 :                         }
     259            0 :                         if r != nil {
     260            0 :                                 panic(r)
     261              :                         }
     262              :                 }
     263              :         }()
     264              : 
     265            1 :         d.commit = newCommitPipeline(commitEnv{
     266            1 :                 logSeqNum:     &d.mu.versions.logSeqNum,
     267            1 :                 visibleSeqNum: &d.mu.versions.visibleSeqNum,
     268            1 :                 apply:         d.commitApply,
     269            1 :                 write:         d.commitWrite,
     270            1 :         })
     271            1 :         d.mu.nextJobID = 1
     272            1 :         d.mu.mem.nextSize = opts.MemTableSize
     273            1 :         if d.mu.mem.nextSize > initialMemTableSize {
     274            1 :                 d.mu.mem.nextSize = initialMemTableSize
     275            1 :         }
     276            1 :         d.mu.compact.cond.L = &d.mu.Mutex
     277            1 :         d.mu.compact.inProgress = make(map[*compaction]struct{})
     278            1 :         d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
     279            1 :         d.mu.snapshots.init()
     280            1 :         // logSeqNum is the next sequence number that will be assigned.
     281            1 :         // Start assigning sequence numbers from base.SeqNumStart to leave
     282            1 :         // room for reserved sequence numbers (see comments around
     283            1 :         // SeqNumStart).
     284            1 :         d.mu.versions.logSeqNum.Store(base.SeqNumStart)
     285            1 :         d.mu.formatVers.vers.Store(uint64(formatVersion))
     286            1 :         d.mu.formatVers.marker = formatVersionMarker
     287            1 : 
     288            1 :         d.timeNow = time.Now
     289            1 :         d.openedAt = d.timeNow()
     290            1 : 
     291            1 :         d.mu.Lock()
     292            1 :         defer d.mu.Unlock()
     293            1 : 
     294            1 :         jobID := d.newJobIDLocked()
     295            1 : 
     296            1 :         providerSettings := opts.MakeObjStorageProviderSettings(dirname)
     297            1 :         providerSettings.FSDirInitialListing = ls
     298            1 :         d.objProvider, err = objstorageprovider.Open(providerSettings)
     299            1 :         if err != nil {
     300            0 :                 return nil, err
     301            0 :         }
     302              : 
     303            1 :         if !manifestExists {
     304            1 :                 // DB does not exist.
     305            1 :                 if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
     306            0 :                         return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
     307            0 :                 }
     308              : 
     309              :                 // Create the DB.
     310            1 :                 if err := d.mu.versions.create(
     311            1 :                         jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
     312            0 :                         return nil, err
     313            0 :                 }
     314            1 :         } else {
     315            1 :                 if opts.ErrorIfExists {
     316            0 :                         return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
     317            0 :                 }
     318              :                 // Load the version set.
     319            1 :                 if err := d.mu.versions.load(
     320            1 :                         dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
     321            0 :                         return nil, err
     322            0 :                 }
     323            1 :                 if opts.ErrorIfNotPristine {
     324            0 :                         liveFileNums := make(map[base.DiskFileNum]struct{})
     325            0 :                         d.mu.versions.addLiveFileNums(liveFileNums)
     326            0 :                         if len(liveFileNums) != 0 {
     327            0 :                                 return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
     328            0 :                         }
     329              :                 }
     330              :         }
     331              : 
     332              :         // In read-only mode, we replay directly into the mutable memtable but never
     333              :         // flush it. We need to delay creation of the memtable until we know the
     334              :         // sequence number of the first batch that will be inserted.
     335            1 :         if !d.opts.ReadOnly {
     336            1 :                 var entry *flushableEntry
     337            1 :                 d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
     338            1 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     339            1 :         }
     340              : 
     341            1 :         d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     342            1 :                 Buckets: FsyncLatencyBuckets,
     343            1 :         })
     344            1 :         walOpts := wal.Options{
     345            1 :                 Primary:              wal.Dir{FS: opts.FS, Dirname: walDirname},
     346            1 :                 Secondary:            wal.Dir{},
     347            1 :                 MinUnflushedWALNum:   wal.NumWAL(d.mu.versions.minUnflushedLogNum),
     348            1 :                 MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
     349            1 :                 NoSyncOnClose:        opts.NoSyncOnClose,
     350            1 :                 BytesPerSync:         opts.WALBytesPerSync,
     351            1 :                 PreallocateSize:      d.walPreallocateSize,
     352            1 :                 MinSyncInterval:      opts.WALMinSyncInterval,
     353            1 :                 FsyncLatency:         d.mu.log.metrics.fsyncLatency,
     354            1 :                 QueueSemChan:         d.commit.logSyncQSem,
     355            1 :                 Logger:               opts.Logger,
     356            1 :                 EventListener:        walEventListenerAdaptor{l: opts.EventListener},
     357            1 :                 WriteWALSyncOffsets:  func() bool { return d.FormatMajorVersion() >= FormatWALSyncChunks },
     358              :         }
     359            1 :         if opts.WALFailover != nil {
     360            1 :                 walOpts.Secondary = opts.WALFailover.Secondary
     361            1 :                 walOpts.Secondary.Dirname = resolveStorePath(dirname, walOpts.Secondary.Dirname)
     362            1 :                 walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
     363            1 :                 walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     364            1 :                         Buckets: FsyncLatencyBuckets,
     365            1 :                 })
     366            1 :         }
     367            1 :         walDirs := walOpts.Dirs()
     368            1 :         for _, dir := range opts.WALRecoveryDirs {
     369            0 :                 dir.Dirname = resolveStorePath(dirname, dir.Dirname)
     370            0 :                 walDirs = append(walDirs, dir)
     371            0 :         }
     372            1 :         wals, err := wal.Scan(walDirs...)
     373            1 :         if err != nil {
     374            0 :                 return nil, err
     375            0 :         }
     376            1 :         walManager, err := wal.Init(walOpts, wals)
     377            1 :         if err != nil {
     378            0 :                 return nil, err
     379            0 :         }
     380            1 :         defer func() {
     381            1 :                 if db == nil {
     382            0 :                         _ = walManager.Close()
     383            0 :                 }
     384              :         }()
     385              : 
     386            1 :         d.mu.log.manager = walManager
     387            1 : 
     388            1 :         d.cleanupManager = openCleanupManager(opts, d.objProvider, d.getDeletionPacerInfo)
     389            1 : 
     390            1 :         if manifestExists && !opts.DisableConsistencyCheck {
     391            1 :                 curVersion := d.mu.versions.currentVersion()
     392            1 :                 if err := checkConsistency(curVersion, d.objProvider); err != nil {
     393            0 :                         return nil, err
     394            0 :                 }
     395              :         }
     396              : 
     397            1 :         fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
     398            1 :         if opts.FileCache == nil {
     399            1 :                 opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
     400            1 :                 defer opts.FileCache.Unref()
     401            1 :         }
     402            1 :         d.fileCache = opts.FileCache.newHandle(d.cacheHandle, d.objProvider, d.opts.LoggerAndTracer, d.opts.MakeReaderOptions(), d.reportCorruption)
     403            1 :         d.newIters = d.fileCache.newIters
     404            1 :         d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
     405            1 : 
     406            1 :         d.mu.annotators.totalFileSize = d.makeFileSizeAnnotator(func(f *manifest.TableMetadata) bool {
     407            0 :                 return true
     408            0 :         })
     409            1 :         d.mu.annotators.remoteSize = d.makeFileSizeAnnotator(func(f *manifest.TableMetadata) bool {
     410            0 :                 meta, err := d.objProvider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
     411            0 :                 if err != nil {
     412            0 :                         return false
     413            0 :                 }
     414            0 :                 return meta.IsRemote()
     415              :         })
     416            1 :         d.mu.annotators.externalSize = d.makeFileSizeAnnotator(func(f *manifest.TableMetadata) bool {
     417            0 :                 meta, err := d.objProvider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
     418            0 :                 if err != nil {
     419            0 :                         return false
     420            0 :                 }
     421            0 :                 return meta.IsRemote() && meta.Remote.CleanupMethod == objstorage.SharedNoCleanup
     422              :         })
     423              : 
     424            1 :         var previousOptionsFileNum base.DiskFileNum
     425            1 :         var previousOptionsFilename string
     426            1 :         for _, filename := range ls {
     427            1 :                 ft, fn, ok := base.ParseFilename(opts.FS, filename)
     428            1 :                 if !ok {
     429            1 :                         continue
     430              :                 }
     431              : 
     432              :                 // Don't reuse any obsolete file numbers to avoid modifying an
     433              :                 // ingested sstable's original external file.
     434            1 :                 d.mu.versions.markFileNumUsed(fn)
     435            1 : 
     436            1 :                 switch ft {
     437            0 :                 case base.FileTypeLog:
     438              :                         // Ignore.
     439            1 :                 case base.FileTypeOptions:
     440            1 :                         if previousOptionsFileNum < fn {
     441            1 :                                 previousOptionsFileNum = fn
     442            1 :                                 previousOptionsFilename = filename
     443            1 :                         }
     444            0 :                 case base.FileTypeTemp, base.FileTypeOldTemp:
     445            0 :                         if !d.opts.ReadOnly {
     446            0 :                                 // Some codepaths write to a temporary file and then
     447            0 :                                 // rename it to its final location when complete.  A
     448            0 :                                 // temp file is leftover if a process exits before the
     449            0 :                                 // rename.  Remove it.
     450            0 :                                 err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
     451            0 :                                 if err != nil {
     452            0 :                                         return nil, err
     453            0 :                                 }
     454              :                         }
     455              :                 }
     456              :         }
     457            1 :         if n := len(wals); n > 0 {
     458            1 :                 // Don't reuse any obsolete file numbers to avoid modifying an
     459            1 :                 // ingested sstable's original external file.
     460            1 :                 d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
     461            1 :         }
     462              : 
     463              :         // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
     464              :         // objProvider. This avoids FileNum collisions with obsolete sstables.
     465            1 :         objects := d.objProvider.List()
     466            1 :         for _, obj := range objects {
     467            1 :                 d.mu.versions.markFileNumUsed(obj.DiskFileNum)
     468            1 :         }
     469              : 
     470              :         // Validate the most-recent OPTIONS file, if there is one.
     471            1 :         if previousOptionsFilename != "" {
     472            1 :                 path := opts.FS.PathJoin(dirname, previousOptionsFilename)
     473            1 :                 previousOptions, err := readOptionsFile(opts, path)
     474            1 :                 if err != nil {
     475            0 :                         return nil, err
     476            0 :                 }
     477            1 :                 if err := opts.CheckCompatibility(dirname, previousOptions); err != nil {
     478            0 :                         return nil, err
     479            0 :                 }
     480              :         }
     481              : 
     482              :         // Replay any newer log files than the ones named in the manifest.
     483            1 :         var replayWALs wal.Logs
     484            1 :         for i, w := range wals {
     485            1 :                 if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
     486            1 :                         replayWALs = wals[i:]
     487            1 :                         break
     488              :                 }
     489              :         }
     490            1 :         var flushableIngests []*ingestedFlushable
     491            1 :         for i, lf := range replayWALs {
     492            1 :                 // WALs other than the last one would have been closed cleanly.
     493            1 :                 //
     494            1 :                 // Note: we used to never require strict WAL tails when reading from older
     495            1 :                 // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
     496            1 :                 // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
     497            1 :                 // compatible Pebble format is newer and guarantees a clean EOF.
     498            1 :                 strictWALTail := i < len(replayWALs)-1
     499            1 :                 fi, maxSeqNum, err := d.replayWAL(jobID, lf, strictWALTail)
     500            1 :                 if err != nil {
     501            0 :                         return nil, err
     502            0 :                 }
     503            1 :                 if len(fi) > 0 {
     504            1 :                         flushableIngests = append(flushableIngests, fi...)
     505            1 :                 }
     506            1 :                 if d.mu.versions.logSeqNum.Load() < maxSeqNum {
     507            1 :                         d.mu.versions.logSeqNum.Store(maxSeqNum)
     508            1 :                 }
     509              :         }
     510            1 :         if d.mu.mem.mutable == nil {
     511            1 :                 // Recreate the mutable memtable if replayWAL got rid of it.
     512            1 :                 var entry *flushableEntry
     513            1 :                 d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
     514            1 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     515            1 :         }
     516            1 :         d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
     517            1 : 
     518            1 :         // Register with the CompactionScheduler before calling
     519            1 :         // d.maybeScheduleFlush, since completion of the flush can trigger
     520            1 :         // compactions.
     521            1 :         d.opts.Experimental.CompactionScheduler.Register(2, d)
     522            1 :         if !d.opts.ReadOnly {
     523            1 :                 d.maybeScheduleFlush()
     524            1 :                 for d.mu.compact.flushing {
     525            1 :                         d.mu.compact.cond.Wait()
     526            1 :                 }
     527              : 
     528              :                 // Create an empty .log file for the mutable memtable.
     529            1 :                 newLogNum := d.mu.versions.getNextDiskFileNum()
     530            1 :                 d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
     531            1 :                 if err != nil {
     532            0 :                         return nil, err
     533            0 :                 }
     534              : 
     535              :                 // This isn't strictly necessary as we don't use the log number for
     536              :                 // memtables being flushed, only for the next unflushed memtable.
     537            1 :                 d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
     538              :         }
     539            1 :         d.updateReadStateLocked(d.opts.DebugCheck)
     540            1 : 
     541            1 :         if !d.opts.ReadOnly {
     542            1 :                 // If the Options specify a format major version higher than the
     543            1 :                 // loaded database's, upgrade it. If this is a new database, this
     544            1 :                 // code path also performs an initial upgrade from the starting
     545            1 :                 // implicit MinSupported version.
     546            1 :                 //
     547            1 :                 // We ratchet the version this far into Open so that migrations have a read
     548            1 :                 // state available. Note that this also results in creating/updating the
     549            1 :                 // format version marker file.
     550            1 :                 if opts.FormatMajorVersion > d.FormatMajorVersion() {
     551            1 :                         if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
     552            0 :                                 return nil, err
     553            0 :                         }
     554            1 :                 } else if noFormatVersionMarker {
     555            1 :                         // We are creating a new store. Create the format version marker file.
     556            1 :                         if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
     557            0 :                                 return nil, err
     558            0 :                         }
     559              :                 }
     560              : 
     561              :                 // Write the current options to disk.
     562            1 :                 d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
     563            1 :                 tmpPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeTemp, d.optionsFileNum)
     564            1 :                 optionsPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeOptions, d.optionsFileNum)
     565            1 : 
     566            1 :                 // Write them to a temporary file first, in case we crash before
     567            1 :                 // we're done. A corrupt options file prevents opening the
     568            1 :                 // database.
     569            1 :                 optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
     570            1 :                 if err != nil {
     571            0 :                         return nil, err
     572            0 :                 }
     573            1 :                 serializedOpts := []byte(opts.String())
     574            1 :                 if _, err := optionsFile.Write(serializedOpts); err != nil {
     575            0 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     576            0 :                 }
     577            1 :                 d.optionsFileSize = uint64(len(serializedOpts))
     578            1 :                 if err := optionsFile.Sync(); err != nil {
     579            0 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     580            0 :                 }
     581            1 :                 if err := optionsFile.Close(); err != nil {
     582            0 :                         return nil, err
     583            0 :                 }
     584              :                 // Atomically rename to the OPTIONS-XXXXXX path. This rename is
     585              :                 // guaranteed to be atomic because the destination path does not
     586              :                 // exist.
     587            1 :                 if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
     588            0 :                         return nil, err
     589            0 :                 }
     590            1 :                 if err := d.dataDir.Sync(); err != nil {
     591            0 :                         return nil, err
     592            0 :                 }
     593              :         }
     594              : 
     595            1 :         if !d.opts.ReadOnly {
     596            1 :                 // Get a fresh list of files, in case some of the earlier flushes/compactions
     597            1 :                 // have deleted some files.
     598            1 :                 ls, err := opts.FS.List(dirname)
     599            1 :                 if err != nil {
     600            0 :                         return nil, err
     601            0 :                 }
     602            1 :                 d.scanObsoleteFiles(ls, flushableIngests)
     603            1 :                 d.deleteObsoleteFiles(jobID)
     604              :         }
     605              :         // Else, nothing is obsolete.
     606              : 
     607            1 :         d.mu.tableStats.cond.L = &d.mu.Mutex
     608            1 :         d.mu.tableValidation.cond.L = &d.mu.Mutex
     609            1 :         if !d.opts.ReadOnly {
     610            1 :                 d.maybeCollectTableStatsLocked()
     611            1 :         }
     612            1 :         d.calculateDiskAvailableBytes()
     613            1 : 
     614            1 :         d.maybeScheduleFlush()
     615            1 :         d.maybeScheduleCompaction()
     616            1 : 
     617            1 :         // Note: this is a no-op if invariants are disabled or race is enabled.
     618            1 :         //
     619            1 :         // Setting a finalizer on *DB causes *DB to never be reclaimed and the
     620            1 :         // finalizer to never be run. The problem is due to this limitation of
     621            1 :         // finalizers mentioned in the SetFinalizer docs:
     622            1 :         //
     623            1 :         //   If a cyclic structure includes a block with a finalizer, that cycle is
     624            1 :         //   not guaranteed to be garbage collected and the finalizer is not
     625            1 :         //   guaranteed to run, because there is no ordering that respects the
     626            1 :         //   dependencies.
     627            1 :         //
     628            1 :         // DB has cycles with several of its internal structures: readState,
     629            1 :         // newIters, fileCache, versions, etc. Each of these individually causes a
     630            1 :         // cycle and prevents the finalizer from being run. But we can work around this
     631            1 :         // finalizer limitation by setting a finalizer on another object that is
     632            1 :         // tied to the lifetime of DB: the DB.closed atomic.Value.
     633            1 :         dPtr := fmt.Sprintf("%p", d)
     634            1 :         invariants.SetFinalizer(d.closed, func(obj interface{}) {
     635            0 :                 v := obj.(*atomic.Value)
     636            0 :                 if err := v.Load(); err == nil {
     637            0 :                         fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
     638            0 :                         os.Exit(1)
     639            0 :                 }
     640              :         })
     641              : 
     642            1 :         return d, nil
     643              : }
     644              : 
     645              : // prepareAndOpenDirs resolves the WAL directory name, creates the store's
     646              : // directories unless opts.ReadOnly is set, and opens the data directory. On
     647              : // success it returns the WAL directory name and the open data directory.
     648              : // Returns an error if ReadOnly is set and the directories don't exist.
     649              : func prepareAndOpenDirs(
     650              :         dirname string, opts *Options,
     651            1 : ) (walDirname string, dataDir vfs.File, err error) {
     652            1 :         walDirname = dirname
     653            1 :         if opts.WALDir != "" {
     654            1 :                 walDirname = resolveStorePath(dirname, opts.WALDir)
     655            1 :         }
     656              :         // Create directories if needed: the data dir, a separate WAL dir, and the
     657              :         // WAL failover secondary dir. Returned handles are closed immediately.
     658            1 :         if !opts.ReadOnly {
     659            1 :                 f, err := mkdirAllAndSyncParents(opts.FS, dirname)
     660            1 :                 if err != nil {
     661            0 :                         return "", nil, err
     662            0 :                 }
     663            1 :                 f.Close()
     664            1 :                 if walDirname != dirname {
     665            1 :                         f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
     666            1 :                         if err != nil {
     667            0 :                                 return "", nil, err
     668            0 :                         }
     669            1 :                         f.Close()
     670              :                 }
     671            1 :                 if opts.WALFailover != nil {
     672            1 :                         secondary := opts.WALFailover.Secondary
     673            1 :                         f, err := mkdirAllAndSyncParents(secondary.FS, resolveStorePath(dirname, secondary.Dirname))
     674            1 :                         if err != nil {
     675            0 :                                 return "", nil, err
     676            0 :                         }
     677            1 :                         f.Close()
     678              :                 }
     679              :         }
     680              :         // Open the data directory; it is returned to the caller still open.
     681            1 :         dataDir, err = opts.FS.OpenDir(dirname)
     682            1 :         if err != nil {
     683            0 :                 if opts.ReadOnly && oserror.IsNotExist(err) {
     684            0 :                         return "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
     685            0 :                 }
     686            0 :                 return "", nil, err
     687              :         }
     688            1 :         if opts.ReadOnly && walDirname != dirname {
     689            0 :                 // Check that the WAL dir exists; read-only mode skipped creation above.
     690            0 :                 walDir, err := opts.FS.OpenDir(walDirname)
     691            0 :                 if err != nil {
     692            0 :                         dataDir.Close()
     693            0 :                         return "", nil, err
     694            0 :                 }
     695            0 :                 walDir.Close()
     696              :         }
     697              : 
     698            1 :         return walDirname, dataDir, nil
     699              : }
     700              : 
     701              : // GetVersion returns the engine version string from the highest-numbered
     702              : // OPTIONS file present in dir. Used to check what Pebble or RocksDB version last
     703              : // wrote to the database stored in this directory. An empty string is returned if
     704              : // no valid OPTIONS file with a version key was found; I/O and parse errors are returned.
     705            0 : func GetVersion(dir string, fs vfs.FS) (string, error) {
     706            0 :         ls, err := fs.List(dir)
     707            0 :         if err != nil {
     708            0 :                 return "", err
     709            0 :         }
     710            0 :         var version string
     711            0 :         lastOptionsSeen := base.DiskFileNum(0)
     712            0 :         for _, filename := range ls {
     713            0 :                 ft, fn, ok := base.ParseFilename(fs, filename)
     714            0 :                 if !ok {
     715            0 :                         continue
     716              :                 }
     717            0 :                 switch ft {
     718            0 :                 case base.FileTypeOptions:
     719            0 :                         // If this file has a higher number than the last options file
     720            0 :                         // processed, reset version. This is because RocksDB often
     721            0 :                         // writes multiple options files without deleting previous ones.
     722            0 :                         // Otherwise, skip parsing this options file.
     723            0 :                         if fn > lastOptionsSeen {
     724            0 :                                 version = ""
     725            0 :                                 lastOptionsSeen = fn
     726            0 :                         } else {
     727            0 :                                 continue
     728              :                         }
     729            0 :                         f, err := fs.Open(fs.PathJoin(dir, filename))
     730            0 :                         if err != nil {
     731            0 :                                 return "", err
     732            0 :                         }
     733            0 :                         data, err := io.ReadAll(f)
     734            0 :                         f.Close()
     735            0 :                         // f is already closed above, so the early return below cannot leak it.
     736            0 :                         if err != nil {
     737            0 :                                 return "", err
     738            0 :                         }
     739            0 :                         err = parseOptions(string(data), parseOptionsFuncs{
     740            0 :                                 visitKeyValue: func(i, j int, section, key, value string) error {
     741            0 :                                         switch {
     742            0 :                                         case section == "Version":
     743            0 :                                                 switch key {
     744            0 :                                                 case "pebble_version":
     745            0 :                                                         version = value
     746            0 :                                                 case "rocksdb_version":
     747            0 :                                                         version = fmt.Sprintf("rocksdb v%s", value)
     748              :                                                 }
     749              :                                         }
     750            0 :                                         return nil
     751              :                                 },
     752              :                         })
     753            0 :                         if err != nil {
     754            0 :                                 return "", err
     755            0 :                         }
     756              :                 }
     757              :         }
     758            0 :         return version, nil
     759              : }
     760              : // replayIngestedFlushable builds a flushable entry from ingest batch b; logNum is passed through unchanged.
     761              : func (d *DB) replayIngestedFlushable(
     762              :         b *Batch, logNum base.DiskFileNum,
     763            1 : ) (entry *flushableEntry, err error) {
     764            1 :         br := b.Reader()
     765            1 :         seqNum := b.SeqNum()
     766            1 :         // fileNums collects the uvarint-decoded sstable file numbers; an undecodable one panics.
     767            1 :         fileNums := make([]base.DiskFileNum, 0, b.Count())
     768            1 :         var exciseSpan KeyRange
     769            1 :         addFileNum := func(encodedFileNum []byte) {
     770            1 :                 fileNum, n := binary.Uvarint(encodedFileNum)
     771            1 :                 if n <= 0 {
     772            0 :                         panic("pebble: ingest sstable file num is invalid")
     773              :                 }
     774            1 :                 fileNums = append(fileNums, base.DiskFileNum(fileNum))
     775              :         }
     776              :         // Read b.Count() entries: file numbers, plus at most one excise span (key=start, val=end).
     777            1 :         for i := 0; i < int(b.Count()); i++ {
     778            1 :                 kind, key, val, ok, err := br.Next()
     779            1 :                 if err != nil {
     780            0 :                         return nil, err
     781            0 :                 }
     782            1 :                 if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise {
     783            0 :                         panic("pebble: invalid batch key kind")
     784              :                 }
     785            1 :                 if !ok {
     786            0 :                         panic("pebble: invalid batch count")
     787              :                 }
     788            1 :                 if kind == base.InternalKeyKindExcise {
     789            1 :                         if exciseSpan.Valid() {
     790            0 :                                 panic("pebble: multiple excise spans in a single batch")
     791              :                         }
     792            1 :                         exciseSpan.Start = slices.Clone(key)
     793            1 :                         exciseSpan.End = slices.Clone(val)
     794            1 :                         continue
     795              :                 }
     796            1 :                 addFileNum(key)
     797              :         }
     798              :         // The batch must now be exhausted; a further entry means the count was wrong.
     799            1 :         if _, _, _, ok, err := br.Next(); err != nil {
     800            0 :                 return nil, err
     801            1 :         } else if ok {
     802            0 :                 panic("pebble: invalid number of entries in batch")
     803              :         }
     804              :         // Load metadata for each file; lastRangeKey ends up holding the value from the final file.
     805            1 :         meta := make([]*manifest.TableMetadata, len(fileNums))
     806            1 :         var lastRangeKey keyspan.Span
     807            1 :         for i, n := range fileNums {
     808            1 :                 readable, err := d.objProvider.OpenForReading(context.TODO(), base.FileTypeTable, n,
     809            1 :                         objstorage.OpenOptions{MustExist: true})
     810            1 :                 if err != nil {
     811            0 :                         return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files")
     812            0 :                 }
     813              :                 // NB: ingestLoad1 will close readable.
     814            1 :                 meta[i], lastRangeKey, err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(),
     815            1 :                         readable, d.cacheHandle, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
     816            1 :                 if err != nil {
     817            0 :                         return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
     818            0 :                 }
     819              :         }
     820            1 :         if lastRangeKey.Valid() && d.opts.Comparer.Split.HasSuffix(lastRangeKey.End) {
     821            0 :                 return nil, errors.AssertionFailedf("pebble: last ingest sstable has suffixed range key end %s",
     822            0 :                         d.opts.Comparer.FormatKey(lastRangeKey.End))
     823            0 :         }
     824              :         // Every batch entry must be accounted for: loaded tables plus the optional excise.
     825            1 :         numFiles := len(meta)
     826            1 :         if exciseSpan.Valid() {
     827            1 :                 numFiles++
     828            1 :         }
     829            1 :         if uint32(numFiles) != b.Count() {
     830            0 :                 panic("pebble: couldn't load all files in WAL entry")
     831              :         }
     832              : 
     833            1 :         return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
     834              : }
     835              : 
     836              : // replayWAL replays the edits in the specified WAL. If the DB is in read
     837              : // only mode, then the WALs are replayed into memtables and not flushed. If
     838              : // the DB is not in read only mode, then the contents of the WAL are
     839              : // guaranteed to be flushed when a flush is scheduled after this method is run.
     840              : // Note that this flushing is very important for guaranteeing durability:
     841              : // the application may have had a number of pending
     842              : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
     843              : // happened but the corresponding data may now be readable from the WAL (while
     844              : // sitting in write-back caches in the kernel or the storage device). By
     845              : // reading the WAL (including the non-fsynced data) and then flushing all
     846              : // these changes (flush does fsyncs), we are able to guarantee that the
     847              : // initial state of the DB is durable.
     848              : //
     849              : // This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable and replays
     850              : // WALs into the flushable queue. Flushing of the queue is expected to be handled
     851              : // by callers. A list of flushable ingests (but not memtables) replayed is returned.
     852              : //
     853              : // d.mu must be held when calling this, but the mutex may be dropped and
     854              : // re-acquired during the course of this method.
     855              : func (d *DB) replayWAL(
     856              :         jobID JobID, ll wal.LogicalLog, strictWALTail bool,
     857            1 : ) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
     858            1 :         rr := ll.OpenForRead()
     859            1 :         defer func() { _ = rr.Close() }()
     860            1 :         var (
     861            1 :                 b               Batch
     862            1 :                 buf             bytes.Buffer
     863            1 :                 mem             *memTable
     864            1 :                 entry           *flushableEntry
     865            1 :                 offset          wal.Offset
     866            1 :                 lastFlushOffset int64
     867            1 :                 keysReplayed    int64 // number of keys replayed
     868            1 :                 batchesReplayed int64 // number of batches replayed
     869            1 :         )
     870            1 : 
     871            1 :         // TODO(jackson): This function is interspersed with panics, in addition to
     872            1 :         // corruption error propagation. Audit them to ensure we're truly only
     873            1 :         // panicking where the error points to a Pebble bug and not user or
     874            1 :         // hardware-induced corruption.
     875            1 : 
     876            1 :         // "Flushes" (ie. closes off) the current memtable, if not nil.
     877            1 :         flushMem := func() {
     878            1 :                 if mem == nil {
     879            1 :                         return
     880            1 :                 }
     881            1 :                 mem.writerUnref()
     882            1 :                 if d.mu.mem.mutable == mem {
     883            1 :                         d.mu.mem.mutable = nil
     884            1 :                 }
     885            1 :                 entry.flushForced = !d.opts.ReadOnly
     886            1 :                 var logSize uint64
     887            1 :                 mergedOffset := offset.Physical + offset.PreviousFilesBytes
     888            1 :                 if mergedOffset >= lastFlushOffset {
     889            1 :                         logSize = uint64(mergedOffset - lastFlushOffset)
     890            1 :                 }
     891              :                 // Else, this was the initial memtable in the read-only case which must have
     892              :                 // been empty, but we need to flush it since we don't want to add to it later.
     893            1 :                 lastFlushOffset = mergedOffset
     894            1 :                 entry.logSize = logSize
     895            1 :                 mem, entry = nil, nil
     896              :         }
     897              : 
     898            1 :         mem = d.mu.mem.mutable
     899            1 :         if mem != nil {
     900            1 :                 entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
     901            1 :                 if !d.opts.ReadOnly {
     902            1 :                         flushMem()
     903            1 :                 }
     904              :         }
     905              : 
     906              :         // Creates a new memtable if there is no current memtable.
     907            1 :         ensureMem := func(seqNum base.SeqNum) {
     908            1 :                 if mem != nil {
     909            1 :                         return
     910            1 :                 }
     911            1 :                 mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
     912            1 :                 d.mu.mem.mutable = mem
     913            1 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     914              :         }
     915              : 
     916            1 :         defer func() {
     917            1 :                 if err != nil {
     918            0 :                         err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
     919            0 :                 }
     920              :         }()
     921              : 
     922            1 :         for {
     923            1 :                 var r io.Reader
     924            1 :                 var err error
     925            1 :                 r, offset, err = rr.NextRecord()
     926            1 :                 if err == nil {
     927            1 :                         _, err = io.Copy(&buf, r)
     928            1 :                 }
     929            1 :                 if err != nil {
     930            1 :                         // It is common to encounter a zeroed or invalid chunk due to WAL
     931            1 :                         // preallocation and WAL recycling. However zeroed or invalid chunks
     932            1 :                         // can also be a consequence of corruption / disk rot. When the log
     933            1 :                         // reader encounters one of these cases, it attempts to disambiguate
     934            1 :                         // by reading ahead looking for a future record. If a future chunk
     935            1 :                         // indicates the chunk at the original offset should've been valid, it
     936            1 :                         // surfaces record.ErrInvalidChunk or record.ErrZeroedChunk. These
     937            1 :                         // errors are always indicative of corruption and data loss.
     938            1 :                         //
     939            1 :                         // Otherwise, the reader surfaces record.ErrUnexpectedEOF indicating
     940            1 :                         // that the WAL terminated uncleanly and ambiguously. If the WAL is
     941            1 :                         // the most recent logical WAL, the caller passes in
     942            1 :                         // (strictWALTail=false), indicating we should tolerate the unclean
     943            1 :                         // ending. If the WAL is an older WAL, the caller passes in
     944            1 :                         // (strictWALTail=true), indicating that the WAL should have been
     945            1 :                         // closed cleanly, and we should interpret the
     946            1 :                         // `record.ErrUnexpectedEOF` as corruption and stop recovery.
     947            1 :                         if errors.Is(err, io.EOF) {
     948            1 :                                 break
     949            0 :                         } else if errors.Is(err, record.ErrUnexpectedEOF) && !strictWALTail {
     950            0 :                                 break
     951            0 :                         } else if (errors.Is(err, record.ErrUnexpectedEOF) && strictWALTail) ||
     952            0 :                                 errors.Is(err, record.ErrInvalidChunk) || errors.Is(err, record.ErrZeroedChunk) {
     953            0 :                                 // If a read-ahead returns record.ErrInvalidChunk or
     954            0 :                                 // record.ErrZeroedChunk, then there's definitively corruption.
     955            0 :                                 //
     956            0 :                                 // If strictWALTail=true, then record.ErrUnexpectedEOF should
     957            0 :                                 // also be considered corruption because the strictWALTail
     958            0 :                                 // indicates we expect a clean end to the WAL.
     959            0 :                                 //
     960            0 :                                 // Other I/O related errors should not be marked with corruption
     961            0 :                                 // and simply returned.
     962            0 :                                 err = errors.Mark(err, ErrCorruption)
     963            0 :                         }
     964              : 
     965            0 :                         return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
     966              :                 }
     967              : 
     968            1 :                 if buf.Len() < batchrepr.HeaderLen {
     969            0 :                         return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
     970            0 :                                 errors.Safe(base.DiskFileNum(ll.Num)), offset)
     971            0 :                 }
     972              : 
     973            1 :                 if d.opts.ErrorIfNotPristine {
     974            0 :                         return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
     975            0 :                 }
     976              : 
     977              :                 // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
     978              :                 // which is used below.
     979            1 :                 b = Batch{}
     980            1 :                 b.db = d
     981            1 :                 if err := b.SetRepr(buf.Bytes()); err != nil {
     982            0 :                         return nil, 0, err
     983            0 :                 }
     984            1 :                 seqNum := b.SeqNum()
     985            1 :                 maxSeqNum = seqNum + base.SeqNum(b.Count())
     986            1 :                 keysReplayed += int64(b.Count())
     987            1 :                 batchesReplayed++
     988            1 :                 {
     989            1 :                         br := b.Reader()
     990            1 :                         if kind, _, _, ok, err := br.Next(); err != nil {
     991            0 :                                 return nil, 0, err
     992            1 :                         } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
     993            1 :                                 // We're in the flushable ingests (+ possibly excises) case.
     994            1 :                                 //
     995            1 :                                 // Ingests require an up-to-date view of the LSM to determine the target
     996            1 :                                 // level of ingested sstables, and to accurately compute excises. Instead of
     997            1 :                                 // doing an ingest in this function, we just enqueue a flushable ingest
     998            1 :                                 // in the flushables queue and run a regular flush.
     999            1 :                                 flushMem()
    1000            1 :                                 // mem is nil here.
    1001            1 :                                 entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
    1002            1 :                                 if err != nil {
    1003            0 :                                         return nil, 0, err
    1004            0 :                                 }
    1005            1 :                                 fi := entry.flushable.(*ingestedFlushable)
    1006            1 :                                 flushableIngests = append(flushableIngests, fi)
    1007            1 :                                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1008            1 :                                 // A flushable ingest is always followed by a WAL rotation.
    1009            1 :                                 break
    1010              :                         }
    1011              :                 }
    1012              : 
    1013            1 :                 if b.memTableSize >= uint64(d.largeBatchThreshold) {
    1014            0 :                         flushMem()
    1015            0 :                         // Make a copy of the data slice since it is currently owned by buf and will
    1016            0 :                         // be reused in the next iteration.
    1017            0 :                         b.data = slices.Clone(b.data)
    1018            0 :                         b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
    1019            0 :                         if err != nil {
    1020            0 :                                 return nil, 0, err
    1021            0 :                         }
    1022            0 :                         entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
    1023            0 :                         // Disable memory accounting by adding a reader ref that will never be
    1024            0 :                         // removed.
    1025            0 :                         entry.readerRefs.Add(1)
    1026            0 :                         d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1027            1 :                 } else {
    1028            1 :                         ensureMem(seqNum)
    1029            1 :                         if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
    1030            0 :                                 return nil, 0, err
    1031            0 :                         }
    1032              :                         // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
    1033              :                         // batch may not initially fit, but will eventually fit (since it is smaller than
    1034              :                         // largeBatchThreshold).
    1035            1 :                         for err == arenaskl.ErrArenaFull {
    1036            1 :                                 flushMem()
    1037            1 :                                 ensureMem(seqNum)
    1038            1 :                                 err = mem.prepare(&b)
    1039            1 :                                 if err != nil && err != arenaskl.ErrArenaFull {
    1040            0 :                                         return nil, 0, err
    1041            0 :                                 }
    1042              :                         }
    1043            1 :                         if err = mem.apply(&b, seqNum); err != nil {
    1044            0 :                                 return nil, 0, err
    1045            0 :                         }
    1046            1 :                         mem.writerUnref()
    1047              :                 }
    1048            1 :                 buf.Reset()
    1049              :         }
    1050              : 
    1051            1 :         d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
    1052            1 :                 jobID, ll.String(), offset, keysReplayed, batchesReplayed)
    1053            1 :         if !d.opts.ReadOnly {
    1054            1 :                 flushMem()
    1055            1 :         }
    1056              : 
    1057              :         // mem is nil here, if !ReadOnly.
    1058            1 :         return flushableIngests, maxSeqNum, err
    1059              : }
    1060              : 
    1061            1 : func readOptionsFile(opts *Options, path string) (string, error) {
    1062            1 :         f, err := opts.FS.Open(path)
    1063            1 :         if err != nil {
    1064            0 :                 return "", err
    1065            0 :         }
    1066            1 :         defer f.Close()
    1067            1 : 
    1068            1 :         data, err := io.ReadAll(f)
    1069            1 :         if err != nil {
    1070            0 :                 return "", err
    1071            0 :         }
    1072            1 :         return string(data), nil
    1073              : }
    1074              : 
    1075              : // DBDesc briefly describes high-level state about a database.
    1076              : type DBDesc struct {
    1077              :         // Exists is true if an existing database was found.
    1078              :         Exists bool
    1079              :         // FormatMajorVersion indicates the database's current format
    1080              :         // version.
    1081              :         FormatMajorVersion FormatMajorVersion
    1082              :         // ManifestFilename is the filename of the current active manifest,
    1083              :         // if the database exists.
    1084              :         ManifestFilename string
    1085              :         // OptionsFilename is the filename of the most recent OPTIONS file, if it
    1086              :         // exists.
    1087              :         OptionsFilename string
    1088              : }
    1089              : 
    1090              : // String implements fmt.Stringer.
    1091            0 : func (d *DBDesc) String() string {
    1092            0 :         if !d.Exists {
    1093            0 :                 return "uninitialized"
    1094            0 :         }
    1095            0 :         var buf bytes.Buffer
    1096            0 :         fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
    1097            0 :         fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
    1098            0 :         fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
    1099            0 :         return buf.String()
    1100              : }
    1101              : 
    1102              : // Peek looks for an existing database in dirname on the provided FS. It
    1103              : // returns a brief description of the database. Peek is read-only and
    1104              : // does not open the database.
    1105            0 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
    1106            0 :         ls, err := fs.List(dirname)
    1107            0 :         if err != nil {
    1108            0 :                 return nil, err
    1109            0 :         }
    1110              : 
    1111            0 :         vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
    1112            0 :         if err != nil {
    1113            0 :                 return nil, err
    1114            0 :         }
    1115              :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1116              :         // PeekMarker variant that avoids opening the directory.
    1117            0 :         if err := versMarker.Close(); err != nil {
    1118            0 :                 return nil, err
    1119            0 :         }
    1120              : 
    1121              :         // Find the currently active manifest, if there is one.
    1122            0 :         manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
    1123            0 :         if err != nil {
    1124            0 :                 return nil, err
    1125            0 :         }
    1126              :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1127              :         // PeekMarker variant that avoids opening the directory.
    1128            0 :         if err := manifestMarker.Close(); err != nil {
    1129            0 :                 return nil, err
    1130            0 :         }
    1131              : 
    1132            0 :         desc := &DBDesc{
    1133            0 :                 Exists:             exists,
    1134            0 :                 FormatMajorVersion: vers,
    1135            0 :         }
    1136            0 : 
    1137            0 :         // Find the OPTIONS file with the highest file number within the list of
    1138            0 :         // directory entries.
    1139            0 :         var previousOptionsFileNum base.DiskFileNum
    1140            0 :         for _, filename := range ls {
    1141            0 :                 ft, fn, ok := base.ParseFilename(fs, filename)
    1142            0 :                 if !ok || ft != base.FileTypeOptions || fn < previousOptionsFileNum {
    1143            0 :                         continue
    1144              :                 }
    1145            0 :                 previousOptionsFileNum = fn
    1146            0 :                 desc.OptionsFilename = fs.PathJoin(dirname, filename)
    1147              :         }
    1148              : 
    1149            0 :         if exists {
    1150            0 :                 desc.ManifestFilename = base.MakeFilepath(fs, dirname, base.FileTypeManifest, manifestFileNum)
    1151            0 :         }
    1152            0 :         return desc, nil
    1153              : }
    1154              : 
    1155              : // LockDirectory acquires the database directory lock in the named directory,
    1156              : // preventing another process from opening the database. LockDirectory returns a
    1157              : // handle to the held lock that may be passed to Open through Options.Lock to
    1158              : // subsequently open the database, skipping lock acquisition during Open.
    1159              : //
    1160              : // LockDirectory may be used to expand the critical section protected by the
    1161              : // database lock to include setup before the call to Open.
    1162            1 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
    1163            1 :         fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, base.FileTypeLock, base.DiskFileNum(0)))
    1164            1 :         if err != nil {
    1165            0 :                 return nil, err
    1166            0 :         }
    1167            1 :         l := &Lock{dirname: dirname, fileLock: fileLock}
    1168            1 :         l.refs.Store(1)
    1169            1 :         invariants.SetFinalizer(l, func(obj interface{}) {
    1170            1 :                 if refs := obj.(*Lock).refs.Load(); refs > 0 {
    1171            0 :                         panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
    1172              :                 }
    1173              :         })
    1174            1 :         return l, nil
    1175              : }
    1176              : 
    1177              : // Lock represents a file lock on a directory. It may be passed to Open through
    1178              : // Options.Lock to elide lock acquisition during Open.
    1179              : type Lock struct {
    1180              :         dirname  string
    1181              :         fileLock io.Closer
    1182              :         // refs is a count of the number of handles on the lock. refs must be 0, 1
    1183              :         // or 2.
    1184              :         //
    1185              :         // When acquired by the client and passed to Open, refs = 1 and the Open
    1186              :         // call increments it to 2. When the database is closed, it's decremented to
    1187              :         // 1. Finally, when the original caller calls Close on the Lock, it's
    1188              :         // decremented to zero and the underlying file lock is released.
    1189              :         //
    1190              :         // When Open acquires the file lock, refs remains at 1 until the database is
    1191              :         // closed.
    1192              :         refs atomic.Int32
    1193              : }
    1194              : 
    1195            0 : func (l *Lock) refForOpen() error {
    1196            0 :         // During Open, when a user passed in a lock, the reference count must be
    1197            0 :         // exactly 1. If it's zero, the lock is no longer held and is invalid. If
    1198            0 :         // it's 2, the lock is already in use by another database within the
    1199            0 :         // process.
    1200            0 :         if !l.refs.CompareAndSwap(1, 2) {
    1201            0 :                 return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
    1202            0 :         }
    1203            0 :         return nil
    1204              : }
    1205              : 
    1206              : // Close releases the lock, permitting another process to lock and open the
    1207              : // database. Close must not be called until after a database using the Lock has
    1208              : // been closed.
    1209            1 : func (l *Lock) Close() error {
    1210            1 :         if l.refs.Add(-1) > 0 {
    1211            0 :                 return nil
    1212            0 :         }
    1213            1 :         defer func() { l.fileLock = nil }()
    1214            1 :         return l.fileLock.Close()
    1215              : }
    1216              : 
    1217            0 : func (l *Lock) pathMatches(dirname string) error {
    1218            0 :         if dirname == l.dirname {
    1219            0 :                 return nil
    1220            0 :         }
    1221              :         // Check for relative paths, symlinks, etc. This isn't ideal because we're
    1222              :         // circumventing the vfs.FS interface here.
    1223              :         //
    1224              :         // TODO(jackson): We could add support for retrieving file inodes through Stat
    1225              :         // calls in the VFS interface on platforms where it's available and use that
    1226              :         // to differentiate.
    1227            0 :         dirStat, err1 := os.Stat(dirname)
    1228            0 :         lockDirStat, err2 := os.Stat(l.dirname)
    1229            0 :         if err1 == nil && err2 == nil && os.SameFile(dirStat, lockDirStat) {
    1230            0 :                 return nil
    1231            0 :         }
    1232            0 :         return errors.Join(
    1233            0 :                 errors.Newf("pebble: opts.Lock acquired in %q not %q", l.dirname, dirname),
    1234            0 :                 err1, err2)
    1235              : }
    1236              : 
    1237              : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
    1238              : // does not exist.
    1239              : //
    1240              : // Note that errors can be wrapped with more details; use errors.Is().
    1241              : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
    1242              : 
    1243              : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
    1244              : // already exists.
    1245              : //
    1246              : // Note that errors can be wrapped with more details; use errors.Is().
    1247              : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
    1248              : 
    1249              : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
    1250              : // already exists and is not pristine.
    1251              : //
    1252              : // Note that errors can be wrapped with more details; use errors.Is().
    1253              : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
    1254              : 
    1255            1 : func checkConsistency(v *manifest.Version, objProvider objstorage.Provider) error {
    1256            1 :         var errs []error
    1257            1 :         dedup := make(map[base.DiskFileNum]struct{})
    1258            1 :         for level, files := range v.Levels {
    1259            1 :                 for f := range files.All() {
    1260            1 :                         backingState := f.TableBacking
    1261            1 :                         if _, ok := dedup[backingState.DiskFileNum]; ok {
    1262            1 :                                 continue
    1263              :                         }
    1264            1 :                         dedup[backingState.DiskFileNum] = struct{}{}
    1265            1 :                         fileNum := backingState.DiskFileNum
    1266            1 :                         fileSize := backingState.Size
    1267            1 :                         // We skip over remote objects; those are instead checked asynchronously
    1268            1 :                         // by the table stats loading job.
    1269            1 :                         meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
    1270            1 :                         var size int64
    1271            1 :                         if err == nil {
    1272            1 :                                 if meta.IsRemote() {
    1273            1 :                                         continue
    1274              :                                 }
    1275            1 :                                 size, err = objProvider.Size(meta)
    1276              :                         }
    1277            1 :                         if err != nil {
    1278            0 :                                 errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
    1279            0 :                                 continue
    1280              :                         }
    1281              : 
    1282            1 :                         if size != int64(fileSize) {
    1283            0 :                                 errs = append(errs, errors.Errorf(
    1284            0 :                                         "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
    1285            0 :                                         errors.Safe(level), fileNum, objProvider.Path(meta),
    1286            0 :                                         errors.Safe(size), errors.Safe(fileSize)))
    1287            0 :                                 continue
    1288              :                         }
    1289              :                 }
    1290              :         }
    1291            1 :         return errors.Join(errs...)
    1292              : }
    1293              : 
    1294              : type walEventListenerAdaptor struct {
    1295              :         l *EventListener
    1296              : }
    1297              : 
    1298            1 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
    1299            1 :         // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
    1300            1 :         // is insufficient to infer whether primary or secondary.
    1301            1 :         wci := WALCreateInfo{
    1302            1 :                 JobID:           ci.JobID,
    1303            1 :                 Path:            ci.Path,
    1304            1 :                 FileNum:         base.DiskFileNum(ci.Num),
    1305            1 :                 RecycledFileNum: ci.RecycledFileNum,
    1306            1 :                 Err:             ci.Err,
    1307            1 :         }
    1308            1 :         l.l.WALCreated(wci)
    1309            1 : }
        

Generated by: LCOV version 2.0-1