LCOV - code coverage report
Current view: top level - pebble - open.go (source / functions) Hit Total Coverage
Test: 2024-09-08 08:16Z 376d455b - meta test only.lcov Lines: 586 958 61.2 %
Date: 2024-09-08 08:17:00 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "encoding/binary"
      11             :         "fmt"
      12             :         "io"
      13             :         "math"
      14             :         "os"
      15             :         "slices"
      16             :         "sync/atomic"
      17             :         "time"
      18             : 
      19             :         "github.com/cockroachdb/errors"
      20             :         "github.com/cockroachdb/errors/oserror"
      21             :         "github.com/cockroachdb/pebble/batchrepr"
      22             :         "github.com/cockroachdb/pebble/internal/arenaskl"
      23             :         "github.com/cockroachdb/pebble/internal/base"
      24             :         "github.com/cockroachdb/pebble/internal/cache"
      25             :         "github.com/cockroachdb/pebble/internal/constants"
      26             :         "github.com/cockroachdb/pebble/internal/invariants"
      27             :         "github.com/cockroachdb/pebble/internal/manifest"
      28             :         "github.com/cockroachdb/pebble/internal/manual"
      29             :         "github.com/cockroachdb/pebble/objstorage"
      30             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      31             :         "github.com/cockroachdb/pebble/objstorage/remote"
      32             :         "github.com/cockroachdb/pebble/record"
      33             :         "github.com/cockroachdb/pebble/sstable"
      34             :         "github.com/cockroachdb/pebble/vfs"
      35             :         "github.com/cockroachdb/pebble/wal"
      36             :         "github.com/prometheus/client_golang/prometheus"
      37             : )
      38             : 
      39             : const (
      40             :         initialMemTableSize = 256 << 10 // 256 KB
      41             : 
      42             :         // The max batch size is limited by the uint32 offsets stored in
      43             :         // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
      44             :         //
      45             :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      46             :         // end of an allocation fits in uint32.
      47             :         //
      48             :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      49             :         // 2GB).
      50             :         maxBatchSize = constants.MaxUint32OrInt
      51             : 
      52             :         // The max memtable size is limited by the uint32 offsets stored in
      53             :         // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
      54             :         //
      55             :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      56             :         // end of an allocation fits in uint32.
      57             :         //
      58             :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      59             :         // 2GB).
      60             :         maxMemTableSize = constants.MaxUint32OrInt
      61             : )
      62             : 
      63             : // TableCacheSize can be used to determine the table
      64             : // cache size for a single db, given the maximum open
      65             : // files which can be used by a table cache which is
      66             : // only used by a single db.
      67           1 : func TableCacheSize(maxOpenFiles int) int {
      68           1 :         tableCacheSize := maxOpenFiles - numNonTableCacheFiles
      69           1 :         if tableCacheSize < minTableCacheSize {
      70           1 :                 tableCacheSize = minTableCacheSize
      71           1 :         }
      72           1 :         return tableCacheSize
      73             : }
      74             : 
      75             : // Open opens a DB whose files live in the given directory.
      76           1 : func Open(dirname string, opts *Options) (db *DB, err error) {
      77           1 :         // Make a copy of the options so that we don't mutate the passed in options.
      78           1 :         opts = opts.Clone()
      79           1 :         opts = opts.EnsureDefaults()
      80           1 :         if err := opts.Validate(); err != nil {
      81           0 :                 return nil, err
      82           0 :         }
      83           1 :         if opts.LoggerAndTracer == nil {
      84           1 :                 opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
      85           1 :         } else {
      86           0 :                 opts.Logger = opts.LoggerAndTracer
      87           0 :         }
      88             : 
      89           1 :         if invariants.Sometimes(5) {
      90           1 :                 assertComparer := base.MakeAssertComparer(*opts.Comparer)
      91           1 :                 opts.Comparer = &assertComparer
      92           1 :         }
      93             : 
      94             :         // In all error cases, we return db = nil; this is used by various
      95             :         // deferred cleanups.
      96             : 
      97             :         // Open the database and WAL directories first.
      98           1 :         walDirname, dataDir, err := prepareAndOpenDirs(dirname, opts)
      99           1 :         if err != nil {
     100           0 :                 return nil, errors.Wrapf(err, "error opening database at %q", dirname)
     101           0 :         }
     102           1 :         defer func() {
     103           1 :                 if db == nil {
     104           0 :                         dataDir.Close()
     105           0 :                 }
     106             :         }()
     107             : 
     108             :         // Lock the database directory.
     109           1 :         var fileLock *Lock
     110           1 :         if opts.Lock != nil {
     111           0 :                 // The caller already acquired the database lock. Ensure that the
     112           0 :                 // directory matches.
     113           0 :                 if err := opts.Lock.pathMatches(dirname); err != nil {
     114           0 :                         return nil, err
     115           0 :                 }
     116           0 :                 if err := opts.Lock.refForOpen(); err != nil {
     117           0 :                         return nil, err
     118           0 :                 }
     119           0 :                 fileLock = opts.Lock
     120           1 :         } else {
     121           1 :                 fileLock, err = LockDirectory(dirname, opts.FS)
     122           1 :                 if err != nil {
     123           0 :                         return nil, err
     124           0 :                 }
     125             :         }
     126           1 :         defer func() {
     127           1 :                 if db == nil {
     128           0 :                         fileLock.Close()
     129           0 :                 }
     130             :         }()
     131             : 
     132             :         // List the directory contents. This also happens to include WAL log files, if
     133             :         // they are in the same dir, but we will ignore those below. The provider is
     134             :         // also given this list, but it ignores non sstable files.
     135           1 :         ls, err := opts.FS.List(dirname)
     136           1 :         if err != nil {
     137           0 :                 return nil, err
     138           0 :         }
     139             : 
     140             :         // Establish the format major version.
     141           1 :         formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
     142           1 :         if err != nil {
     143           0 :                 return nil, err
     144           0 :         }
     145           1 :         defer func() {
     146           1 :                 if db == nil {
     147           0 :                         formatVersionMarker.Close()
     148           0 :                 }
     149             :         }()
     150             : 
     151           1 :         noFormatVersionMarker := formatVersion == FormatDefault
     152           1 :         if noFormatVersionMarker {
     153           1 :                 // We will initialize the store at the minimum possible format, then upgrade
     154           1 :                 // the format to the desired one. This helps test the format upgrade code.
     155           1 :                 formatVersion = FormatMinSupported
     156           1 :                 if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
     157           1 :                         formatVersion = FormatMinForSharedObjects
     158           1 :                 }
     159             :                 // There is no format version marker file. There are three cases:
     160             :                 //  - we are trying to open an existing store that was created at
     161             :                 //    FormatMostCompatible (the only one without a version marker file)
     162             :                 //  - we are creating a new store;
     163             :                 //  - we are retrying a failed creation.
     164             :                 //
     165             :                 // To error in the first case, we set ErrorIfNotPristine.
     166           1 :                 opts.ErrorIfNotPristine = true
     167           1 :                 defer func() {
     168           1 :                         if err != nil && errors.Is(err, ErrDBNotPristine) {
     169           0 :                                 // We must be trying to open an existing store at FormatMostCompatible.
     170           0 :                                 // Correct the error in this case -we
     171           0 :                                 err = errors.Newf(
     172           0 :                                         "pebble: database %q written in format major version 1 which is no longer supported",
     173           0 :                                         dirname)
     174           0 :                         }
     175             :                 }()
     176           1 :         } else {
     177           1 :                 if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone && formatVersion < FormatMinForSharedObjects {
     178           0 :                         return nil, errors.Newf(
     179           0 :                                 "pebble: database %q configured with shared objects but written in too old format major version %d",
     180           0 :                                 formatVersion)
     181           0 :                 }
     182             :         }
     183             : 
     184             :         // Find the currently active manifest, if there is one.
     185           1 :         manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
     186           1 :         if err != nil {
     187           0 :                 return nil, errors.Wrapf(err, "pebble: database %q", dirname)
     188           0 :         }
     189           1 :         defer func() {
     190           1 :                 if db == nil {
     191           0 :                         manifestMarker.Close()
     192           0 :                 }
     193             :         }()
     194             : 
     195             :         // Atomic markers may leave behind obsolete files if there's a crash
     196             :         // mid-update. Clean these up if we're not in read-only mode.
     197           1 :         if !opts.ReadOnly {
     198           1 :                 if err := formatVersionMarker.RemoveObsolete(); err != nil {
     199           0 :                         return nil, err
     200           0 :                 }
     201           1 :                 if err := manifestMarker.RemoveObsolete(); err != nil {
     202           0 :                         return nil, err
     203           0 :                 }
     204             :         }
     205             : 
     206           1 :         if opts.Cache == nil {
     207           0 :                 opts.Cache = cache.New(cacheDefaultSize)
     208           1 :         } else {
     209           1 :                 opts.Cache.Ref()
     210           1 :         }
     211             : 
     212           1 :         d := &DB{
     213           1 :                 cacheID:             opts.Cache.NewID(),
     214           1 :                 dirname:             dirname,
     215           1 :                 opts:                opts,
     216           1 :                 cmp:                 opts.Comparer.Compare,
     217           1 :                 equal:               opts.Comparer.Equal,
     218           1 :                 merge:               opts.Merger.Merge,
     219           1 :                 split:               opts.Comparer.Split,
     220           1 :                 abbreviatedKey:      opts.Comparer.AbbreviatedKey,
     221           1 :                 largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
     222           1 :                 fileLock:            fileLock,
     223           1 :                 dataDir:             dataDir,
     224           1 :                 closed:              new(atomic.Value),
     225           1 :                 closedCh:            make(chan struct{}),
     226           1 :         }
     227           1 :         d.mu.versions = &versionSet{}
     228           1 :         d.diskAvailBytes.Store(math.MaxUint64)
     229           1 : 
     230           1 :         defer func() {
     231           1 :                 // If an error or panic occurs during open, attempt to release the manually
     232           1 :                 // allocated memory resources. Note that rather than look for an error, we
     233           1 :                 // look for the return of a nil DB pointer.
     234           1 :                 if r := recover(); db == nil {
     235           0 :                         // Release our references to the Cache. Note that both the DB, and
     236           0 :                         // tableCache have a reference. When we release the reference to
     237           0 :                         // the tableCache, and if there are no other references to
     238           0 :                         // the tableCache, then the tableCache will also release its
     239           0 :                         // reference to the cache.
     240           0 :                         opts.Cache.Unref()
     241           0 : 
     242           0 :                         if d.tableCache != nil {
     243           0 :                                 _ = d.tableCache.close()
     244           0 :                         }
     245             : 
     246           0 :                         for _, mem := range d.mu.mem.queue {
     247           0 :                                 switch t := mem.flushable.(type) {
     248           0 :                                 case *memTable:
     249           0 :                                         manual.Free(manual.MemTable, t.arenaBuf)
     250           0 :                                         t.arenaBuf = nil
     251             :                                 }
     252             :                         }
     253           0 :                         if d.cleanupManager != nil {
     254           0 :                                 d.cleanupManager.Close()
     255           0 :                         }
     256           0 :                         if d.objProvider != nil {
     257           0 :                                 d.objProvider.Close()
     258           0 :                         }
     259           0 :                         if r != nil {
     260           0 :                                 panic(r)
     261             :                         }
     262             :                 }
     263             :         }()
     264             : 
     265           1 :         d.commit = newCommitPipeline(commitEnv{
     266           1 :                 logSeqNum:     &d.mu.versions.logSeqNum,
     267           1 :                 visibleSeqNum: &d.mu.versions.visibleSeqNum,
     268           1 :                 apply:         d.commitApply,
     269           1 :                 write:         d.commitWrite,
     270           1 :         })
     271           1 :         d.mu.nextJobID = 1
     272           1 :         d.mu.mem.nextSize = opts.MemTableSize
     273           1 :         if d.mu.mem.nextSize > initialMemTableSize {
     274           1 :                 d.mu.mem.nextSize = initialMemTableSize
     275           1 :         }
     276           1 :         d.mu.compact.cond.L = &d.mu.Mutex
     277           1 :         d.mu.compact.inProgress = make(map[*compaction]struct{})
     278           1 :         d.mu.compact.noOngoingFlushStartTime = time.Now()
     279           1 :         d.mu.snapshots.init()
     280           1 :         // logSeqNum is the next sequence number that will be assigned.
     281           1 :         // Start assigning sequence numbers from base.SeqNumStart to leave
     282           1 :         // room for reserved sequence numbers (see comments around
     283           1 :         // SeqNumStart).
     284           1 :         d.mu.versions.logSeqNum.Store(base.SeqNumStart)
     285           1 :         d.mu.formatVers.vers.Store(uint64(formatVersion))
     286           1 :         d.mu.formatVers.marker = formatVersionMarker
     287           1 : 
     288           1 :         d.timeNow = time.Now
     289           1 :         d.openedAt = d.timeNow()
     290           1 : 
     291           1 :         d.mu.Lock()
     292           1 :         defer d.mu.Unlock()
     293           1 : 
     294           1 :         jobID := d.newJobIDLocked()
     295           1 : 
     296           1 :         providerSettings := objstorageprovider.Settings{
     297           1 :                 Logger:              opts.Logger,
     298           1 :                 FS:                  opts.FS,
     299           1 :                 FSDirName:           dirname,
     300           1 :                 FSDirInitialListing: ls,
     301           1 :                 FSCleaner:           opts.Cleaner,
     302           1 :                 NoSyncOnClose:       opts.NoSyncOnClose,
     303           1 :                 BytesPerSync:        opts.BytesPerSync,
     304           1 :         }
     305           1 :         providerSettings.Local.ReadaheadConfig = opts.Local.ReadaheadConfig
     306           1 :         providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
     307           1 :         providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
     308           1 :         providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
     309           1 :         providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
     310           1 : 
     311           1 :         d.objProvider, err = objstorageprovider.Open(providerSettings)
     312           1 :         if err != nil {
     313           0 :                 return nil, err
     314           0 :         }
     315             : 
     316           1 :         if !manifestExists {
     317           1 :                 // DB does not exist.
     318           1 :                 if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
     319           0 :                         return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
     320           0 :                 }
     321             : 
     322             :                 // Create the DB.
     323           1 :                 if err := d.mu.versions.create(
     324           1 :                         jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
     325           0 :                         return nil, err
     326           0 :                 }
     327           1 :         } else {
     328           1 :                 if opts.ErrorIfExists {
     329           0 :                         return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
     330           0 :                 }
     331             :                 // Load the version set.
     332           1 :                 if err := d.mu.versions.load(
     333           1 :                         dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
     334           0 :                         return nil, err
     335           0 :                 }
     336           1 :                 if opts.ErrorIfNotPristine {
     337           0 :                         liveFileNums := make(map[base.DiskFileNum]struct{})
     338           0 :                         d.mu.versions.addLiveFileNums(liveFileNums)
     339           0 :                         if len(liveFileNums) != 0 {
     340           0 :                                 return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
     341           0 :                         }
     342             :                 }
     343             :         }
     344             : 
     345             :         // In read-only mode, we replay directly into the mutable memtable but never
     346             :         // flush it. We need to delay creation of the memtable until we know the
     347             :         // sequence number of the first batch that will be inserted.
     348           1 :         if !d.opts.ReadOnly {
     349           1 :                 var entry *flushableEntry
     350           1 :                 d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
     351           1 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     352           1 :         }
     353             : 
     354           1 :         d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     355           1 :                 Buckets: FsyncLatencyBuckets,
     356           1 :         })
     357           1 :         walOpts := wal.Options{
     358           1 :                 Primary:              wal.Dir{FS: opts.FS, Dirname: walDirname},
     359           1 :                 Secondary:            wal.Dir{},
     360           1 :                 MinUnflushedWALNum:   wal.NumWAL(d.mu.versions.minUnflushedLogNum),
     361           1 :                 MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
     362           1 :                 NoSyncOnClose:        opts.NoSyncOnClose,
     363           1 :                 BytesPerSync:         opts.WALBytesPerSync,
     364           1 :                 PreallocateSize:      d.walPreallocateSize,
     365           1 :                 MinSyncInterval:      opts.WALMinSyncInterval,
     366           1 :                 FsyncLatency:         d.mu.log.metrics.fsyncLatency,
     367           1 :                 QueueSemChan:         d.commit.logSyncQSem,
     368           1 :                 Logger:               opts.Logger,
     369           1 :                 EventListener:        walEventListenerAdaptor{l: opts.EventListener},
     370           1 :         }
     371           1 :         if opts.WALFailover != nil {
     372           1 :                 walOpts.Secondary = opts.WALFailover.Secondary
     373           1 :                 walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
     374           1 :                 walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     375           1 :                         Buckets: FsyncLatencyBuckets,
     376           1 :                 })
     377           1 :         }
     378           1 :         walDirs := append(walOpts.Dirs(), opts.WALRecoveryDirs...)
     379           1 :         wals, err := wal.Scan(walDirs...)
     380           1 :         if err != nil {
     381           0 :                 return nil, err
     382           0 :         }
     383           1 :         walManager, err := wal.Init(walOpts, wals)
     384           1 :         if err != nil {
     385           0 :                 return nil, err
     386           0 :         }
     387           1 :         defer func() {
     388           1 :                 if db == nil {
     389           0 :                         walManager.Close()
     390           0 :                 }
     391             :         }()
     392             : 
     393           1 :         d.mu.log.manager = walManager
     394           1 : 
     395           1 :         d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
     396           1 : 
     397           1 :         if manifestExists && !opts.DisableConsistencyCheck {
     398           1 :                 curVersion := d.mu.versions.currentVersion()
     399           1 :                 if err := checkConsistency(curVersion, dirname, d.objProvider); err != nil {
     400           0 :                         return nil, err
     401           0 :                 }
     402             :         }
     403             : 
     404           1 :         tableCacheSize := TableCacheSize(opts.MaxOpenFiles)
     405           1 :         d.tableCache = newTableCacheContainer(
     406           1 :                 opts.TableCache, d.cacheID, d.objProvider, d.opts, tableCacheSize,
     407           1 :                 &sstable.CategoryStatsCollector{})
     408           1 :         d.newIters = d.tableCache.newIters
     409           1 :         d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
     410           1 : 
     411           1 :         d.mu.annotators.totalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
     412           0 :                 return true
     413           0 :         })
     414           1 :         d.mu.annotators.remoteSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
     415           0 :                 meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
     416           0 :                 if err != nil {
     417           0 :                         return false
     418           0 :                 }
     419           0 :                 return meta.IsRemote()
     420             :         })
     421           1 :         d.mu.annotators.externalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
     422           0 :                 meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
     423           0 :                 if err != nil {
     424           0 :                         return false
     425           0 :                 }
     426           0 :                 return meta.IsRemote() && meta.Remote.CleanupMethod == objstorage.SharedNoCleanup
     427             :         })
     428             : 
     429           1 :         var previousOptionsFileNum base.DiskFileNum
     430           1 :         var previousOptionsFilename string
     431           1 :         for _, filename := range ls {
     432           1 :                 ft, fn, ok := base.ParseFilename(opts.FS, filename)
     433           1 :                 if !ok {
     434           1 :                         continue
     435             :                 }
     436             : 
     437             :                 // Don't reuse any obsolete file numbers to avoid modifying an
     438             :                 // ingested sstable's original external file.
     439           1 :                 d.mu.versions.markFileNumUsed(fn)
     440           1 : 
     441           1 :                 switch ft {
     442           0 :                 case fileTypeLog:
     443             :                         // Ignore.
     444           1 :                 case fileTypeOptions:
     445           1 :                         if previousOptionsFileNum < fn {
     446           1 :                                 previousOptionsFileNum = fn
     447           1 :                                 previousOptionsFilename = filename
     448           1 :                         }
     449           0 :                 case fileTypeTemp, fileTypeOldTemp:
     450           0 :                         if !d.opts.ReadOnly {
     451           0 :                                 // Some codepaths write to a temporary file and then
     452           0 :                                 // rename it to its final location when complete.  A
     453           0 :                                 // temp file is leftover if a process exits before the
     454           0 :                                 // rename.  Remove it.
     455           0 :                                 err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
     456           0 :                                 if err != nil {
     457           0 :                                         return nil, err
     458           0 :                                 }
     459             :                         }
     460             :                 }
     461             :         }
     462           1 :         if n := len(wals); n > 0 {
     463           1 :                 // Don't reuse any obsolete file numbers to avoid modifying an
     464           1 :                 // ingested sstable's original external file.
     465           1 :                 d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
     466           1 :         }
     467             : 
     468             :         // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
     469             :         // objProvider. This avoids FileNum collisions with obsolete sstables.
     470           1 :         objects := d.objProvider.List()
     471           1 :         for _, obj := range objects {
     472           1 :                 d.mu.versions.markFileNumUsed(obj.DiskFileNum)
     473           1 :         }
     474             : 
     475             :         // Validate the most-recent OPTIONS file, if there is one.
     476           1 :         if previousOptionsFilename != "" {
     477           1 :                 path := opts.FS.PathJoin(dirname, previousOptionsFilename)
     478           1 :                 previousOptions, err := readOptionsFile(opts, path)
     479           1 :                 if err != nil {
     480           0 :                         return nil, err
     481           0 :                 }
     482           1 :                 if err := opts.CheckCompatibility(previousOptions); err != nil {
     483           0 :                         return nil, err
     484           0 :                 }
     485             :         }
     486             : 
     487             :         // Replay any newer log files than the ones named in the manifest.
     488           1 :         var replayWALs wal.Logs
     489           1 :         for i, w := range wals {
     490           1 :                 if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
     491           1 :                         replayWALs = wals[i:]
     492           1 :                         break
     493             :                 }
     494             :         }
     495           1 :         var ve versionEdit
     496           1 :         var toFlush flushableList
     497           1 :         for i, lf := range replayWALs {
     498           1 :                 // WALs other than the last one would have been closed cleanly.
     499           1 :                 //
     500           1 :                 // Note: we used to never require strict WAL tails when reading from older
     501           1 :                 // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
     502           1 :                 // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
     503           1 :                 // compatible Pebble format is newer and guarantees a clean EOF.
     504           1 :                 strictWALTail := i < len(replayWALs)-1
     505           1 :                 flush, maxSeqNum, err := d.replayWAL(jobID, &ve, lf, strictWALTail)
     506           1 :                 if err != nil {
     507           0 :                         return nil, err
     508           0 :                 }
     509           1 :                 toFlush = append(toFlush, flush...)
     510           1 :                 if d.mu.versions.logSeqNum.Load() < maxSeqNum {
     511           1 :                         d.mu.versions.logSeqNum.Store(maxSeqNum)
     512           1 :                 }
     513             :         }
     514           1 :         d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
     515           1 : 
     516           1 :         if !d.opts.ReadOnly {
     517           1 :                 // Create an empty .log file.
     518           1 :                 newLogNum := d.mu.versions.getNextDiskFileNum()
     519           1 : 
     520           1 :                 // This logic is slightly different than RocksDB's. Specifically, RocksDB
     521           1 :                 // sets MinUnflushedLogNum to max-recovered-log-num + 1. We set it to the
     522           1 :                 // newLogNum. There should be no difference in using either value.
     523           1 :                 ve.MinUnflushedLogNum = newLogNum
     524           1 : 
     525           1 :                 // Create the manifest with the updated MinUnflushedLogNum before
     526           1 :                 // creating the new log file. If we created the log file first, a
     527           1 :                 // crash before the manifest is synced could leave two WALs with
     528           1 :                 // unclean tails.
     529           1 :                 d.mu.versions.logLock()
     530           1 :                 if err := d.mu.versions.logAndApply(jobID, &ve, newFileMetrics(ve.NewFiles), false /* forceRotation */, func() []compactionInfo {
     531           1 :                         return nil
     532           1 :                 }); err != nil {
     533           0 :                         return nil, err
     534           0 :                 }
     535             : 
     536           1 :                 for _, entry := range toFlush {
     537           1 :                         entry.readerUnrefLocked(true)
     538           1 :                 }
     539             : 
     540           1 :                 d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
     541           1 :                 if err != nil {
     542           0 :                         return nil, err
     543           0 :                 }
     544             : 
     545             :                 // This isn't strictly necessary as we don't use the log number for
     546             :                 // memtables being flushed, only for the next unflushed memtable.
     547           1 :                 d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
     548             :         }
     549           1 :         d.updateReadStateLocked(d.opts.DebugCheck)
     550           1 : 
     551           1 :         if !d.opts.ReadOnly {
     552           1 :                 // If the Options specify a format major version higher than the
     553           1 :                 // loaded database's, upgrade it. If this is a new database, this
     554           1 :                 // code path also performs an initial upgrade from the starting
     555           1 :                 // implicit MinSupported version.
     556           1 :                 //
     557           1 :                 // We ratchet the version this far into Open so that migrations have a read
     558           1 :                 // state available. Note that this also results in creating/updating the
     559           1 :                 // format version marker file.
     560           1 :                 if opts.FormatMajorVersion > d.FormatMajorVersion() {
     561           1 :                         if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
     562           0 :                                 return nil, err
     563           0 :                         }
     564           1 :                 } else if noFormatVersionMarker {
     565           1 :                         // We are creating a new store. Create the format version marker file.
     566           1 :                         if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
     567           0 :                                 return nil, err
     568           0 :                         }
     569             :                 }
     570             : 
     571             :                 // Write the current options to disk.
     572           1 :                 d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
     573           1 :                 tmpPath := base.MakeFilepath(opts.FS, dirname, fileTypeTemp, d.optionsFileNum)
     574           1 :                 optionsPath := base.MakeFilepath(opts.FS, dirname, fileTypeOptions, d.optionsFileNum)
     575           1 : 
     576           1 :                 // Write them to a temporary file first, in case we crash before
     577           1 :                 // we're done. A corrupt options file prevents opening the
     578           1 :                 // database.
     579           1 :                 optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
     580           1 :                 if err != nil {
     581           0 :                         return nil, err
     582           0 :                 }
     583           1 :                 serializedOpts := []byte(opts.String())
     584           1 :                 if _, err := optionsFile.Write(serializedOpts); err != nil {
     585           0 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     586           0 :                 }
     587           1 :                 d.optionsFileSize = uint64(len(serializedOpts))
     588           1 :                 if err := optionsFile.Sync(); err != nil {
     589           0 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     590           0 :                 }
     591           1 :                 if err := optionsFile.Close(); err != nil {
     592           0 :                         return nil, err
     593           0 :                 }
     594             :                 // Atomically rename to the OPTIONS-XXXXXX path. This rename is
     595             :                 // guaranteed to be atomic because the destination path does not
     596             :                 // exist.
     597           1 :                 if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
     598           0 :                         return nil, err
     599           0 :                 }
     600           1 :                 if err := d.dataDir.Sync(); err != nil {
     601           0 :                         return nil, err
     602           0 :                 }
     603             :         }
     604             : 
     605           1 :         if !d.opts.ReadOnly {
     606           1 :                 d.scanObsoleteFiles(ls)
     607           1 :                 d.deleteObsoleteFiles(jobID)
     608           1 :         }
     609             :         // Else, nothing is obsolete.
     610             : 
     611           1 :         d.mu.tableStats.cond.L = &d.mu.Mutex
     612           1 :         d.mu.tableValidation.cond.L = &d.mu.Mutex
     613           1 :         if !d.opts.ReadOnly {
     614           1 :                 d.maybeCollectTableStatsLocked()
     615           1 :         }
     616           1 :         d.calculateDiskAvailableBytes()
     617           1 : 
     618           1 :         d.maybeScheduleFlush()
     619           1 :         d.maybeScheduleCompaction()
     620           1 : 
     621           1 :         // Note: this is a no-op if invariants are disabled or race is enabled.
     622           1 :         //
     623           1 :         // Setting a finalizer on *DB causes *DB to never be reclaimed and the
     624           1 :         // finalizer to never be run. The problem is due to this limitation of
     625           1 :         // finalizers mention in the SetFinalizer docs:
     626           1 :         //
     627           1 :         //   If a cyclic structure includes a block with a finalizer, that cycle is
     628           1 :         //   not guaranteed to be garbage collected and the finalizer is not
     629           1 :         //   guaranteed to run, because there is no ordering that respects the
     630           1 :         //   dependencies.
     631           1 :         //
     632           1 :         // DB has cycles with several of its internal structures: readState,
     633           1 :         // newIters, tableCache, versions, etc. Each of this individually cause a
     634           1 :         // cycle and prevent the finalizer from being run. But we can workaround this
     635           1 :         // finializer limitation by setting a finalizer on another object that is
     636           1 :         // tied to the lifetime of DB: the DB.closed atomic.Value.
     637           1 :         dPtr := fmt.Sprintf("%p", d)
     638           1 :         invariants.SetFinalizer(d.closed, func(obj interface{}) {
     639           0 :                 v := obj.(*atomic.Value)
     640           0 :                 if err := v.Load(); err == nil {
     641           0 :                         fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
     642           0 :                         os.Exit(1)
     643           0 :                 }
     644             :         })
     645             : 
     646           1 :         return d, nil
     647             : }
     648             : 
     649             : // prepareAndOpenDirs opens the directories for the store (and creates them if
     650             : // necessary).
     651             : //
     652             : // Returns an error if ReadOnly is set and the directories don't exist.
     653             : func prepareAndOpenDirs(
     654             :         dirname string, opts *Options,
     655           1 : ) (walDirname string, dataDir vfs.File, err error) {
     656           1 :         walDirname = opts.WALDir
     657           1 :         if opts.WALDir == "" {
     658           1 :                 walDirname = dirname
     659           1 :         }
     660             : 
     661             :         // Create directories if needed.
     662           1 :         if !opts.ReadOnly {
     663           1 :                 f, err := mkdirAllAndSyncParents(opts.FS, dirname)
     664           1 :                 if err != nil {
     665           0 :                         return "", nil, err
     666           0 :                 }
     667           1 :                 f.Close()
     668           1 :                 if walDirname != dirname {
     669           1 :                         f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
     670           1 :                         if err != nil {
     671           0 :                                 return "", nil, err
     672           0 :                         }
     673           1 :                         f.Close()
     674             :                 }
     675           1 :                 if opts.WALFailover != nil {
     676           1 :                         secondary := opts.WALFailover.Secondary
     677           1 :                         f, err := mkdirAllAndSyncParents(secondary.FS, secondary.Dirname)
     678           1 :                         if err != nil {
     679           0 :                                 return "", nil, err
     680           0 :                         }
     681           1 :                         f.Close()
     682             :                 }
     683             :         }
     684             : 
     685           1 :         dataDir, err = opts.FS.OpenDir(dirname)
     686           1 :         if err != nil {
     687           0 :                 if opts.ReadOnly && oserror.IsNotExist(err) {
     688           0 :                         return "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
     689           0 :                 }
     690           0 :                 return "", nil, err
     691             :         }
     692           1 :         if opts.ReadOnly && walDirname != dirname {
     693           0 :                 // Check that the wal dir exists.
     694           0 :                 walDir, err := opts.FS.OpenDir(walDirname)
     695           0 :                 if err != nil {
     696           0 :                         dataDir.Close()
     697           0 :                         return "", nil, err
     698           0 :                 }
     699           0 :                 walDir.Close()
     700             :         }
     701             : 
     702           1 :         return walDirname, dataDir, nil
     703             : }
     704             : 
     705             : // GetVersion returns the engine version string from the latest options
     706             : // file present in dir. Used to check what Pebble or RocksDB version was last
     707             : // used to write to the database stored in this directory. An empty string is
     708             : // returned if no valid OPTIONS file with a version key was found.
     709           0 : func GetVersion(dir string, fs vfs.FS) (string, error) {
     710           0 :         ls, err := fs.List(dir)
     711           0 :         if err != nil {
     712           0 :                 return "", err
     713           0 :         }
     714           0 :         var version string
     715           0 :         lastOptionsSeen := base.DiskFileNum(0)
     716           0 :         for _, filename := range ls {
     717           0 :                 ft, fn, ok := base.ParseFilename(fs, filename)
     718           0 :                 if !ok {
     719           0 :                         continue
     720             :                 }
     721           0 :                 switch ft {
     722           0 :                 case fileTypeOptions:
     723           0 :                         // If this file has a higher number than the last options file
     724           0 :                         // processed, reset version. This is because rocksdb often
     725           0 :                         // writes multiple options files without deleting previous ones.
     726           0 :                         // Otherwise, skip parsing this options file.
     727           0 :                         if fn > lastOptionsSeen {
     728           0 :                                 version = ""
     729           0 :                                 lastOptionsSeen = fn
     730           0 :                         } else {
     731           0 :                                 continue
     732             :                         }
     733           0 :                         f, err := fs.Open(fs.PathJoin(dir, filename))
     734           0 :                         if err != nil {
     735           0 :                                 return "", err
     736           0 :                         }
     737           0 :                         data, err := io.ReadAll(f)
     738           0 :                         f.Close()
     739           0 : 
     740           0 :                         if err != nil {
     741           0 :                                 return "", err
     742           0 :                         }
     743           0 :                         err = parseOptions(string(data), func(section, key, value string) error {
     744           0 :                                 switch {
     745           0 :                                 case section == "Version":
     746           0 :                                         switch key {
     747           0 :                                         case "pebble_version":
     748           0 :                                                 version = value
     749           0 :                                         case "rocksdb_version":
     750           0 :                                                 version = fmt.Sprintf("rocksdb v%s", value)
     751             :                                         }
     752             :                                 }
     753           0 :                                 return nil
     754             :                         })
     755           0 :                         if err != nil {
     756           0 :                                 return "", err
     757           0 :                         }
     758             :                 }
     759             :         }
     760           0 :         return version, nil
     761             : }
     762             : 
     763             : // replayWAL replays the edits in the specified WAL. If the DB is in read
     764             : // only mode, then the WALs are replayed into memtables and not flushed. If
     765             : // the DB is not in read only mode, then the contents of the WAL are
     766             : // guaranteed to be flushed. Note that this flushing is very important for
     767             : // guaranteeing durability: the application may have had a number of pending
     768             : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
     769             : // happened but the corresponding data may now be readable from the WAL (while
     770             : // sitting in write-back caches in the kernel or the storage device). By
     771             : // reading the WAL (including the non-fsynced data) and then flushing all
     772             : // these changes (flush does fsyncs), we are able to guarantee that the
     773             : // initial state of the DB is durable.
     774             : //
     775             : // The toFlush return value is a list of flushables associated with the WAL
     776             : // being replayed which will be flushed. Once the version edit has been applied
     777             : // to the manifest, it is up to the caller of replayWAL to unreference the
     778             : // toFlush flushables returned by replayWAL.
     779             : //
     780             : // d.mu must be held when calling this, but the mutex may be dropped and
     781             : // re-acquired during the course of this method.
     782             : func (d *DB) replayWAL(
     783             :         jobID JobID, ve *versionEdit, ll wal.LogicalLog, strictWALTail bool,
     784           1 : ) (toFlush flushableList, maxSeqNum base.SeqNum, err error) {
     785           1 :         rr := ll.OpenForRead()
     786           1 :         defer rr.Close()
     787           1 :         var (
     788           1 :                 b               Batch
     789           1 :                 buf             bytes.Buffer
     790           1 :                 mem             *memTable
     791           1 :                 entry           *flushableEntry
     792           1 :                 offset          wal.Offset
     793           1 :                 lastFlushOffset int64
     794           1 :                 keysReplayed    int64 // number of keys replayed
     795           1 :                 batchesReplayed int64 // number of batches replayed
     796           1 :         )
     797           1 : 
     798           1 :         // TODO(jackson): This function is interspersed with panics, in addition to
     799           1 :         // corruption error propagation. Audit them to ensure we're truly only
     800           1 :         // panicking where the error points to Pebble bug and not user or
     801           1 :         // hardware-induced corruption.
     802           1 : 
     803           1 :         if d.opts.ReadOnly {
     804           0 :                 // In read-only mode, we replay directly into the mutable memtable which will
     805           0 :                 // never be flushed.
     806           0 :                 mem = d.mu.mem.mutable
     807           0 :                 if mem != nil {
     808           0 :                         entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
     809           0 :                 }
     810             :         }
     811             : 
     812             :         // Flushes the current memtable, if not nil.
     813           1 :         flushMem := func() {
     814           1 :                 if mem == nil {
     815           1 :                         return
     816           1 :                 }
     817           1 :                 var logSize uint64
     818           1 :                 mergedOffset := offset.Physical + offset.PreviousFilesBytes
     819           1 :                 if mergedOffset >= lastFlushOffset {
     820           1 :                         logSize = uint64(mergedOffset - lastFlushOffset)
     821           1 :                 }
     822             :                 // Else, this was the initial memtable in the read-only case which must have
     823             :                 // been empty, but we need to flush it since we don't want to add to it later.
     824           1 :                 lastFlushOffset = mergedOffset
     825           1 :                 entry.logSize = logSize
     826           1 :                 if !d.opts.ReadOnly {
     827           1 :                         toFlush = append(toFlush, entry)
     828           1 :                 }
     829           1 :                 mem, entry = nil, nil
     830             :         }
     831             :         // Creates a new memtable if there is no current memtable.
     832           1 :         ensureMem := func(seqNum base.SeqNum) {
     833           1 :                 if mem != nil {
     834           1 :                         return
     835           1 :                 }
     836           1 :                 mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
     837           1 :                 if d.opts.ReadOnly {
     838           0 :                         d.mu.mem.mutable = mem
     839           0 :                         d.mu.mem.queue = append(d.mu.mem.queue, entry)
     840           0 :                 }
     841             :         }
     842             : 
     843             :         // updateVE is used to update ve with information about new files created
     844             :         // during the flush of any flushable not of type ingestedFlushable. For the
     845             :         // flushable of type ingestedFlushable we use custom handling below.
     846           1 :         updateVE := func() error {
     847           1 :                 // TODO(bananabrick): See if we can use the actual base level here,
     848           1 :                 // instead of using 1.
     849           1 :                 c, err := newFlush(d.opts, d.mu.versions.currentVersion(),
     850           1 :                         1 /* base level */, toFlush, d.timeNow())
     851           1 :                 if err != nil {
     852           0 :                         return err
     853           0 :                 }
     854           1 :                 newVE, _, err := d.runCompaction(jobID, c)
     855           1 :                 if err != nil {
     856           0 :                         return errors.Wrapf(err, "running compaction during WAL replay")
     857           0 :                 }
     858           1 :                 ve.NewFiles = append(ve.NewFiles, newVE.NewFiles...)
     859           1 :                 return nil
     860             :         }
     861           1 :         defer func() {
     862           1 :                 if err != nil {
     863           0 :                         err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
     864           0 :                 }
     865             :         }()
     866             : 
     867           1 :         for {
     868           1 :                 var r io.Reader
     869           1 :                 var err error
     870           1 :                 r, offset, err = rr.NextRecord()
     871           1 :                 if err == nil {
     872           1 :                         _, err = io.Copy(&buf, r)
     873           1 :                 }
     874           1 :                 if err != nil {
     875           1 :                         // It is common to encounter a zeroed or invalid chunk due to WAL
     876           1 :                         // preallocation and WAL recycling. We need to distinguish these
     877           1 :                         // errors from EOF in order to recognize that the record was
     878           1 :                         // truncated and to avoid replaying subsequent WALs, but want
     879           1 :                         // to otherwise treat them like EOF.
     880           1 :                         if err == io.EOF {
     881           1 :                                 break
     882           0 :                         } else if record.IsInvalidRecord(err) && !strictWALTail {
     883           0 :                                 break
     884             :                         }
     885           0 :                         return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
     886             :                 }
     887             : 
     888           1 :                 if buf.Len() < batchrepr.HeaderLen {
     889           0 :                         return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
     890           0 :                                 errors.Safe(base.DiskFileNum(ll.Num)), offset)
     891           0 :                 }
     892             : 
     893           1 :                 if d.opts.ErrorIfNotPristine {
     894           0 :                         return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
     895           0 :                 }
     896             : 
     897             :                 // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
     898             :                 // which is used below.
     899           1 :                 b = Batch{}
     900           1 :                 b.db = d
     901           1 :                 b.SetRepr(buf.Bytes())
     902           1 :                 seqNum := b.SeqNum()
     903           1 :                 maxSeqNum = seqNum + base.SeqNum(b.Count())
     904           1 :                 keysReplayed += int64(b.Count())
     905           1 :                 batchesReplayed++
     906           1 :                 {
     907           1 :                         br := b.Reader()
     908           1 :                         if kind, encodedFileNum, _, ok, err := br.Next(); err != nil {
     909           0 :                                 return nil, 0, err
     910           1 :                         } else if ok && kind == InternalKeyKindIngestSST {
     911           1 :                                 fileNums := make([]base.DiskFileNum, 0, b.Count())
     912           1 :                                 addFileNum := func(encodedFileNum []byte) {
     913           1 :                                         fileNum, n := binary.Uvarint(encodedFileNum)
     914           1 :                                         if n <= 0 {
     915           0 :                                                 panic("pebble: ingest sstable file num is invalid.")
     916             :                                         }
     917           1 :                                         fileNums = append(fileNums, base.DiskFileNum(fileNum))
     918             :                                 }
     919           1 :                                 addFileNum(encodedFileNum)
     920           1 : 
     921           1 :                                 for i := 1; i < int(b.Count()); i++ {
     922           1 :                                         kind, encodedFileNum, _, ok, err := br.Next()
     923           1 :                                         if err != nil {
     924           0 :                                                 return nil, 0, err
     925           0 :                                         }
     926           1 :                                         if kind != InternalKeyKindIngestSST {
     927           0 :                                                 panic("pebble: invalid batch key kind.")
     928             :                                         }
     929           1 :                                         if !ok {
     930           0 :                                                 panic("pebble: invalid batch count.")
     931             :                                         }
     932           1 :                                         addFileNum(encodedFileNum)
     933             :                                 }
     934             : 
     935           1 :                                 if _, _, _, ok, err := br.Next(); err != nil {
     936           0 :                                         return nil, 0, err
     937           1 :                                 } else if ok {
     938           0 :                                         panic("pebble: invalid number of entries in batch.")
     939             :                                 }
     940             : 
     941           1 :                                 meta := make([]*fileMetadata, len(fileNums))
     942           1 :                                 for i, n := range fileNums {
     943           1 :                                         var readable objstorage.Readable
     944           1 :                                         objMeta, err := d.objProvider.Lookup(fileTypeTable, n)
     945           1 :                                         if err != nil {
     946           0 :                                                 return nil, 0, errors.Wrap(err, "pebble: error when looking up ingested SSTs")
     947           0 :                                         }
     948           1 :                                         if objMeta.IsRemote() {
     949           0 :                                                 readable, err = d.objProvider.OpenForReading(context.TODO(), fileTypeTable, n, objstorage.OpenOptions{MustExist: true})
     950           0 :                                                 if err != nil {
     951           0 :                                                         return nil, 0, errors.Wrap(err, "pebble: error when opening flushable ingest files")
     952           0 :                                                 }
     953           1 :                                         } else {
     954           1 :                                                 path := base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, n)
     955           1 :                                                 f, err := d.opts.FS.Open(path)
     956           1 :                                                 if err != nil {
     957           0 :                                                         return nil, 0, err
     958           0 :                                                 }
     959             : 
     960           1 :                                                 readable, err = sstable.NewSimpleReadable(f)
     961           1 :                                                 if err != nil {
     962           0 :                                                         return nil, 0, err
     963           0 :                                                 }
     964             :                                         }
     965             :                                         // NB: ingestLoad1 will close readable.
     966           1 :                                         meta[i], err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(), readable, d.cacheID, base.PhysicalTableFileNum(n))
     967           1 :                                         if err != nil {
     968           0 :                                                 return nil, 0, errors.Wrap(err, "pebble: error when loading flushable ingest files")
     969           0 :                                         }
     970             :                                 }
     971             : 
     972           1 :                                 if uint32(len(meta)) != b.Count() {
     973           0 :                                         panic("pebble: couldn't load all files in WAL entry.")
     974             :                                 }
     975             : 
     976           1 :                                 entry, err = d.newIngestedFlushableEntry(meta, seqNum, base.DiskFileNum(ll.Num), KeyRange{})
     977           1 :                                 if err != nil {
     978           0 :                                         return nil, 0, err
     979           0 :                                 }
     980             : 
     981           1 :                                 if d.opts.ReadOnly {
     982           0 :                                         d.mu.mem.queue = append(d.mu.mem.queue, entry)
     983           0 :                                         // We added the IngestSST flushable to the queue. But there
     984           0 :                                         // must be at least one WAL entry waiting to be replayed. We
     985           0 :                                         // have to ensure this newer WAL entry isn't replayed into
     986           0 :                                         // the current value of d.mu.mem.mutable because the current
     987           0 :                                         // mutable memtable exists before this flushable entry in
     988           0 :                                         // the memtable queue. To ensure this, we just need to unset
     989           0 :                                         // d.mu.mem.mutable. When a newer WAL is replayed, we will
     990           0 :                                         // set d.mu.mem.mutable to a newer value.
     991           0 :                                         d.mu.mem.mutable = nil
     992           1 :                                 } else {
     993           1 :                                         toFlush = append(toFlush, entry)
     994           1 :                                         // During WAL replay, the lsm only has L0, hence, the
     995           1 :                                         // baseLevel is 1. For the sake of simplicity, we place the
     996           1 :                                         // ingested files in L0 here, instead of finding their
     997           1 :                                         // target levels. This is a simplification for the sake of
     998           1 :                                         // simpler code. It is expected that WAL replay should be
     999           1 :                                         // rare, and that flushables of type ingestedFlushable
    1000           1 :                                         // should also be rare. So, placing the ingested files in L0
    1001           1 :                                         // is alright.
    1002           1 :                                         //
    1003           1 :                                         // TODO(bananabrick): Maybe refactor this function to allow
    1004           1 :                                         // us to easily place ingested files in levels as low as
    1005           1 :                                         // possible during WAL replay. It would require breaking up
    1006           1 :                                         // the application of ve to the manifest into chunks and is
    1007           1 :                                         // not pretty w/o a refactor to this function and how it's
    1008           1 :                                         // used.
    1009           1 :                                         c, err := newFlush(
    1010           1 :                                                 d.opts, d.mu.versions.currentVersion(),
    1011           1 :                                                 1, /* base level */
    1012           1 :                                                 []*flushableEntry{entry},
    1013           1 :                                                 d.timeNow(),
    1014           1 :                                         )
    1015           1 :                                         if err != nil {
    1016           0 :                                                 return nil, 0, err
    1017           0 :                                         }
    1018           1 :                                         for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
    1019           1 :                                                 ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file.FileMetadata})
    1020           1 :                                         }
    1021             :                                 }
    1022           1 :                                 return toFlush, maxSeqNum, nil
    1023             :                         }
    1024             :                 }
    1025             : 
    1026           1 :                 if b.memTableSize >= uint64(d.largeBatchThreshold) {
    1027           1 :                         flushMem()
    1028           1 :                         // Make a copy of the data slice since it is currently owned by buf and will
    1029           1 :                         // be reused in the next iteration.
    1030           1 :                         b.data = slices.Clone(b.data)
    1031           1 :                         b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
    1032           1 :                         if err != nil {
    1033           0 :                                 return nil, 0, err
    1034           0 :                         }
    1035           1 :                         entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
    1036           1 :                         // Disable memory accounting by adding a reader ref that will never be
    1037           1 :                         // removed.
    1038           1 :                         entry.readerRefs.Add(1)
    1039           1 :                         if d.opts.ReadOnly {
    1040           0 :                                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1041           0 :                                 // We added the flushable batch to the flushable to the queue.
    1042           0 :                                 // But there must be at least one WAL entry waiting to be
    1043           0 :                                 // replayed. We have to ensure this newer WAL entry isn't
    1044           0 :                                 // replayed into the current value of d.mu.mem.mutable because
    1045           0 :                                 // the current mutable memtable exists before this flushable
    1046           0 :                                 // entry in the memtable queue. To ensure this, we just need to
    1047           0 :                                 // unset d.mu.mem.mutable. When a newer WAL is replayed, we will
    1048           0 :                                 // set d.mu.mem.mutable to a newer value.
    1049           0 :                                 d.mu.mem.mutable = nil
    1050           1 :                         } else {
    1051           1 :                                 toFlush = append(toFlush, entry)
    1052           1 :                         }
    1053           1 :                 } else {
    1054           1 :                         ensureMem(seqNum)
    1055           1 :                         if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
    1056           0 :                                 return nil, 0, err
    1057           0 :                         }
    1058             :                         // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
    1059             :                         // batch may not initially fit, but will eventually fit (since it is smaller than
    1060             :                         // largeBatchThreshold).
    1061           1 :                         for err == arenaskl.ErrArenaFull {
    1062           1 :                                 flushMem()
    1063           1 :                                 ensureMem(seqNum)
    1064           1 :                                 err = mem.prepare(&b)
    1065           1 :                                 if err != nil && err != arenaskl.ErrArenaFull {
    1066           0 :                                         return nil, 0, err
    1067           0 :                                 }
    1068             :                         }
    1069           1 :                         if err = mem.apply(&b, seqNum); err != nil {
    1070           0 :                                 return nil, 0, err
    1071           0 :                         }
    1072           1 :                         mem.writerUnref()
    1073             :                 }
    1074           1 :                 buf.Reset()
    1075             :         }
    1076             : 
    1077           1 :         d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
    1078           1 :                 jobID, base.DiskFileNum(ll.Num).String(), offset, keysReplayed, batchesReplayed)
    1079           1 :         flushMem()
    1080           1 : 
    1081           1 :         // mem is nil here.
    1082           1 :         if !d.opts.ReadOnly && batchesReplayed > 0 {
    1083           1 :                 err = updateVE()
    1084           1 :                 if err != nil {
    1085           0 :                         return nil, 0, err
    1086           0 :                 }
    1087             :         }
    1088           1 :         return toFlush, maxSeqNum, err
    1089             : }
    1090             : 
    1091           1 : func readOptionsFile(opts *Options, path string) (string, error) {
    1092           1 :         f, err := opts.FS.Open(path)
    1093           1 :         if err != nil {
    1094           0 :                 return "", err
    1095           0 :         }
    1096           1 :         defer f.Close()
    1097           1 : 
    1098           1 :         data, err := io.ReadAll(f)
    1099           1 :         if err != nil {
    1100           0 :                 return "", err
    1101           0 :         }
    1102           1 :         return string(data), nil
    1103             : }
    1104             : 
    1105             : // DBDesc briefly describes high-level state about a database.
    1106             : type DBDesc struct {
    1107             :         // Exists is true if an existing database was found.
    1108             :         Exists bool
    1109             :         // FormatMajorVersion indicates the database's current format
    1110             :         // version.
    1111             :         FormatMajorVersion FormatMajorVersion
    1112             :         // ManifestFilename is the filename of the current active manifest,
    1113             :         // if the database exists.
    1114             :         ManifestFilename string
    1115             :         // OptionsFilename is the filename of the most recent OPTIONS file, if it
    1116             :         // exists.
    1117             :         OptionsFilename string
    1118             : }
    1119             : 
    1120             : // String implements fmt.Stringer.
    1121           0 : func (d *DBDesc) String() string {
    1122           0 :         if !d.Exists {
    1123           0 :                 return "uninitialized"
    1124           0 :         }
    1125           0 :         var buf bytes.Buffer
    1126           0 :         fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
    1127           0 :         fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
    1128           0 :         fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
    1129           0 :         return buf.String()
    1130             : }
    1131             : 
    1132             : // Peek looks for an existing database in dirname on the provided FS. It
    1133             : // returns a brief description of the database. Peek is read-only and
    1134             : // does not open the database
    1135           0 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
    1136           0 :         ls, err := fs.List(dirname)
    1137           0 :         if err != nil {
    1138           0 :                 return nil, err
    1139           0 :         }
    1140             : 
    1141           0 :         vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
    1142           0 :         if err != nil {
    1143           0 :                 return nil, err
    1144           0 :         }
    1145             :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1146             :         // PeekMarker variant that avoids opening the directory.
    1147           0 :         if err := versMarker.Close(); err != nil {
    1148           0 :                 return nil, err
    1149           0 :         }
    1150             : 
    1151             :         // Find the currently active manifest, if there is one.
    1152           0 :         manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
    1153           0 :         if err != nil {
    1154           0 :                 return nil, err
    1155           0 :         }
    1156             :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1157             :         // PeekMarker variant that avoids opening the directory.
    1158           0 :         if err := manifestMarker.Close(); err != nil {
    1159           0 :                 return nil, err
    1160           0 :         }
    1161             : 
    1162           0 :         desc := &DBDesc{
    1163           0 :                 Exists:             exists,
    1164           0 :                 FormatMajorVersion: vers,
    1165           0 :         }
    1166           0 : 
    1167           0 :         // Find the OPTIONS file with the highest file number within the list of
    1168           0 :         // directory entries.
    1169           0 :         var previousOptionsFileNum base.DiskFileNum
    1170           0 :         for _, filename := range ls {
    1171           0 :                 ft, fn, ok := base.ParseFilename(fs, filename)
    1172           0 :                 if !ok || ft != fileTypeOptions || fn < previousOptionsFileNum {
    1173           0 :                         continue
    1174             :                 }
    1175           0 :                 previousOptionsFileNum = fn
    1176           0 :                 desc.OptionsFilename = fs.PathJoin(dirname, filename)
    1177             :         }
    1178             : 
    1179           0 :         if exists {
    1180           0 :                 desc.ManifestFilename = base.MakeFilepath(fs, dirname, fileTypeManifest, manifestFileNum)
    1181           0 :         }
    1182           0 :         return desc, nil
    1183             : }
    1184             : 
    1185             : // LockDirectory acquires the database directory lock in the named directory,
    1186             : // preventing another process from opening the database. LockDirectory returns a
    1187             : // handle to the held lock that may be passed to Open through Options.Lock to
    1188             : // subsequently open the database, skipping lock acquistion during Open.
    1189             : //
    1190             : // LockDirectory may be used to expand the critical section protected by the
    1191             : // database lock to include setup before the call to Open.
    1192           1 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
    1193           1 :         fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, fileTypeLock, base.DiskFileNum(0)))
    1194           1 :         if err != nil {
    1195           0 :                 return nil, err
    1196           0 :         }
    1197           1 :         l := &Lock{dirname: dirname, fileLock: fileLock}
    1198           1 :         l.refs.Store(1)
    1199           1 :         invariants.SetFinalizer(l, func(obj interface{}) {
    1200           1 :                 if refs := obj.(*Lock).refs.Load(); refs > 0 {
    1201           0 :                         panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
    1202             :                 }
    1203             :         })
    1204           1 :         return l, nil
    1205             : }
    1206             : 
    1207             : // Lock represents a file lock on a directory. It may be passed to Open through
    1208             : // Options.Lock to elide lock aquisition during Open.
    1209             : type Lock struct {
    1210             :         dirname  string
    1211             :         fileLock io.Closer
    1212             :         // refs is a count of the number of handles on the lock. refs must be 0, 1
    1213             :         // or 2.
    1214             :         //
    1215             :         // When acquired by the client and passed to Open, refs = 1 and the Open
    1216             :         // call increments it to 2. When the database is closed, it's decremented to
    1217             :         // 1. Finally when the original caller, calls Close on the Lock, it's
    1218             :         // drecemented to zero and the underlying file lock is released.
    1219             :         //
    1220             :         // When Open acquires the file lock, refs remains at 1 until the database is
    1221             :         // closed.
    1222             :         refs atomic.Int32
    1223             : }
    1224             : 
    1225           0 : func (l *Lock) refForOpen() error {
    1226           0 :         // During Open, when a user passed in a lock, the reference count must be
    1227           0 :         // exactly 1. If it's zero, the lock is no longer held and is invalid. If
    1228           0 :         // it's 2, the lock is already in use by another database within the
    1229           0 :         // process.
    1230           0 :         if !l.refs.CompareAndSwap(1, 2) {
    1231           0 :                 return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
    1232           0 :         }
    1233           0 :         return nil
    1234             : }
    1235             : 
    1236             : // Close releases the lock, permitting another process to lock and open the
    1237             : // database. Close must not be called until after a database using the Lock has
    1238             : // been closed.
    1239           1 : func (l *Lock) Close() error {
    1240           1 :         if l.refs.Add(-1) > 0 {
    1241           0 :                 return nil
    1242           0 :         }
    1243           1 :         defer func() { l.fileLock = nil }()
    1244           1 :         return l.fileLock.Close()
    1245             : }
    1246             : 
    1247           0 : func (l *Lock) pathMatches(dirname string) error {
    1248           0 :         if dirname == l.dirname {
    1249           0 :                 return nil
    1250           0 :         }
    1251             :         // Check for relative paths, symlinks, etc. This isn't ideal because we're
    1252             :         // circumventing the vfs.FS interface here.
    1253             :         //
    1254             :         // TODO(jackson): We could add support for retrieving file inodes through Stat
    1255             :         // calls in the VFS interface on platforms where it's available and use that
    1256             :         // to differentiate.
    1257           0 :         dirStat, err1 := os.Stat(dirname)
    1258           0 :         lockDirStat, err2 := os.Stat(l.dirname)
    1259           0 :         if err1 == nil && err2 == nil && os.SameFile(dirStat, lockDirStat) {
    1260           0 :                 return nil
    1261           0 :         }
    1262           0 :         return errors.Join(
    1263           0 :                 errors.Newf("pebble: opts.Lock acquired in %q not %q", l.dirname, dirname),
    1264           0 :                 err1, err2)
    1265             : }
    1266             : 
    1267             : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
    1268             : // does not exist.
    1269             : //
    1270             : // Note that errors can be wrapped with more details; use errors.Is().
    1271             : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
    1272             : 
    1273             : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
    1274             : // already exists.
    1275             : //
    1276             : // Note that errors can be wrapped with more details; use errors.Is().
    1277             : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
    1278             : 
    1279             : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
    1280             : // already exists and is not pristine.
    1281             : //
    1282             : // Note that errors can be wrapped with more details; use errors.Is().
    1283             : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
    1284             : 
    1285             : // IsCorruptionError returns true if the given error indicates database
    1286             : // corruption.
    1287           0 : func IsCorruptionError(err error) bool {
    1288           0 :         return errors.Is(err, base.ErrCorruption)
    1289           0 : }
    1290             : 
    1291           1 : func checkConsistency(v *manifest.Version, dirname string, objProvider objstorage.Provider) error {
    1292           1 :         var errs []error
    1293           1 :         dedup := make(map[base.DiskFileNum]struct{})
    1294           1 :         for level, files := range v.Levels {
    1295           1 :                 iter := files.Iter()
    1296           1 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1297           1 :                         backingState := f.FileBacking
    1298           1 :                         if _, ok := dedup[backingState.DiskFileNum]; ok {
    1299           1 :                                 continue
    1300             :                         }
    1301           1 :                         dedup[backingState.DiskFileNum] = struct{}{}
    1302           1 :                         fileNum := backingState.DiskFileNum
    1303           1 :                         fileSize := backingState.Size
    1304           1 :                         // We skip over remote objects; those are instead checked asynchronously
    1305           1 :                         // by the table stats loading job.
    1306           1 :                         meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
    1307           1 :                         var size int64
    1308           1 :                         if err == nil {
    1309           1 :                                 if meta.IsRemote() {
    1310           1 :                                         continue
    1311             :                                 }
    1312           1 :                                 size, err = objProvider.Size(meta)
    1313             :                         }
    1314           1 :                         if err != nil {
    1315           0 :                                 errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
    1316           0 :                                 continue
    1317             :                         }
    1318             : 
    1319           1 :                         if size != int64(fileSize) {
    1320           0 :                                 errs = append(errs, errors.Errorf(
    1321           0 :                                         "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
    1322           0 :                                         errors.Safe(level), fileNum, objProvider.Path(meta),
    1323           0 :                                         errors.Safe(size), errors.Safe(fileSize)))
    1324           0 :                                 continue
    1325             :                         }
    1326             :                 }
    1327             :         }
    1328           1 :         return errors.Join(errs...)
    1329             : }
    1330             : 
    1331             : type walEventListenerAdaptor struct {
    1332             :         l *EventListener
    1333             : }
    1334             : 
    1335           1 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
    1336           1 :         // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
    1337           1 :         // is insufficient to infer whether primary or secondary.
    1338           1 :         wci := WALCreateInfo{
    1339           1 :                 JobID:           ci.JobID,
    1340           1 :                 Path:            ci.Path,
    1341           1 :                 FileNum:         base.DiskFileNum(ci.Num),
    1342           1 :                 RecycledFileNum: ci.RecycledFileNum,
    1343           1 :                 Err:             ci.Err,
    1344           1 :         }
    1345           1 :         l.l.WALCreated(wci)
    1346           1 : }

Generated by: LCOV version 1.14