LCOV - code coverage report
Current view: top level - pebble - open.go (source / functions)
Test:       2025-05-07 08:18Z 0f3d2b5c - tests only.lcov
Test Date:  2025-05-07 08:19:13
Coverage:   Lines:     88.1 %   (Hit: 820 / Total: 931)
            Functions: -        (Hit: 0 / Total: 0)

            Line data    Source code
       1              : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "context"
      10              :         "encoding/binary"
      11              :         "fmt"
      12              :         "io"
      13              :         "math"
      14              :         "os"
      15              :         "slices"
      16              :         "sync/atomic"
      17              :         "time"
      18              : 
      19              :         "github.com/cockroachdb/crlib/crtime"
      20              :         "github.com/cockroachdb/errors"
      21              :         "github.com/cockroachdb/errors/oserror"
      22              :         "github.com/cockroachdb/pebble/batchrepr"
      23              :         "github.com/cockroachdb/pebble/internal/arenaskl"
      24              :         "github.com/cockroachdb/pebble/internal/base"
      25              :         "github.com/cockroachdb/pebble/internal/cache"
      26              :         "github.com/cockroachdb/pebble/internal/constants"
      27              :         "github.com/cockroachdb/pebble/internal/invariants"
      28              :         "github.com/cockroachdb/pebble/internal/keyspan"
      29              :         "github.com/cockroachdb/pebble/internal/manifest"
      30              :         "github.com/cockroachdb/pebble/internal/manual"
      31              :         "github.com/cockroachdb/pebble/objstorage"
      32              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      33              :         "github.com/cockroachdb/pebble/objstorage/remote"
      34              :         "github.com/cockroachdb/pebble/record"
      35              :         "github.com/cockroachdb/pebble/vfs"
      36              :         "github.com/cockroachdb/pebble/wal"
      37              :         "github.com/prometheus/client_golang/prometheus"
      38              : )
      39              : 
      40              : const (
      41              :         initialMemTableSize = 256 << 10 // 256 KB
      42              : 
      43              :         // The max batch size is limited by the uint32 offsets stored in
      44              :         // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
      45              :         //
      46              :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      47              :         // end of an allocation fits in uint32.
      48              :         //
      49              :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      50              :         // 2GB).
      51              :         maxBatchSize = constants.MaxUint32OrInt
      52              : 
      53              :         // The max memtable size is limited by the uint32 offsets stored in
      54              :         // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
      55              :         //
      56              :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      57              :         // end of an allocation fits in uint32.
      58              :         //
      59              :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      60              :         // 2GB).
      61              :         maxMemTableSize = constants.MaxUint32OrInt
      62              : )
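
The two constants above cap sizes at MaxUint32 on 64-bit builds and at MaxInt32 on 32-bit builds. One way such a platform-dependent limit can be written as a single compile-time constant is sketched below; the expression is illustrative and is not necessarily how internal/constants.MaxUint32OrInt is actually defined.

        // Evaluates to math.MaxUint32 on 64-bit platforms and to math.MaxInt32 on
        // 32-bit platforms: the shift count (^uint(0) >> 63) is 1 only when uint
        // is 64 bits wide.
        const maxUint32OrInt = ((1 << 31) << (^uint(0) >> 63)) - 1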
      63              : 
       64              : // FileCacheSize returns the file cache size to use for a single DB, given
       65              : // the maximum number of open files available to a file cache that is used
       66              : // by that single DB alone. The result is clamped below by a minimum cache
       67              : // size (minFileCacheSize).
      68            1 : func FileCacheSize(maxOpenFiles int) int {
      69            1 :         fileCacheSize := maxOpenFiles - numNonFileCacheFiles
      70            1 :         if fileCacheSize < minFileCacheSize {
      71            0 :                 fileCacheSize = minFileCacheSize
      72            0 :         }
      73            1 :         return fileCacheSize
      74              : }
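
A minimal usage sketch of FileCacheSize, assuming the caller builds its own shared FileCache from Options.MaxOpenFiles. The file limit and shard count below are arbitrary placeholder values; assumed imports are "runtime" and "github.com/cockroachdb/pebble".

        opts := &pebble.Options{MaxOpenFiles: 1000} // hypothetical descriptor budget
        // FileCacheSize subtracts the descriptors Pebble reserves for other files
        // and clamps the result to a minimum, per the function above.
        cacheSize := pebble.FileCacheSize(opts.MaxOpenFiles)
        fileCache := pebble.NewFileCache(runtime.GOMAXPROCS(0), cacheSize)
        defer fileCache.Unref()
        opts.FileCache = fileCache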
      75              : 
      76              : // Open opens a DB whose files live in the given directory.
      77              : //
       78              : // IsCorruptionError() can be used to determine if the error is caused by on-disk
      79              : // corruption.
      80            1 : func Open(dirname string, opts *Options) (db *DB, err error) {
      81            1 :         // Make a copy of the options so that we don't mutate the passed in options.
      82            1 :         opts = opts.Clone()
      83            1 :         opts.EnsureDefaults()
      84            1 :         if opts.Experimental.CompactionScheduler == nil {
      85            1 :                 opts.Experimental.CompactionScheduler = newConcurrencyLimitScheduler(defaultTimeSource{})
      86            1 :         }
      87            1 :         if err := opts.Validate(); err != nil {
      88            0 :                 return nil, err
      89            0 :         }
      90            1 :         if opts.LoggerAndTracer == nil {
      91            1 :                 opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
      92            1 :         } else {
      93            1 :                 opts.Logger = opts.LoggerAndTracer
      94            1 :         }
      95              : 
      96            1 :         if invariants.Sometimes(5) {
      97            1 :                 assertComparer := base.MakeAssertComparer(*opts.Comparer)
      98            1 :                 opts.Comparer = &assertComparer
      99            1 :         }
     100              : 
     101              :         // In all error cases, we return db = nil; this is used by various
     102              :         // deferred cleanups.
     103              : 
     104              :         // Open the database and WAL directories first.
     105            1 :         walDirname, dataDir, err := prepareAndOpenDirs(dirname, opts)
     106            1 :         if err != nil {
     107            1 :                 return nil, errors.Wrapf(err, "error opening database at %q", dirname)
     108            1 :         }
     109            1 :         defer func() {
     110            1 :                 if db == nil {
     111            1 :                         dataDir.Close()
     112            1 :                 }
     113              :         }()
     114              : 
     115              :         // Lock the database directory.
     116            1 :         var fileLock *Lock
     117            1 :         if opts.Lock != nil {
     118            1 :                 // The caller already acquired the database lock. Ensure that the
     119            1 :                 // directory matches.
     120            1 :                 if err := opts.Lock.pathMatches(dirname); err != nil {
     121            0 :                         return nil, err
     122            0 :                 }
     123            1 :                 if err := opts.Lock.refForOpen(); err != nil {
     124            1 :                         return nil, err
     125            1 :                 }
     126            1 :                 fileLock = opts.Lock
     127            1 :         } else {
     128            1 :                 fileLock, err = LockDirectory(dirname, opts.FS)
     129            1 :                 if err != nil {
     130            1 :                         return nil, err
     131            1 :                 }
     132              :         }
     133            1 :         defer func() {
     134            1 :                 if db == nil {
     135            1 :                         _ = fileLock.Close()
     136            1 :                 }
     137              :         }()
     138              : 
     139              :         // List the directory contents. This also happens to include WAL log files, if
     140              :         // they are in the same dir, but we will ignore those below. The provider is
      141              :         // also given this list, but it ignores non-sstable files.
     142            1 :         ls, err := opts.FS.List(dirname)
     143            1 :         if err != nil {
     144            1 :                 return nil, err
     145            1 :         }
     146              : 
     147              :         // Establish the format major version.
     148            1 :         formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
     149            1 :         if err != nil {
     150            1 :                 return nil, err
     151            1 :         }
     152            1 :         defer func() {
     153            1 :                 if db == nil {
     154            1 :                         _ = formatVersionMarker.Close()
     155            1 :                 }
     156              :         }()
     157              : 
     158            1 :         noFormatVersionMarker := formatVersion == FormatDefault
     159            1 :         if noFormatVersionMarker {
     160            1 :                 // We will initialize the store at the minimum possible format, then upgrade
     161            1 :                 // the format to the desired one. This helps test the format upgrade code.
     162            1 :                 formatVersion = FormatMinSupported
     163            1 :                 if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
     164            1 :                         formatVersion = FormatMinForSharedObjects
     165            1 :                 }
     166              :                 // There is no format version marker file. There are three cases:
     167              :                 //  - we are trying to open an existing store that was created at
     168              :                 //    FormatMostCompatible (the only one without a version marker file)
     169              :                 //  - we are creating a new store;
     170              :                 //  - we are retrying a failed creation.
     171              :                 //
     172              :                 // To error in the first case, we set ErrorIfNotPristine.
     173            1 :                 opts.ErrorIfNotPristine = true
     174            1 :                 defer func() {
     175            1 :                         if err != nil && errors.Is(err, ErrDBNotPristine) {
     176            0 :                                 // We must be trying to open an existing store at FormatMostCompatible.
      177            0 :                                 // Correct the error in this case: the on-disk format is no longer supported.
     178            0 :                                 err = errors.Newf(
     179            0 :                                         "pebble: database %q written in format major version 1 which is no longer supported",
     180            0 :                                         dirname)
     181            0 :                         }
     182              :                 }()
     183            1 :         } else {
     184            1 :                 if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone && formatVersion < FormatMinForSharedObjects {
     185            0 :                         return nil, errors.Newf(
     186            0 :                                 "pebble: database %q configured with shared objects but written in too old format major version %d",
     187            0 :                                 dirname, formatVersion)
     188            0 :                 }
     189              :         }
     190              : 
     191              :         // Find the currently active manifest, if there is one.
     192            1 :         manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
     193            1 :         if err != nil {
     194            1 :                 return nil, errors.Wrapf(err, "pebble: database %q", dirname)
     195            1 :         }
     196            1 :         defer func() {
     197            1 :                 if db == nil {
     198            1 :                         _ = manifestMarker.Close()
     199            1 :                 }
     200              :         }()
     201              : 
     202              :         // Atomic markers may leave behind obsolete files if there's a crash
     203              :         // mid-update. Clean these up if we're not in read-only mode.
     204            1 :         if !opts.ReadOnly {
     205            1 :                 if err := formatVersionMarker.RemoveObsolete(); err != nil {
     206            0 :                         return nil, err
     207            0 :                 }
     208            1 :                 if err := manifestMarker.RemoveObsolete(); err != nil {
     209            0 :                         return nil, err
     210            0 :                 }
     211              :         }
     212              : 
     213            1 :         if opts.Cache == nil {
     214            1 :                 opts.Cache = cache.New(opts.CacheSize)
     215            1 :                 defer opts.Cache.Unref()
     216            1 :         }
     217              : 
     218            1 :         d := &DB{
     219            1 :                 cacheHandle:         opts.Cache.NewHandle(),
     220            1 :                 dirname:             dirname,
     221            1 :                 opts:                opts,
     222            1 :                 cmp:                 opts.Comparer.Compare,
     223            1 :                 equal:               opts.Comparer.Equal,
     224            1 :                 merge:               opts.Merger.Merge,
     225            1 :                 split:               opts.Comparer.Split,
     226            1 :                 abbreviatedKey:      opts.Comparer.AbbreviatedKey,
     227            1 :                 largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
     228            1 :                 fileLock:            fileLock,
     229            1 :                 dataDir:             dataDir,
     230            1 :                 closed:              new(atomic.Value),
     231            1 :                 closedCh:            make(chan struct{}),
     232            1 :         }
     233            1 :         d.mu.versions = &versionSet{}
     234            1 :         d.diskAvailBytes.Store(math.MaxUint64)
     235            1 :         d.problemSpans.Init(manifest.NumLevels, opts.Comparer.Compare)
     236            1 : 
     237            1 :         defer func() {
     238            1 :                 // If an error or panic occurs during open, attempt to release the manually
     239            1 :                 // allocated memory resources. Note that rather than look for an error, we
     240            1 :                 // look for the return of a nil DB pointer.
     241            1 :                 if r := recover(); db == nil {
     242            1 :                         // If there's an unused, recycled memtable, we need to release its memory.
     243            1 :                         if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
     244            1 :                                 d.freeMemTable(obsoleteMemTable)
     245            1 :                         }
     246              : 
     247            1 :                         if d.fileCache != nil {
     248            1 :                                 _ = d.fileCache.Close()
     249            1 :                         }
     250            1 :                         d.cacheHandle.Close()
     251            1 : 
     252            1 :                         for _, mem := range d.mu.mem.queue {
     253            1 :                                 switch t := mem.flushable.(type) {
     254            1 :                                 case *memTable:
     255            1 :                                         manual.Free(manual.MemTable, t.arenaBuf)
     256            1 :                                         t.arenaBuf = manual.Buf{}
     257              :                                 }
     258              :                         }
     259            1 :                         if d.cleanupManager != nil {
     260            1 :                                 d.cleanupManager.Close()
     261            1 :                         }
     262            1 :                         if d.objProvider != nil {
     263            1 :                                 _ = d.objProvider.Close()
     264            1 :                         }
     265            1 :                         if r != nil {
     266            1 :                                 panic(r)
     267              :                         }
     268              :                 }
     269              :         }()
     270              : 
     271            1 :         d.commit = newCommitPipeline(commitEnv{
     272            1 :                 logSeqNum:     &d.mu.versions.logSeqNum,
     273            1 :                 visibleSeqNum: &d.mu.versions.visibleSeqNum,
     274            1 :                 apply:         d.commitApply,
     275            1 :                 write:         d.commitWrite,
     276            1 :         })
     277            1 :         d.mu.nextJobID = 1
     278            1 :         d.mu.mem.nextSize = opts.MemTableSize
     279            1 :         if d.mu.mem.nextSize > initialMemTableSize {
     280            1 :                 d.mu.mem.nextSize = initialMemTableSize
     281            1 :         }
     282            1 :         d.mu.compact.cond.L = &d.mu.Mutex
     283            1 :         d.mu.compact.inProgress = make(map[*compaction]struct{})
     284            1 :         d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
     285            1 :         d.mu.snapshots.init()
     286            1 :         // logSeqNum is the next sequence number that will be assigned.
     287            1 :         // Start assigning sequence numbers from base.SeqNumStart to leave
     288            1 :         // room for reserved sequence numbers (see comments around
     289            1 :         // SeqNumStart).
     290            1 :         d.mu.versions.logSeqNum.Store(base.SeqNumStart)
     291            1 :         d.mu.formatVers.vers.Store(uint64(formatVersion))
     292            1 :         d.mu.formatVers.marker = formatVersionMarker
     293            1 : 
     294            1 :         d.timeNow = time.Now
     295            1 :         d.openedAt = d.timeNow()
     296            1 : 
     297            1 :         d.mu.Lock()
     298            1 :         defer d.mu.Unlock()
     299            1 : 
     300            1 :         jobID := d.newJobIDLocked()
     301            1 : 
     302            1 :         providerSettings := objstorageprovider.Settings{
     303            1 :                 Logger:              opts.Logger,
     304            1 :                 FS:                  opts.FS,
     305            1 :                 FSDirName:           dirname,
     306            1 :                 FSDirInitialListing: ls,
     307            1 :                 FSCleaner:           opts.Cleaner,
     308            1 :                 NoSyncOnClose:       opts.NoSyncOnClose,
     309            1 :                 BytesPerSync:        opts.BytesPerSync,
     310            1 :         }
     311            1 :         providerSettings.Local.ReadaheadConfig = opts.Local.ReadaheadConfig
     312            1 :         providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
     313            1 :         providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
     314            1 :         providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
     315            1 :         providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
     316            1 : 
     317            1 :         d.objProvider, err = objstorageprovider.Open(providerSettings)
     318            1 :         if err != nil {
     319            1 :                 return nil, err
     320            1 :         }
     321              : 
     322            1 :         if !manifestExists {
     323            1 :                 // DB does not exist.
     324            1 :                 if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
     325            1 :                         return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
     326            1 :                 }
     327              : 
     328              :                 // Create the DB.
     329            1 :                 if err := d.mu.versions.create(
     330            1 :                         jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
     331            1 :                         return nil, err
     332            1 :                 }
     333            1 :         } else {
     334            1 :                 if opts.ErrorIfExists {
     335            1 :                         return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
     336            1 :                 }
     337              :                 // Load the version set.
     338            1 :                 if err := d.mu.versions.load(
     339            1 :                         dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
     340            1 :                         return nil, err
     341            1 :                 }
     342            1 :                 if opts.ErrorIfNotPristine {
     343            1 :                         liveFileNums := make(map[base.DiskFileNum]struct{})
     344            1 :                         d.mu.versions.addLiveFileNums(liveFileNums)
     345            1 :                         if len(liveFileNums) != 0 {
     346            1 :                                 return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
     347            1 :                         }
     348              :                 }
     349              :         }
     350              : 
     351              :         // In read-only mode, we replay directly into the mutable memtable but never
     352              :         // flush it. We need to delay creation of the memtable until we know the
     353              :         // sequence number of the first batch that will be inserted.
     354            1 :         if !d.opts.ReadOnly {
     355            1 :                 var entry *flushableEntry
     356            1 :                 d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
     357            1 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     358            1 :         }
     359              : 
     360            1 :         d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     361            1 :                 Buckets: FsyncLatencyBuckets,
     362            1 :         })
     363            1 :         walOpts := wal.Options{
     364            1 :                 Primary:              wal.Dir{FS: opts.FS, Dirname: walDirname},
     365            1 :                 Secondary:            wal.Dir{},
     366            1 :                 MinUnflushedWALNum:   wal.NumWAL(d.mu.versions.minUnflushedLogNum),
     367            1 :                 MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
     368            1 :                 NoSyncOnClose:        opts.NoSyncOnClose,
     369            1 :                 BytesPerSync:         opts.WALBytesPerSync,
     370            1 :                 PreallocateSize:      d.walPreallocateSize,
     371            1 :                 MinSyncInterval:      opts.WALMinSyncInterval,
     372            1 :                 FsyncLatency:         d.mu.log.metrics.fsyncLatency,
     373            1 :                 QueueSemChan:         d.commit.logSyncQSem,
     374            1 :                 Logger:               opts.Logger,
     375            1 :                 EventListener:        walEventListenerAdaptor{l: opts.EventListener},
     376            1 :                 WriteWALSyncOffsets:  func() bool { return d.FormatMajorVersion() >= FormatWALSyncChunks },
     377              :         }
     378            1 :         if opts.WALFailover != nil {
     379            1 :                 walOpts.Secondary = opts.WALFailover.Secondary
     380            1 :                 walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
     381            1 :                 walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     382            1 :                         Buckets: FsyncLatencyBuckets,
     383            1 :                 })
     384            1 :         }
     385            1 :         walDirs := append(walOpts.Dirs(), opts.WALRecoveryDirs...)
     386            1 :         wals, err := wal.Scan(walDirs...)
     387            1 :         if err != nil {
     388            1 :                 return nil, err
     389            1 :         }
     390            1 :         walManager, err := wal.Init(walOpts, wals)
     391            1 :         if err != nil {
     392            1 :                 return nil, err
     393            1 :         }
     394            1 :         defer func() {
     395            1 :                 if db == nil {
     396            1 :                         _ = walManager.Close()
     397            1 :                 }
     398              :         }()
     399              : 
     400            1 :         d.mu.log.manager = walManager
     401            1 : 
     402            1 :         d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
     403            1 : 
     404            1 :         if manifestExists && !opts.DisableConsistencyCheck {
     405            1 :                 curVersion := d.mu.versions.currentVersion()
     406            1 :                 if err := checkConsistency(curVersion, d.objProvider); err != nil {
     407            1 :                         return nil, err
     408            1 :                 }
     409              :         }
     410              : 
     411            1 :         fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
     412            1 :         if opts.FileCache == nil {
     413            1 :                 opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
     414            1 :                 defer opts.FileCache.Unref()
     415            1 :         }
     416            1 :         d.fileCache = opts.FileCache.newHandle(d.cacheHandle, d.objProvider, d.opts.LoggerAndTracer, d.opts.MakeReaderOptions(), d.reportCorruption)
     417            1 :         d.newIters = d.fileCache.newIters
     418            1 :         d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
     419            1 : 
     420            1 :         d.mu.annotators.totalSize = d.makeFileSizeAnnotator(func(f *manifest.TableMetadata) bool {
     421            1 :                 return true
     422            1 :         })
     423            1 :         d.mu.annotators.remoteSize = d.makeFileSizeAnnotator(func(f *manifest.TableMetadata) bool {
     424            1 :                 meta, err := d.objProvider.Lookup(base.FileTypeTable, f.FileBacking.DiskFileNum)
     425            1 :                 if err != nil {
     426            0 :                         return false
     427            0 :                 }
     428            1 :                 return meta.IsRemote()
     429              :         })
     430            1 :         d.mu.annotators.externalSize = d.makeFileSizeAnnotator(func(f *manifest.TableMetadata) bool {
     431            1 :                 meta, err := d.objProvider.Lookup(base.FileTypeTable, f.FileBacking.DiskFileNum)
     432            1 :                 if err != nil {
     433            0 :                         return false
     434            0 :                 }
     435            1 :                 return meta.IsRemote() && meta.Remote.CleanupMethod == objstorage.SharedNoCleanup
     436              :         })
     437              : 
     438            1 :         var previousOptionsFileNum base.DiskFileNum
     439            1 :         var previousOptionsFilename string
     440            1 :         for _, filename := range ls {
     441            1 :                 ft, fn, ok := base.ParseFilename(opts.FS, filename)
     442            1 :                 if !ok {
     443            1 :                         continue
     444              :                 }
     445              : 
     446              :                 // Don't reuse any obsolete file numbers to avoid modifying an
     447              :                 // ingested sstable's original external file.
     448            1 :                 d.mu.versions.markFileNumUsed(fn)
     449            1 : 
     450            1 :                 switch ft {
     451            0 :                 case base.FileTypeLog:
     452              :                         // Ignore.
     453            1 :                 case base.FileTypeOptions:
     454            1 :                         if previousOptionsFileNum < fn {
     455            1 :                                 previousOptionsFileNum = fn
     456            1 :                                 previousOptionsFilename = filename
     457            1 :                         }
     458            1 :                 case base.FileTypeTemp, base.FileTypeOldTemp:
     459            1 :                         if !d.opts.ReadOnly {
     460            1 :                                 // Some codepaths write to a temporary file and then
     461            1 :                                 // rename it to its final location when complete.  A
     462            1 :                                 // temp file is leftover if a process exits before the
     463            1 :                                 // rename.  Remove it.
     464            1 :                                 err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
     465            1 :                                 if err != nil {
     466            0 :                                         return nil, err
     467            0 :                                 }
     468              :                         }
     469              :                 }
     470              :         }
     471            1 :         if n := len(wals); n > 0 {
     472            1 :                 // Don't reuse any obsolete file numbers to avoid modifying an
     473            1 :                 // ingested sstable's original external file.
     474            1 :                 d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
     475            1 :         }
     476              : 
     477              :         // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
     478              :         // objProvider. This avoids FileNum collisions with obsolete sstables.
     479            1 :         objects := d.objProvider.List()
     480            1 :         for _, obj := range objects {
     481            1 :                 d.mu.versions.markFileNumUsed(obj.DiskFileNum)
     482            1 :         }
     483              : 
     484              :         // Validate the most-recent OPTIONS file, if there is one.
     485            1 :         if previousOptionsFilename != "" {
     486            1 :                 path := opts.FS.PathJoin(dirname, previousOptionsFilename)
     487            1 :                 previousOptions, err := readOptionsFile(opts, path)
     488            1 :                 if err != nil {
     489            0 :                         return nil, err
     490            0 :                 }
     491            1 :                 if err := opts.CheckCompatibility(previousOptions); err != nil {
     492            1 :                         return nil, err
     493            1 :                 }
     494              :         }
     495              : 
     496              :         // Replay any newer log files than the ones named in the manifest.
     497            1 :         var replayWALs wal.Logs
     498            1 :         for i, w := range wals {
     499            1 :                 if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
     500            1 :                         replayWALs = wals[i:]
     501            1 :                         break
     502              :                 }
     503              :         }
     504            1 :         var flushableIngests []*ingestedFlushable
     505            1 :         for i, lf := range replayWALs {
     506            1 :                 // WALs other than the last one would have been closed cleanly.
     507            1 :                 //
     508            1 :                 // Note: we used to never require strict WAL tails when reading from older
     509            1 :                 // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
     510            1 :                 // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
     511            1 :                 // compatible Pebble format is newer and guarantees a clean EOF.
     512            1 :                 strictWALTail := i < len(replayWALs)-1
     513            1 :                 fi, maxSeqNum, err := d.replayWAL(jobID, lf, strictWALTail)
     514            1 :                 if err != nil {
     515            1 :                         return nil, err
     516            1 :                 }
     517            1 :                 if len(fi) > 0 {
     518            1 :                         flushableIngests = append(flushableIngests, fi...)
     519            1 :                 }
     520            1 :                 if d.mu.versions.logSeqNum.Load() < maxSeqNum {
     521            1 :                         d.mu.versions.logSeqNum.Store(maxSeqNum)
     522            1 :                 }
     523              :         }
     524            1 :         if d.mu.mem.mutable == nil {
     525            1 :                 // Recreate the mutable memtable if replayWAL got rid of it.
     526            1 :                 var entry *flushableEntry
     527            1 :                 d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
     528            1 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     529            1 :         }
     530            1 :         d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
     531            1 : 
     532            1 :         // Register with the CompactionScheduler before calling
     533            1 :         // d.maybeScheduleFlush, since completion of the flush can trigger
     534            1 :         // compactions.
     535            1 :         d.opts.Experimental.CompactionScheduler.Register(2, d)
     536            1 :         if !d.opts.ReadOnly {
     537            1 :                 d.maybeScheduleFlush()
     538            1 :                 for d.mu.compact.flushing {
     539            1 :                         d.mu.compact.cond.Wait()
     540            1 :                 }
     541              : 
     542              :                 // Create an empty .log file for the mutable memtable.
     543            1 :                 newLogNum := d.mu.versions.getNextDiskFileNum()
     544            1 :                 d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
     545            1 :                 if err != nil {
     546            1 :                         return nil, err
     547            1 :                 }
     548              : 
     549              :                 // This isn't strictly necessary as we don't use the log number for
     550              :                 // memtables being flushed, only for the next unflushed memtable.
     551            1 :                 d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
     552              :         }
     553            1 :         d.updateReadStateLocked(d.opts.DebugCheck)
     554            1 : 
     555            1 :         if !d.opts.ReadOnly {
     556            1 :                 // If the Options specify a format major version higher than the
     557            1 :                 // loaded database's, upgrade it. If this is a new database, this
     558            1 :                 // code path also performs an initial upgrade from the starting
     559            1 :                 // implicit MinSupported version.
     560            1 :                 //
     561            1 :                 // We ratchet the version this far into Open so that migrations have a read
     562            1 :                 // state available. Note that this also results in creating/updating the
     563            1 :                 // format version marker file.
     564            1 :                 if opts.FormatMajorVersion > d.FormatMajorVersion() {
     565            1 :                         if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
     566            0 :                                 return nil, err
     567            0 :                         }
     568            1 :                 } else if noFormatVersionMarker {
     569            1 :                         // We are creating a new store. Create the format version marker file.
     570            1 :                         if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
     571            1 :                                 return nil, err
     572            1 :                         }
     573              :                 }
     574              : 
     575              :                 // Write the current options to disk.
     576            1 :                 d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
     577            1 :                 tmpPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeTemp, d.optionsFileNum)
     578            1 :                 optionsPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeOptions, d.optionsFileNum)
     579            1 : 
     580            1 :                 // Write them to a temporary file first, in case we crash before
     581            1 :                 // we're done. A corrupt options file prevents opening the
     582            1 :                 // database.
     583            1 :                 optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
     584            1 :                 if err != nil {
     585            1 :                         return nil, err
     586            1 :                 }
     587            1 :                 serializedOpts := []byte(opts.String())
     588            1 :                 if _, err := optionsFile.Write(serializedOpts); err != nil {
     589            1 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     590            1 :                 }
     591            1 :                 d.optionsFileSize = uint64(len(serializedOpts))
     592            1 :                 if err := optionsFile.Sync(); err != nil {
     593            1 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     594            1 :                 }
     595            1 :                 if err := optionsFile.Close(); err != nil {
     596            0 :                         return nil, err
     597            0 :                 }
     598              :                 // Atomically rename to the OPTIONS-XXXXXX path. This rename is
     599              :                 // guaranteed to be atomic because the destination path does not
     600              :                 // exist.
     601            1 :                 if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
     602            1 :                         return nil, err
     603            1 :                 }
     604            1 :                 if err := d.dataDir.Sync(); err != nil {
     605            1 :                         return nil, err
     606            1 :                 }
     607              :         }
     608              : 
     609            1 :         if !d.opts.ReadOnly {
     610            1 :                 // Get a fresh list of files, in case some of the earlier flushes/compactions
     611            1 :                 // have deleted some files.
     612            1 :                 ls, err := opts.FS.List(dirname)
     613            1 :                 if err != nil {
     614            1 :                         return nil, err
     615            1 :                 }
     616            1 :                 d.scanObsoleteFiles(ls, flushableIngests)
     617            1 :                 d.deleteObsoleteFiles(jobID)
     618              :         }
     619              :         // Else, nothing is obsolete.
     620              : 
     621            1 :         d.mu.tableStats.cond.L = &d.mu.Mutex
     622            1 :         d.mu.tableValidation.cond.L = &d.mu.Mutex
     623            1 :         if !d.opts.ReadOnly {
     624            1 :                 d.maybeCollectTableStatsLocked()
     625            1 :         }
     626            1 :         d.calculateDiskAvailableBytes()
     627            1 : 
     628            1 :         d.maybeScheduleFlush()
     629            1 :         d.maybeScheduleCompaction()
     630            1 : 
     631            1 :         // Note: this is a no-op if invariants are disabled or race is enabled.
     632            1 :         //
     633            1 :         // Setting a finalizer on *DB causes *DB to never be reclaimed and the
     634            1 :         // finalizer to never be run. The problem is due to this limitation of
      635            1 :         // finalizers mentioned in the SetFinalizer docs:
     636            1 :         //
     637            1 :         //   If a cyclic structure includes a block with a finalizer, that cycle is
     638            1 :         //   not guaranteed to be garbage collected and the finalizer is not
     639            1 :         //   guaranteed to run, because there is no ordering that respects the
     640            1 :         //   dependencies.
     641            1 :         //
     642            1 :         // DB has cycles with several of its internal structures: readState,
      643            1 :         // newIters, fileCache, versions, etc. Each of these individually causes a
      644            1 :         // cycle and prevents the finalizer from being run. But we can work around
      645            1 :         // this finalizer limitation by setting a finalizer on another object that is
     646            1 :         // tied to the lifetime of DB: the DB.closed atomic.Value.
     647            1 :         dPtr := fmt.Sprintf("%p", d)
     648            1 :         invariants.SetFinalizer(d.closed, func(obj interface{}) {
     649            0 :                 v := obj.(*atomic.Value)
     650            0 :                 if err := v.Load(); err == nil {
     651            0 :                         fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
     652            0 :                         os.Exit(1)
     653            0 :                 }
     654              :         })
     655              : 
     656            1 :         return d, nil
     657              : }
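
A minimal sketch of calling Open as documented above; the directory path and option values are placeholders and error handling is abbreviated. Assumed imports are "log" and "github.com/cockroachdb/pebble".

        opts := &pebble.Options{
                // If this is higher than the version on disk, Open ratchets the
                // format (and writes the format version marker) before returning.
                FormatMajorVersion: pebble.FormatNewest,
        }
        db, err := pebble.Open("demo-db", opts)
        if err != nil {
                if pebble.IsCorruptionError(err) {
                        log.Fatalf("on-disk corruption: %v", err)
                }
                log.Fatal(err)
        }
        defer db.Close()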
     658              : 
     659              : // prepareAndOpenDirs opens the directories for the store (and creates them if
     660              : // necessary).
     661              : //
     662              : // Returns an error if ReadOnly is set and the directories don't exist.
     663              : func prepareAndOpenDirs(
     664              :         dirname string, opts *Options,
     665            1 : ) (walDirname string, dataDir vfs.File, err error) {
     666            1 :         walDirname = opts.WALDir
     667            1 :         if opts.WALDir == "" {
     668            1 :                 walDirname = dirname
     669            1 :         }
     670              : 
     671              :         // Create directories if needed.
     672            1 :         if !opts.ReadOnly {
     673            1 :                 f, err := mkdirAllAndSyncParents(opts.FS, dirname)
     674            1 :                 if err != nil {
     675            1 :                         return "", nil, err
     676            1 :                 }
     677            1 :                 f.Close()
     678            1 :                 if walDirname != dirname {
     679            1 :                         f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
     680            1 :                         if err != nil {
     681            0 :                                 return "", nil, err
     682            0 :                         }
     683            1 :                         f.Close()
     684              :                 }
     685            1 :                 if opts.WALFailover != nil {
     686            1 :                         secondary := opts.WALFailover.Secondary
     687            1 :                         f, err := mkdirAllAndSyncParents(secondary.FS, secondary.Dirname)
     688            1 :                         if err != nil {
     689            0 :                                 return "", nil, err
     690            0 :                         }
     691            1 :                         f.Close()
     692              :                 }
     693              :         }
     694              : 
     695            1 :         dataDir, err = opts.FS.OpenDir(dirname)
     696            1 :         if err != nil {
     697            1 :                 if opts.ReadOnly && oserror.IsNotExist(err) {
     698            1 :                         return "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
     699            1 :                 }
     700            1 :                 return "", nil, err
     701              :         }
     702            1 :         if opts.ReadOnly && walDirname != dirname {
     703            1 :                 // Check that the wal dir exists.
     704            1 :                 walDir, err := opts.FS.OpenDir(walDirname)
     705            1 :                 if err != nil {
     706            1 :                         dataDir.Close()
     707            1 :                         return "", nil, err
     708            1 :                 }
     709            0 :                 walDir.Close()
     710              :         }
     711              : 
     712            1 :         return walDirname, dataDir, nil
     713              : }
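
The directory handling above is driven by a few Options fields. A hedged configuration sketch follows; the paths are placeholders, and the assumed imports are "github.com/cockroachdb/pebble", "github.com/cockroachdb/pebble/vfs", and "github.com/cockroachdb/pebble/wal".

        opts := &pebble.Options{
                // An empty WALDir means the WALs live in the data directory itself.
                WALDir: "/mnt/fast/pebble-wal",
                // When WALFailover is set, the secondary directory is also created
                // and its parents synced while the store is being opened.
                WALFailover: &pebble.WALFailoverOptions{
                        Secondary: wal.Dir{FS: vfs.Default, Dirname: "/mnt/slow/pebble-wal-failover"},
                },
                // With ReadOnly set, all of these directories must already exist.
                ReadOnly: false,
        }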
     714              : 
     715              : // GetVersion returns the engine version string from the latest options
     716              : // file present in dir. Used to check what Pebble or RocksDB version was last
     717              : // used to write to the database stored in this directory. An empty string is
     718              : // returned if no valid OPTIONS file with a version key was found.
     719            1 : func GetVersion(dir string, fs vfs.FS) (string, error) {
     720            1 :         ls, err := fs.List(dir)
     721            1 :         if err != nil {
     722            0 :                 return "", err
     723            0 :         }
     724            1 :         var version string
     725            1 :         lastOptionsSeen := base.DiskFileNum(0)
     726            1 :         for _, filename := range ls {
     727            1 :                 ft, fn, ok := base.ParseFilename(fs, filename)
     728            1 :                 if !ok {
     729            1 :                         continue
     730              :                 }
     731            1 :                 switch ft {
     732            1 :                 case base.FileTypeOptions:
     733            1 :                         // If this file has a higher number than the last options file
     734            1 :                         // processed, reset version. This is because rocksdb often
     735            1 :                         // writes multiple options files without deleting previous ones.
     736            1 :                         // Otherwise, skip parsing this options file.
     737            1 :                         if fn > lastOptionsSeen {
     738            1 :                                 version = ""
     739            1 :                                 lastOptionsSeen = fn
     740            1 :                         } else {
     741            0 :                                 continue
     742              :                         }
     743            1 :                         f, err := fs.Open(fs.PathJoin(dir, filename))
     744            1 :                         if err != nil {
     745            0 :                                 return "", err
     746            0 :                         }
     747            1 :                         data, err := io.ReadAll(f)
     748            1 :                         f.Close()
     749            1 : 
     750            1 :                         if err != nil {
     751            0 :                                 return "", err
     752            0 :                         }
     753            1 :                         err = parseOptions(string(data), parseOptionsFuncs{
     754            1 :                                 visitKeyValue: func(i, j int, section, key, value string) error {
     755            1 :                                         switch {
     756            1 :                                         case section == "Version":
     757            1 :                                                 switch key {
     758            1 :                                                 case "pebble_version":
     759            1 :                                                         version = value
     760            1 :                                                 case "rocksdb_version":
     761            1 :                                                         version = fmt.Sprintf("rocksdb v%s", value)
     762              :                                                 }
     763              :                                         }
     764            1 :                                         return nil
     765              :                                 },
     766              :                         })
     767            1 :                         if err != nil {
     768            0 :                                 return "", err
     769            0 :                         }
     770              :                 }
     771              :         }
     772            1 :         return version, nil
     773              : }
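
A small sketch of using GetVersion before opening a store, as the doc comment above suggests; the path is a placeholder, and the assumed imports are "fmt", "github.com/cockroachdb/pebble", and "github.com/cockroachdb/pebble/vfs".

        version, err := pebble.GetVersion("demo-db", vfs.Default)
        if err != nil {
                // handle the error
        }
        if version == "" {
                fmt.Println("no OPTIONS file with a version key was found")
        } else {
                fmt.Println("database last written by:", version)
        }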
     774              : 
     775              : func (d *DB) replayIngestedFlushable(
     776              :         b *Batch, logNum base.DiskFileNum,
     777            1 : ) (entry *flushableEntry, err error) {
     778            1 :         br := b.Reader()
     779            1 :         seqNum := b.SeqNum()
     780            1 : 
     781            1 :         fileNums := make([]base.DiskFileNum, 0, b.Count())
     782            1 :         var exciseSpan KeyRange
     783            1 :         addFileNum := func(encodedFileNum []byte) {
     784            1 :                 fileNum, n := binary.Uvarint(encodedFileNum)
     785            1 :                 if n <= 0 {
     786            0 :                         panic("pebble: ingest sstable file num is invalid")
     787              :                 }
     788            1 :                 fileNums = append(fileNums, base.DiskFileNum(fileNum))
     789              :         }
     790              : 
     791            1 :         for i := 0; i < int(b.Count()); i++ {
     792            1 :                 kind, key, val, ok, err := br.Next()
     793            1 :                 if err != nil {
     794            0 :                         return nil, err
     795            0 :                 }
     796            1 :                 if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise {
     797            0 :                         panic("pebble: invalid batch key kind")
     798              :                 }
     799            1 :                 if !ok {
     800            0 :                         panic("pebble: invalid batch count")
     801              :                 }
     802            1 :                 if kind == base.InternalKeyKindExcise {
     803            0 :                         if exciseSpan.Valid() {
     804            0 :                                 panic("pebble: multiple excise spans in a single batch")
     805              :                         }
     806            0 :                         exciseSpan.Start = slices.Clone(key)
     807            0 :                         exciseSpan.End = slices.Clone(val)
     808            0 :                         continue
     809              :                 }
     810            1 :                 addFileNum(key)
     811              :         }
     812              : 
     813            1 :         if _, _, _, ok, err := br.Next(); err != nil {
     814            0 :                 return nil, err
     815            1 :         } else if ok {
     816            0 :                 panic("pebble: invalid number of entries in batch")
     817              :         }
     818              : 
     819            1 :         meta := make([]*tableMetadata, len(fileNums))
     820            1 :         var lastRangeKey keyspan.Span
     821            1 :         for i, n := range fileNums {
     822            1 :                 readable, err := d.objProvider.OpenForReading(context.TODO(), base.FileTypeTable, n,
     823            1 :                         objstorage.OpenOptions{MustExist: true})
     824            1 :                 if err != nil {
     825            0 :                         return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files")
     826            0 :                 }
     827              :                 // NB: ingestLoad1 will close readable.
     828            1 :                 meta[i], lastRangeKey, err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(),
     829            1 :                         readable, d.cacheHandle, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
     830            1 :                 if err != nil {
     831            0 :                         return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
     832            0 :                 }
     833              :         }
     834            1 :         if lastRangeKey.Valid() && d.opts.Comparer.Split.HasSuffix(lastRangeKey.End) {
     835            0 :                 return nil, errors.AssertionFailedf("pebble: last ingest sstable has suffixed range key end %s",
     836            0 :                         d.opts.Comparer.FormatKey(lastRangeKey.End))
     837            0 :         }
     838              : 
     839            1 :         numFiles := len(meta)
     840            1 :         if exciseSpan.Valid() {
     841            0 :                 numFiles++
     842            0 :         }
     843            1 :         if uint32(numFiles) != b.Count() {
     844            0 :                 panic("pebble: couldn't load all files in WAL entry")
     845              :         }
     846              : 
     847            1 :         return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
     848              : }
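
The addFileNum closure above decodes each ingested sstable's file number from a uvarint-encoded batch key. A minimal, self-contained sketch of that round trip using only the standard library (illustrative only; not part of open.go, and the file number is a made-up value):

package main

import (
        "encoding/binary"
        "fmt"
)

func main() {
        const fileNum uint64 = 123 // hypothetical DiskFileNum value
        buf := make([]byte, binary.MaxVarintLen64)
        n := binary.PutUvarint(buf, fileNum)

        // Decode, mirroring addFileNum: a non-positive length means the
        // encoding is invalid.
        decoded, m := binary.Uvarint(buf[:n])
        if m <= 0 {
                panic("invalid uvarint encoding")
        }
        fmt.Println(decoded) // 123
}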
     849              : 
     850              : // replayWAL replays the edits in the specified WAL. If the DB is in
     851              : // read-only mode, then the WALs are replayed into memtables and not flushed.
     852              : // If the DB is not in read-only mode, then the contents of the WAL are
     853              : // guaranteed to be flushed when a flush is scheduled after this method is run.
     854              : // Note that this flushing is very important for guaranteeing durability:
     855              : // the application may have had a number of pending
     856              : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
     857              : // happened but the corresponding data may now be readable from the WAL (while
     858              : // sitting in write-back caches in the kernel or the storage device). By
     859              : // reading the WAL (including the non-fsynced data) and then flushing all
     860              : // these changes (flush does fsyncs), we are able to guarantee that the
     861              : // initial state of the DB is durable.
     862              : //
     863              : // This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable and replays
     864              : // WALs into the flushable queue. Flushing of the queue is expected to be handled
     865              : // by callers. It returns the list of flushable ingests (but not memtables) that were replayed.
     866              : //
     867              : // d.mu must be held when calling this, but the mutex may be dropped and
     868              : // re-acquired during the course of this method.
     869              : func (d *DB) replayWAL(
     870              :         jobID JobID, ll wal.LogicalLog, strictWALTail bool,
     871            1 : ) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
     872            1 :         rr := ll.OpenForRead()
     873            1 :         defer func() { _ = rr.Close() }()
     874            1 :         var (
     875            1 :                 b               Batch
     876            1 :                 buf             bytes.Buffer
     877            1 :                 mem             *memTable
     878            1 :                 entry           *flushableEntry
     879            1 :                 offset          wal.Offset
     880            1 :                 lastFlushOffset int64
     881            1 :                 keysReplayed    int64 // number of keys replayed
     882            1 :                 batchesReplayed int64 // number of batches replayed
     883            1 :         )
     884            1 : 
     885            1 :         // TODO(jackson): This function is interspersed with panics, in addition to
     886            1 :         // corruption error propagation. Audit them to ensure we're truly only
     887            1 :         // panicking where the error points to a Pebble bug and not user or
     888            1 :         // hardware-induced corruption.
     889            1 : 
     890            1 :         // "Flushes" (i.e., closes off) the current memtable, if not nil.
     891            1 :         flushMem := func() {
     892            1 :                 if mem == nil {
     893            1 :                         return
     894            1 :                 }
     895            1 :                 mem.writerUnref()
     896            1 :                 if d.mu.mem.mutable == mem {
     897            1 :                         d.mu.mem.mutable = nil
     898            1 :                 }
     899            1 :                 entry.flushForced = !d.opts.ReadOnly
     900            1 :                 var logSize uint64
     901            1 :                 mergedOffset := offset.Physical + offset.PreviousFilesBytes
     902            1 :                 if mergedOffset >= lastFlushOffset {
     903            1 :                         logSize = uint64(mergedOffset - lastFlushOffset)
     904            1 :                 }
     905              :                 // Else, this was the initial memtable in the read-only case which must have
     906              :                 // been empty, but we need to flush it since we don't want to add to it later.
     907            1 :                 lastFlushOffset = mergedOffset
     908            1 :                 entry.logSize = logSize
     909            1 :                 mem, entry = nil, nil
     910              :         }
     911              : 
     912            1 :         mem = d.mu.mem.mutable
     913            1 :         if mem != nil {
     914            1 :                 entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
     915            1 :                 if !d.opts.ReadOnly {
     916            1 :                         flushMem()
     917            1 :                 }
     918              :         }
     919              : 
     920              :         // Creates a new memtable if there is no current memtable.
     921            1 :         ensureMem := func(seqNum base.SeqNum) {
     922            1 :                 if mem != nil {
     923            1 :                         return
     924            1 :                 }
     925            1 :                 mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
     926            1 :                 d.mu.mem.mutable = mem
     927            1 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     928              :         }
     929              : 
     930            1 :         defer func() {
     931            1 :                 if err != nil {
     932            1 :                         err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
     933            1 :                 }
     934              :         }()
     935              : 
     936            1 :         for {
     937            1 :                 var r io.Reader
     938            1 :                 var err error
     939            1 :                 r, offset, err = rr.NextRecord()
     940            1 :                 if err == nil {
     941            1 :                         _, err = io.Copy(&buf, r)
     942            1 :                 }
     943            1 :                 if err != nil {
     944            1 :                         // It is common to encounter a zeroed or invalid chunk due to WAL
     945            1 :                         // preallocation and WAL recycling. However, zeroed or invalid chunks
     946            1 :                         // can also be a consequence of corruption / disk rot. When the log
     947            1 :                         // reader encounters one of these cases, it attempts to disambiguate
     948            1 :                         // by reading ahead looking for a future record. If a future chunk
     949            1 :                         // indicates the chunk at the original offset should've been valid, it
     950            1 :                         // surfaces record.ErrInvalidChunk or record.ErrZeroedChunk. These
     951            1 :                         // errors are always indicative of corruption and data loss.
     952            1 :                         //
     953            1 :                         // Otherwise, the reader surfaces record.ErrUnexpectedEOF indicating
     954            1 :                         // that the WAL terminated uncleanly and ambiguously. If the WAL is
     955            1 :                         // the most recent logical WAL, the caller passes in
     956            1 :                         // (strictWALTail=false), indicating we should tolerate the unclean
     957            1 :                         // ending. If the WAL is an older WAL, the caller passes in
     958            1 :                         // (strictWALTail=true), indicating that the WAL should have been
     959            1 :                         // closed cleanly, and we should interpret the
     960            1 :                         // `record.ErrUnexpectedEOF` as corruption and stop recovery.
     961            1 :                         if errors.Is(err, io.EOF) {
     962            1 :                                 break
     963            1 :                         } else if errors.Is(err, record.ErrUnexpectedEOF) && !strictWALTail {
     964            1 :                                 break
     965            1 :                         } else if (errors.Is(err, record.ErrUnexpectedEOF) && strictWALTail) ||
     966            1 :                                 errors.Is(err, record.ErrInvalidChunk) || errors.Is(err, record.ErrZeroedChunk) {
     967            1 :                                 // If a read-ahead returns record.ErrInvalidChunk or
     968            1 :                                 // record.ErrZeroedChunk, then there's definitively corruption.
     969            1 :                                 //
     970            1 :                                 // If strictWALTail=true, then record.ErrUnexpectedEOF should
     971            1 :                                 // also be considered corruption because the strictWALTail
     972            1 :                                 // indicates we expect a clean end to the WAL.
     973            1 :                                 //
     974            1 :                                 // Other I/O-related errors should not be marked as corruption
     975            1 :                                 // and should simply be returned.
     976            1 :                                 err = errors.Mark(err, ErrCorruption)
     977            1 :                         }
     978              : 
     979            1 :                         return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
     980              :                 }
     981              : 
     982            1 :                 if buf.Len() < batchrepr.HeaderLen {
     983            0 :                         return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
     984            0 :                                 errors.Safe(base.DiskFileNum(ll.Num)), offset)
     985            0 :                 }
     986              : 
     987            1 :                 if d.opts.ErrorIfNotPristine {
     988            1 :                         return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
     989            1 :                 }
     990              : 
     991              :                 // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
     992              :                 // which is used below.
     993            1 :                 b = Batch{}
     994            1 :                 b.db = d
     995            1 :                 if err := b.SetRepr(buf.Bytes()); err != nil {
     996            0 :                         return nil, 0, err
     997            0 :                 }
     998            1 :                 seqNum := b.SeqNum()
     999            1 :                 maxSeqNum = seqNum + base.SeqNum(b.Count())
    1000            1 :                 keysReplayed += int64(b.Count())
    1001            1 :                 batchesReplayed++
    1002            1 :                 {
    1003            1 :                         br := b.Reader()
    1004            1 :                         if kind, _, _, ok, err := br.Next(); err != nil {
    1005            0 :                                 return nil, 0, err
    1006            1 :                         } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
    1007            1 :                                 // We're in the flushable ingests (+ possibly excises) case.
    1008            1 :                                 //
    1009            1 :                                 // Ingests require an up-to-date view of the LSM to determine the target
    1010            1 :                                 // level of ingested sstables, and to accurately compute excises. Instead of
    1011            1 :                                 // doing an ingest in this function, we just enqueue a flushable ingest
    1012            1 :                                 // in the flushables queue and run a regular flush.
    1013            1 :                                 flushMem()
    1014            1 :                                 // mem is nil here.
    1015            1 :                                 entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
    1016            1 :                                 if err != nil {
    1017            0 :                                         return nil, 0, err
    1018            0 :                                 }
    1019            1 :                                 fi := entry.flushable.(*ingestedFlushable)
    1020            1 :                                 flushableIngests = append(flushableIngests, fi)
    1021            1 :                                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1022            1 :                                 // A flushable ingest is always followed by a WAL rotation.
    1023            1 :                                 break
    1024              :                         }
    1025              :                 }
    1026              : 
    1027            1 :                 if b.memTableSize >= uint64(d.largeBatchThreshold) {
    1028            1 :                         flushMem()
    1029            1 :                         // Make a copy of the data slice since it is currently owned by buf and will
    1030            1 :                         // be reused in the next iteration.
    1031            1 :                         b.data = slices.Clone(b.data)
    1032            1 :                         b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
    1033            1 :                         if err != nil {
    1034            0 :                                 return nil, 0, err
    1035            0 :                         }
    1036            1 :                         entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
    1037            1 :                         // Disable memory accounting by adding a reader ref that will never be
    1038            1 :                         // removed.
    1039            1 :                         entry.readerRefs.Add(1)
    1040            1 :                         d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1041            1 :                 } else {
    1042            1 :                         ensureMem(seqNum)
    1043            1 :                         if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
    1044            0 :                                 return nil, 0, err
    1045            0 :                         }
    1046              :                         // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
    1047              :                         // batch may not initially fit, but will eventually fit (since it is smaller than
    1048              :                         // largeBatchThreshold).
    1049            1 :                         for err == arenaskl.ErrArenaFull {
    1050            1 :                                 flushMem()
    1051            1 :                                 ensureMem(seqNum)
    1052            1 :                                 err = mem.prepare(&b)
    1053            1 :                                 if err != nil && err != arenaskl.ErrArenaFull {
    1054            0 :                                         return nil, 0, err
    1055            0 :                                 }
    1056              :                         }
    1057            1 :                         if err = mem.apply(&b, seqNum); err != nil {
    1058            0 :                                 return nil, 0, err
    1059            0 :                         }
    1060            1 :                         mem.writerUnref()
    1061              :                 }
    1062            1 :                 buf.Reset()
    1063              :         }
    1064              : 
    1065            1 :         d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
    1066            1 :                 jobID, ll.String(), offset, keysReplayed, batchesReplayed)
    1067            1 :         if !d.opts.ReadOnly {
    1068            1 :                 flushMem()
    1069            1 :         }
    1070              : 
    1071              :         // mem is nil here, if !ReadOnly.
    1072            1 :         return flushableIngests, maxSeqNum, err
    1073              : }
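
replayWAL distinguishes tolerable unclean WAL tails from definitive corruption by marking the latter with ErrCorruption (via errors.Mark) before returning. A minimal sketch of that marking/classification pattern with placeholder error values (illustrative only; these are not Pebble's actual errors):

package main

import (
        "fmt"

        "github.com/cockroachdb/errors"
)

// Stand-ins for pebble.ErrCorruption and record.ErrInvalidChunk.
var (
        errCorruption   = errors.New("corruption")
        errInvalidChunk = errors.New("invalid chunk")
)

func main() {
        // Wrap the low-level read error and mark it as corruption, as replayWAL
        // does for ErrInvalidChunk/ErrZeroedChunk (and ErrUnexpectedEOF when
        // strictWALTail is set).
        err := errors.Mark(errors.Wrap(errInvalidChunk, "replaying wal"), errCorruption)

        fmt.Println(errors.Is(err, errCorruption))   // true: classified as corruption
        fmt.Println(errors.Is(err, errInvalidChunk)) // true: original cause preserved
}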
    1074              : 
    1075            1 : func readOptionsFile(opts *Options, path string) (string, error) {
    1076            1 :         f, err := opts.FS.Open(path)
    1077            1 :         if err != nil {
    1078            0 :                 return "", err
    1079            0 :         }
    1080            1 :         defer f.Close()
    1081            1 : 
    1082            1 :         data, err := io.ReadAll(f)
    1083            1 :         if err != nil {
    1084            0 :                 return "", err
    1085            0 :         }
    1086            1 :         return string(data), nil
    1087              : }
    1088              : 
    1089              : // DBDesc briefly describes high-level state about a database.
    1090              : type DBDesc struct {
    1091              :         // Exists is true if an existing database was found.
    1092              :         Exists bool
    1093              :         // FormatMajorVersion indicates the database's current format
    1094              :         // version.
    1095              :         FormatMajorVersion FormatMajorVersion
    1096              :         // ManifestFilename is the filename of the current active manifest,
    1097              :         // if the database exists.
    1098              :         ManifestFilename string
    1099              :         // OptionsFilename is the filename of the most recent OPTIONS file, if it
    1100              :         // exists.
    1101              :         OptionsFilename string
    1102              : }
    1103              : 
    1104              : // String implements fmt.Stringer.
    1105            1 : func (d *DBDesc) String() string {
    1106            1 :         if !d.Exists {
    1107            1 :                 return "uninitialized"
    1108            1 :         }
    1109            1 :         var buf bytes.Buffer
    1110            1 :         fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
    1111            1 :         fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
    1112            1 :         fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
    1113            1 :         return buf.String()
    1114              : }
    1115              : 
    1116              : // Peek looks for an existing database in dirname on the provided FS. It
    1117              : // returns a brief description of the database. Peek is read-only and
    1118              : // does not open the database.
    1119            1 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
    1120            1 :         ls, err := fs.List(dirname)
    1121            1 :         if err != nil {
    1122            1 :                 return nil, err
    1123            1 :         }
    1124              : 
    1125            1 :         vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
    1126            1 :         if err != nil {
    1127            0 :                 return nil, err
    1128            0 :         }
    1129              :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1130              :         // PeekMarker variant that avoids opening the directory.
    1131            1 :         if err := versMarker.Close(); err != nil {
    1132            0 :                 return nil, err
    1133            0 :         }
    1134              : 
    1135              :         // Find the currently active manifest, if there is one.
    1136            1 :         manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
    1137            1 :         if err != nil {
    1138            0 :                 return nil, err
    1139            0 :         }
    1140              :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1141              :         // PeekMarker variant that avoids opening the directory.
    1142            1 :         if err := manifestMarker.Close(); err != nil {
    1143            0 :                 return nil, err
    1144            0 :         }
    1145              : 
    1146            1 :         desc := &DBDesc{
    1147            1 :                 Exists:             exists,
    1148            1 :                 FormatMajorVersion: vers,
    1149            1 :         }
    1150            1 : 
    1151            1 :         // Find the OPTIONS file with the highest file number within the list of
    1152            1 :         // directory entries.
    1153            1 :         var previousOptionsFileNum base.DiskFileNum
    1154            1 :         for _, filename := range ls {
    1155            1 :                 ft, fn, ok := base.ParseFilename(fs, filename)
    1156            1 :                 if !ok || ft != base.FileTypeOptions || fn < previousOptionsFileNum {
    1157            1 :                         continue
    1158              :                 }
    1159            1 :                 previousOptionsFileNum = fn
    1160            1 :                 desc.OptionsFilename = fs.PathJoin(dirname, filename)
    1161              :         }
    1162              : 
    1163            1 :         if exists {
    1164            1 :                 desc.ManifestFilename = base.MakeFilepath(fs, dirname, base.FileTypeManifest, manifestFileNum)
    1165            1 :         }
    1166            1 :         return desc, nil
    1167              : }
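
A brief usage sketch for Peek (illustrative only; the directory path is hypothetical): it reports whether a database exists and which manifest and OPTIONS files are current, without opening anything.

package main

import (
        "fmt"
        "log"

        "github.com/cockroachdb/pebble"
        "github.com/cockroachdb/pebble/vfs"
)

func main() {
        desc, err := pebble.Peek("/tmp/demo-db", vfs.Default)
        if err != nil {
                log.Fatal(err)
        }
        fmt.Println(desc) // "uninitialized" or format version, manifest, and OPTIONS paths
        if desc.Exists {
                fmt.Println("current manifest:", desc.ManifestFilename)
        }
}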
    1168              : 
    1169              : // LockDirectory acquires the database directory lock in the named directory,
    1170              : // preventing another process from opening the database. LockDirectory returns a
    1171              : // handle to the held lock that may be passed to Open through Options.Lock to
    1172              : // subsequently open the database, skipping lock acquisition during Open.
    1173              : //
    1174              : // LockDirectory may be used to expand the critical section protected by the
    1175              : // database lock to include setup before the call to Open.
    1176            1 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
    1177            1 :         fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, base.FileTypeLock, base.DiskFileNum(0)))
    1178            1 :         if err != nil {
    1179            1 :                 return nil, err
    1180            1 :         }
    1181            1 :         l := &Lock{dirname: dirname, fileLock: fileLock}
    1182            1 :         l.refs.Store(1)
    1183            1 :         invariants.SetFinalizer(l, func(obj interface{}) {
    1184            1 :                 if refs := obj.(*Lock).refs.Load(); refs > 0 {
    1185            0 :                         panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
    1186              :                 }
    1187              :         })
    1188            1 :         return l, nil
    1189              : }
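
A brief usage sketch for LockDirectory (illustrative only; the path and the setup step are hypothetical): acquire the lock first, do any setup that must be covered by it, then pass the handle to Open via Options.Lock and release it only after the database is closed.

package main

import (
        "log"

        "github.com/cockroachdb/pebble"
        "github.com/cockroachdb/pebble/vfs"
)

func main() {
        const dirname = "/tmp/demo-db"

        lock, err := pebble.LockDirectory(dirname, vfs.Default)
        if err != nil {
                log.Fatal(err)
        }
        // Runs last of the deferred calls, after db.Close below.
        defer lock.Close()

        // ... setup that must happen while holding the directory lock ...

        db, err := pebble.Open(dirname, &pebble.Options{Lock: lock})
        if err != nil {
                log.Fatal(err)
        }
        defer db.Close()
}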
    1190              : 
    1191              : // Lock represents a file lock on a directory. It may be passed to Open through
    1192              : // Options.Lock to elide lock acquisition during Open.
    1193              : type Lock struct {
    1194              :         dirname  string
    1195              :         fileLock io.Closer
    1196              :         // refs is a count of the number of handles on the lock. refs must be 0, 1
    1197              :         // or 2.
    1198              :         //
    1199              :         // When acquired by the client and passed to Open, refs = 1 and the Open
    1200              :         // call increments it to 2. When the database is closed, it's decremented to
    1201              : // 1. Finally, when the original caller calls Close on the Lock, it's
    1202              : // decremented to zero and the underlying file lock is released.
    1203              :         //
    1204              :         // When Open acquires the file lock, refs remains at 1 until the database is
    1205              :         // closed.
    1206              :         refs atomic.Int32
    1207              : }
    1208              : 
    1209            1 : func (l *Lock) refForOpen() error {
    1210            1 :         // During Open, when a user has passed in a lock, the reference count must be
    1211            1 :         // exactly 1. If it's zero, the lock is no longer held and is invalid. If
    1212            1 :         // it's 2, the lock is already in use by another database within the
    1213            1 :         // process.
    1214            1 :         if !l.refs.CompareAndSwap(1, 2) {
    1215            1 :                 return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
    1216            1 :         }
    1217            1 :         return nil
    1218              : }
    1219              : 
    1220              : // Close releases the lock, permitting another process to lock and open the
    1221              : // database. Close must not be called until after a database using the Lock has
    1222              : // been closed.
    1223            1 : func (l *Lock) Close() error {
    1224            1 :         if l.refs.Add(-1) > 0 {
    1225            1 :                 return nil
    1226            1 :         }
    1227            1 :         defer func() { l.fileLock = nil }()
    1228            1 :         return l.fileLock.Close()
    1229              : }
    1230              : 
    1231            1 : func (l *Lock) pathMatches(dirname string) error {
    1232            1 :         if dirname == l.dirname {
    1233            1 :                 return nil
    1234            1 :         }
    1235              :         // Check for relative paths, symlinks, etc. This isn't ideal because we're
    1236              :         // circumventing the vfs.FS interface here.
    1237              :         //
    1238              :         // TODO(jackson): We could add support for retrieving file inodes through Stat
    1239              :         // calls in the VFS interface on platforms where it's available and use that
    1240              :         // to differentiate.
    1241            1 :         dirStat, err1 := os.Stat(dirname)
    1242            1 :         lockDirStat, err2 := os.Stat(l.dirname)
    1243            1 :         if err1 == nil && err2 == nil && os.SameFile(dirStat, lockDirStat) {
    1244            1 :                 return nil
    1245            1 :         }
    1246            0 :         return errors.Join(
    1247            0 :                 errors.Newf("pebble: opts.Lock acquired in %q not %q", l.dirname, dirname),
    1248            0 :                 err1, err2)
    1249              : }
    1250              : 
    1251              : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
    1252              : // does not exist.
    1253              : //
    1254              : // Note that errors can be wrapped with more details; use errors.Is().
    1255              : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
    1256              : 
    1257              : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
    1258              : // already exists.
    1259              : //
    1260              : // Note that errors can be wrapped with more details; use errors.Is().
    1261              : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
    1262              : 
    1263              : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
    1264              : // already exists and is not pristine.
    1265              : //
    1266              : // Note that errors can be wrapped with more details; use errors.Is().
    1267              : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
    1268              : 
    1269            1 : func checkConsistency(v *manifest.Version, objProvider objstorage.Provider) error {
    1270            1 :         var errs []error
    1271            1 :         dedup := make(map[base.DiskFileNum]struct{})
    1272            1 :         for level, files := range v.Levels {
    1273            1 :                 for f := range files.All() {
    1274            1 :                         backingState := f.FileBacking
    1275            1 :                         if _, ok := dedup[backingState.DiskFileNum]; ok {
    1276            1 :                                 continue
    1277              :                         }
    1278            1 :                         dedup[backingState.DiskFileNum] = struct{}{}
    1279            1 :                         fileNum := backingState.DiskFileNum
    1280            1 :                         fileSize := backingState.Size
    1281            1 :                         // We skip over remote objects; those are instead checked asynchronously
    1282            1 :                         // by the table stats loading job.
    1283            1 :                         meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
    1284            1 :                         var size int64
    1285            1 :                         if err == nil {
    1286            1 :                                 if meta.IsRemote() {
    1287            1 :                                         continue
    1288              :                                 }
    1289            1 :                                 size, err = objProvider.Size(meta)
    1290              :                         }
    1291            1 :                         if err != nil {
    1292            1 :                                 errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
    1293            1 :                                 continue
    1294              :                         }
    1295              : 
    1296            1 :                         if size != int64(fileSize) {
    1297            0 :                                 errs = append(errs, errors.Errorf(
    1298            0 :                                         "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
    1299            0 :                                         errors.Safe(level), fileNum, objProvider.Path(meta),
    1300            0 :                                         errors.Safe(size), errors.Safe(fileSize)))
    1301            0 :                                 continue
    1302              :                         }
    1303              :                 }
    1304              :         }
    1305            1 :         return errors.Join(errs...)
    1306              : }
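
checkConsistency collects one error per problematic table and aggregates them with errors.Join, which yields nil when nothing was collected. A minimal sketch of that aggregation (illustrative only; the error messages are placeholders):

package main

import (
        "fmt"

        "github.com/cockroachdb/errors"
)

func main() {
        var errs []error
        fmt.Println(errors.Join(errs...) == nil) // true: no inconsistencies, so no error

        errs = append(errs,
                errors.New("L0: 000005: object size mismatch"),
                errors.New("L6: 000012: file does not exist"))
        fmt.Println(errors.Join(errs...)) // both problems reported in one error
}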
    1307              : 
    1308              : type walEventListenerAdaptor struct {
    1309              :         l *EventListener
    1310              : }
    1311              : 
    1312            1 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
    1313            1 :         // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
    1314            1 :         // is insufficient to infer whether primary or secondary.
    1315            1 :         wci := WALCreateInfo{
    1316            1 :                 JobID:           ci.JobID,
    1317            1 :                 Path:            ci.Path,
    1318            1 :                 FileNum:         base.DiskFileNum(ci.Num),
    1319            1 :                 RecycledFileNum: ci.RecycledFileNum,
    1320            1 :                 Err:             ci.Err,
    1321            1 :         }
    1322            1 :         l.l.WALCreated(wci)
    1323            1 : }
        

Generated by: LCOV version 2.0-1