LCOV - code coverage report
Current view: top level - pebble - open.go (source / functions)
Test:         2024-12-11 08:18Z ab9741a3 - tests + meta.lcov
Date:         2024-12-11 08:19:09
              Hit    Total    Coverage
Lines:        799    898      89.0 %
Functions:    0      0        -

          Line data    Source code
       1             : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "encoding/binary"
      11             :         "fmt"
      12             :         "io"
      13             :         "math"
      14             :         "os"
      15             :         "slices"
      16             :         "sync/atomic"
      17             :         "time"
      18             : 
      19             :         "github.com/cockroachdb/crlib/crtime"
      20             :         "github.com/cockroachdb/errors"
      21             :         "github.com/cockroachdb/errors/oserror"
      22             :         "github.com/cockroachdb/pebble/batchrepr"
      23             :         "github.com/cockroachdb/pebble/internal/arenaskl"
      24             :         "github.com/cockroachdb/pebble/internal/base"
      25             :         "github.com/cockroachdb/pebble/internal/cache"
      26             :         "github.com/cockroachdb/pebble/internal/constants"
      27             :         "github.com/cockroachdb/pebble/internal/invariants"
      28             :         "github.com/cockroachdb/pebble/internal/keyspan"
      29             :         "github.com/cockroachdb/pebble/internal/manifest"
      30             :         "github.com/cockroachdb/pebble/internal/manual"
      31             :         "github.com/cockroachdb/pebble/objstorage"
      32             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      33             :         "github.com/cockroachdb/pebble/objstorage/remote"
      34             :         "github.com/cockroachdb/pebble/record"
      35             :         "github.com/cockroachdb/pebble/sstable"
      36             :         "github.com/cockroachdb/pebble/vfs"
      37             :         "github.com/cockroachdb/pebble/wal"
      38             :         "github.com/prometheus/client_golang/prometheus"
      39             : )
      40             : 
      41             : const (
      42             :         initialMemTableSize = 256 << 10 // 256 KB
      43             : 
      44             :         // The max batch size is limited by the uint32 offsets stored in
      45             :         // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
      46             :         //
      47             :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      48             :         // end of an allocation fits in uint32.
      49             :         //
      50             :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      51             :         // 2GB).
      52             :         maxBatchSize = constants.MaxUint32OrInt
      53             : 
      54             :         // The max memtable size is limited by the uint32 offsets stored in
      55             :         // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
      56             :         //
      57             :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      58             :         // end of an allocation fits in uint32.
      59             :         //
      60             :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      61             :         // 2GB).
      62             :         maxMemTableSize = constants.MaxUint32OrInt
      63             : )
      64             : 
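Both caps above reduce to the smaller of MaxUint32 (so the exclusive end of an allocation still fits in a uint32 offset) and MaxInt (the natural slice-length limit, roughly 2 GB on 32-bit platforms). A minimal, illustrative sketch of deriving such a platform-dependent cap follows; the real value lives in internal/constants.MaxUint32OrInt and may be defined differently there:

    package main

    import (
            "fmt"
            "math"
    )

    // Illustrative only: take the smaller of MaxUint32 and MaxInt, mirroring the
    // rationale described in the comments above (requires Go 1.21+ for min).
    const maxUint32OrInt = min(math.MaxUint32, math.MaxInt)

    func main() {
            fmt.Println(maxUint32OrInt) // 4294967295 on 64-bit, 2147483647 on 32-bit
    }
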
       65             : // FileCacheSize returns the file cache size to use
       66             : // for a single DB, given the maximum number of open
       67             : // files that may be consumed by a file cache serving
       68             : // only that DB.
      69           2 : func FileCacheSize(maxOpenFiles int) int {
      70           2 :         fileCacheSize := maxOpenFiles - numNonFileCacheFiles
      71           2 :         if fileCacheSize < minFileCacheSize {
      72           1 :                 fileCacheSize = minFileCacheSize
      73           1 :         }
      74           2 :         return fileCacheSize
      75             : }
      76             : 
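Because FileCacheSize is exported, callers tuning Options.MaxOpenFiles can see how many handles remain for the file cache once Pebble reserves a fixed number of descriptors for other uses (the result is also floored at a minimum cache size). A minimal usage sketch; 1000 is an arbitrary example value:

    package main

    import (
            "fmt"

            "github.com/cockroachdb/pebble"
    )

    func main() {
            // For a hypothetical budget of 1000 open files, report how many of them
            // the file cache of a single DB would be allowed to consume.
            fmt.Println(pebble.FileCacheSize(1000))
    }
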
      77             : // Open opens a DB whose files live in the given directory.
      78           2 : func Open(dirname string, opts *Options) (db *DB, err error) {
      79           2 :         // Make a copy of the options so that we don't mutate the passed in options.
      80           2 :         opts = opts.Clone()
      81           2 :         opts = opts.EnsureDefaults()
      82           2 :         if err := opts.Validate(); err != nil {
      83           0 :                 return nil, err
      84           0 :         }
      85           2 :         if opts.LoggerAndTracer == nil {
      86           2 :                 opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
      87           2 :         } else {
      88           1 :                 opts.Logger = opts.LoggerAndTracer
      89           1 :         }
      90             : 
      91           2 :         if invariants.Sometimes(5) {
      92           2 :                 assertComparer := base.MakeAssertComparer(*opts.Comparer)
      93           2 :                 opts.Comparer = &assertComparer
      94           2 :         }
      95             : 
      96             :         // In all error cases, we return db = nil; this is used by various
      97             :         // deferred cleanups.
      98             : 
      99             :         // Open the database and WAL directories first.
     100           2 :         walDirname, dataDir, err := prepareAndOpenDirs(dirname, opts)
     101           2 :         if err != nil {
     102           1 :                 return nil, errors.Wrapf(err, "error opening database at %q", dirname)
     103           1 :         }
     104           2 :         defer func() {
     105           2 :                 if db == nil {
     106           1 :                         dataDir.Close()
     107           1 :                 }
     108             :         }()
     109             : 
     110             :         // Lock the database directory.
     111           2 :         var fileLock *Lock
     112           2 :         if opts.Lock != nil {
     113           1 :                 // The caller already acquired the database lock. Ensure that the
     114           1 :                 // directory matches.
     115           1 :                 if err := opts.Lock.pathMatches(dirname); err != nil {
     116           0 :                         return nil, err
     117           0 :                 }
     118           1 :                 if err := opts.Lock.refForOpen(); err != nil {
     119           1 :                         return nil, err
     120           1 :                 }
     121           1 :                 fileLock = opts.Lock
     122           2 :         } else {
     123           2 :                 fileLock, err = LockDirectory(dirname, opts.FS)
     124           2 :                 if err != nil {
     125           1 :                         return nil, err
     126           1 :                 }
     127             :         }
     128           2 :         defer func() {
     129           2 :                 if db == nil {
     130           1 :                         fileLock.Close()
     131           1 :                 }
     132             :         }()
     133             : 
     134             :         // List the directory contents. This also happens to include WAL log files, if
     135             :         // they are in the same dir, but we will ignore those below. The provider is
      136             : // also given this list, but it ignores non-sstable files.
     137           2 :         ls, err := opts.FS.List(dirname)
     138           2 :         if err != nil {
     139           1 :                 return nil, err
     140           1 :         }
     141             : 
     142             :         // Establish the format major version.
     143           2 :         formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
     144           2 :         if err != nil {
     145           1 :                 return nil, err
     146           1 :         }
     147           2 :         defer func() {
     148           2 :                 if db == nil {
     149           1 :                         formatVersionMarker.Close()
     150           1 :                 }
     151             :         }()
     152             : 
     153           2 :         noFormatVersionMarker := formatVersion == FormatDefault
     154           2 :         if noFormatVersionMarker {
     155           2 :                 // We will initialize the store at the minimum possible format, then upgrade
     156           2 :                 // the format to the desired one. This helps test the format upgrade code.
     157           2 :                 formatVersion = FormatMinSupported
     158           2 :                 if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
     159           2 :                         formatVersion = FormatMinForSharedObjects
     160           2 :                 }
     161             :                 // There is no format version marker file. There are three cases:
     162             :                 //  - we are trying to open an existing store that was created at
     163             :                 //    FormatMostCompatible (the only one without a version marker file)
     164             :                 //  - we are creating a new store;
     165             :                 //  - we are retrying a failed creation.
     166             :                 //
     167             :                 // To error in the first case, we set ErrorIfNotPristine.
     168           2 :                 opts.ErrorIfNotPristine = true
     169           2 :                 defer func() {
     170           2 :                         if err != nil && errors.Is(err, ErrDBNotPristine) {
     171           0 :                                 // We must be trying to open an existing store at FormatMostCompatible.
      172           0 :                                 // Correct the error in this case.
     173           0 :                                 err = errors.Newf(
     174           0 :                                         "pebble: database %q written in format major version 1 which is no longer supported",
     175           0 :                                         dirname)
     176           0 :                         }
     177             :                 }()
     178           2 :         } else {
     179           2 :                 if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone && formatVersion < FormatMinForSharedObjects {
     180           0 :                         return nil, errors.Newf(
     181           0 :                                 "pebble: database %q configured with shared objects but written in too old format major version %d",
     182           0 :                                 dirname, formatVersion)
     183           0 :                 }
     184             :         }
     185             : 
     186             :         // Find the currently active manifest, if there is one.
     187           2 :         manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
     188           2 :         if err != nil {
     189           1 :                 return nil, errors.Wrapf(err, "pebble: database %q", dirname)
     190           1 :         }
     191           2 :         defer func() {
     192           2 :                 if db == nil {
     193           1 :                         manifestMarker.Close()
     194           1 :                 }
     195             :         }()
     196             : 
     197             :         // Atomic markers may leave behind obsolete files if there's a crash
     198             :         // mid-update. Clean these up if we're not in read-only mode.
     199           2 :         if !opts.ReadOnly {
     200           2 :                 if err := formatVersionMarker.RemoveObsolete(); err != nil {
     201           0 :                         return nil, err
     202           0 :                 }
     203           2 :                 if err := manifestMarker.RemoveObsolete(); err != nil {
     204           0 :                         return nil, err
     205           0 :                 }
     206             :         }
     207             : 
     208           2 :         if opts.Cache == nil {
     209           1 :                 opts.Cache = cache.New(cacheDefaultSize)
     210           2 :         } else {
     211           2 :                 opts.Cache.Ref()
     212           2 :         }
     213             : 
     214           2 :         d := &DB{
     215           2 :                 cacheID:             opts.Cache.NewID(),
     216           2 :                 dirname:             dirname,
     217           2 :                 opts:                opts,
     218           2 :                 cmp:                 opts.Comparer.Compare,
     219           2 :                 equal:               opts.Comparer.Equal,
     220           2 :                 merge:               opts.Merger.Merge,
     221           2 :                 split:               opts.Comparer.Split,
     222           2 :                 abbreviatedKey:      opts.Comparer.AbbreviatedKey,
     223           2 :                 largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
     224           2 :                 fileLock:            fileLock,
     225           2 :                 dataDir:             dataDir,
     226           2 :                 closed:              new(atomic.Value),
     227           2 :                 closedCh:            make(chan struct{}),
     228           2 :         }
     229           2 :         d.mu.versions = &versionSet{}
     230           2 :         d.diskAvailBytes.Store(math.MaxUint64)
     231           2 : 
     232           2 :         defer func() {
     233           2 :                 // If an error or panic occurs during open, attempt to release the manually
     234           2 :                 // allocated memory resources. Note that rather than look for an error, we
     235           2 :                 // look for the return of a nil DB pointer.
     236           2 :                 if r := recover(); db == nil {
     237           1 :                         // If there's an unused, recycled memtable, we need to release its memory.
     238           1 :                         if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
     239           1 :                                 d.freeMemTable(obsoleteMemTable)
     240           1 :                         }
     241             : 
      242             :                         // Release our references to the Cache. Note that both the DB and
      243             :                         // the fileCache hold a reference. When we release our reference
      244             :                         // to the fileCache, and if there are no other references to
      245             :                         // the fileCache, then the fileCache will also release its
      246             :                         // reference to the cache.
     247           1 :                         opts.Cache.Unref()
     248           1 : 
     249           1 :                         if d.fileCache != nil {
     250           1 :                                 _ = d.fileCache.close()
     251           1 :                         }
     252             : 
     253           1 :                         for _, mem := range d.mu.mem.queue {
     254           1 :                                 switch t := mem.flushable.(type) {
     255           1 :                                 case *memTable:
     256           1 :                                         manual.Free(manual.MemTable, t.arenaBuf)
     257           1 :                                         t.arenaBuf = nil
     258             :                                 }
     259             :                         }
     260           1 :                         if d.cleanupManager != nil {
     261           1 :                                 d.cleanupManager.Close()
     262           1 :                         }
     263           1 :                         if d.objProvider != nil {
     264           1 :                                 d.objProvider.Close()
     265           1 :                         }
     266           1 :                         if r != nil {
     267           1 :                                 panic(r)
     268             :                         }
     269             :                 }
     270             :         }()
     271             : 
     272           2 :         d.commit = newCommitPipeline(commitEnv{
     273           2 :                 logSeqNum:     &d.mu.versions.logSeqNum,
     274           2 :                 visibleSeqNum: &d.mu.versions.visibleSeqNum,
     275           2 :                 apply:         d.commitApply,
     276           2 :                 write:         d.commitWrite,
     277           2 :         })
     278           2 :         d.mu.nextJobID = 1
     279           2 :         d.mu.mem.nextSize = opts.MemTableSize
     280           2 :         if d.mu.mem.nextSize > initialMemTableSize {
     281           2 :                 d.mu.mem.nextSize = initialMemTableSize
     282           2 :         }
     283           2 :         d.mu.compact.cond.L = &d.mu.Mutex
     284           2 :         d.mu.compact.inProgress = make(map[*compaction]struct{})
     285           2 :         d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
     286           2 :         d.mu.snapshots.init()
     287           2 :         // logSeqNum is the next sequence number that will be assigned.
     288           2 :         // Start assigning sequence numbers from base.SeqNumStart to leave
     289           2 :         // room for reserved sequence numbers (see comments around
     290           2 :         // SeqNumStart).
     291           2 :         d.mu.versions.logSeqNum.Store(base.SeqNumStart)
     292           2 :         d.mu.formatVers.vers.Store(uint64(formatVersion))
     293           2 :         d.mu.formatVers.marker = formatVersionMarker
     294           2 : 
     295           2 :         d.timeNow = time.Now
     296           2 :         d.openedAt = d.timeNow()
     297           2 : 
     298           2 :         d.mu.Lock()
     299           2 :         defer d.mu.Unlock()
     300           2 : 
     301           2 :         jobID := d.newJobIDLocked()
     302           2 : 
     303           2 :         providerSettings := objstorageprovider.Settings{
     304           2 :                 Logger:              opts.Logger,
     305           2 :                 FS:                  opts.FS,
     306           2 :                 FSDirName:           dirname,
     307           2 :                 FSDirInitialListing: ls,
     308           2 :                 FSCleaner:           opts.Cleaner,
     309           2 :                 NoSyncOnClose:       opts.NoSyncOnClose,
     310           2 :                 BytesPerSync:        opts.BytesPerSync,
     311           2 :         }
     312           2 :         providerSettings.Local.ReadaheadConfig = opts.Local.ReadaheadConfig
     313           2 :         providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
     314           2 :         providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
     315           2 :         providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
     316           2 :         providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
     317           2 : 
     318           2 :         d.objProvider, err = objstorageprovider.Open(providerSettings)
     319           2 :         if err != nil {
     320           1 :                 return nil, err
     321           1 :         }
     322             : 
     323           2 :         if !manifestExists {
     324           2 :                 // DB does not exist.
     325           2 :                 if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
     326           1 :                         return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
     327           1 :                 }
     328             : 
     329             :                 // Create the DB.
     330           2 :                 if err := d.mu.versions.create(
     331           2 :                         jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
     332           1 :                         return nil, err
     333           1 :                 }
     334           2 :         } else {
     335           2 :                 if opts.ErrorIfExists {
     336           1 :                         return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
     337           1 :                 }
     338             :                 // Load the version set.
     339           2 :                 if err := d.mu.versions.load(
     340           2 :                         dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
     341           1 :                         return nil, err
     342           1 :                 }
     343           2 :                 if opts.ErrorIfNotPristine {
     344           1 :                         liveFileNums := make(map[base.DiskFileNum]struct{})
     345           1 :                         d.mu.versions.addLiveFileNums(liveFileNums)
     346           1 :                         if len(liveFileNums) != 0 {
     347           1 :                                 return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
     348           1 :                         }
     349             :                 }
     350             :         }
     351             : 
     352             :         // In read-only mode, we replay directly into the mutable memtable but never
     353             :         // flush it. We need to delay creation of the memtable until we know the
     354             :         // sequence number of the first batch that will be inserted.
     355           2 :         if !d.opts.ReadOnly {
     356           2 :                 var entry *flushableEntry
     357           2 :                 d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
     358           2 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     359           2 :         }
     360             : 
     361           2 :         d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     362           2 :                 Buckets: FsyncLatencyBuckets,
     363           2 :         })
     364           2 :         walOpts := wal.Options{
     365           2 :                 Primary:              wal.Dir{FS: opts.FS, Dirname: walDirname},
     366           2 :                 Secondary:            wal.Dir{},
     367           2 :                 MinUnflushedWALNum:   wal.NumWAL(d.mu.versions.minUnflushedLogNum),
     368           2 :                 MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
     369           2 :                 NoSyncOnClose:        opts.NoSyncOnClose,
     370           2 :                 BytesPerSync:         opts.WALBytesPerSync,
     371           2 :                 PreallocateSize:      d.walPreallocateSize,
     372           2 :                 MinSyncInterval:      opts.WALMinSyncInterval,
     373           2 :                 FsyncLatency:         d.mu.log.metrics.fsyncLatency,
     374           2 :                 QueueSemChan:         d.commit.logSyncQSem,
     375           2 :                 Logger:               opts.Logger,
     376           2 :                 EventListener:        walEventListenerAdaptor{l: opts.EventListener},
     377           2 :         }
     378           2 :         if opts.WALFailover != nil {
     379           2 :                 walOpts.Secondary = opts.WALFailover.Secondary
     380           2 :                 walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
     381           2 :                 walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     382           2 :                         Buckets: FsyncLatencyBuckets,
     383           2 :                 })
     384           2 :         }
     385           2 :         walDirs := append(walOpts.Dirs(), opts.WALRecoveryDirs...)
     386           2 :         wals, err := wal.Scan(walDirs...)
     387           2 :         if err != nil {
     388           1 :                 return nil, err
     389           1 :         }
     390           2 :         walManager, err := wal.Init(walOpts, wals)
     391           2 :         if err != nil {
     392           1 :                 return nil, err
     393           1 :         }
     394           2 :         defer func() {
     395           2 :                 if db == nil {
     396           1 :                         walManager.Close()
     397           1 :                 }
     398             :         }()
     399             : 
     400           2 :         d.mu.log.manager = walManager
     401           2 : 
     402           2 :         d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
     403           2 : 
     404           2 :         if manifestExists && !opts.DisableConsistencyCheck {
     405           2 :                 curVersion := d.mu.versions.currentVersion()
     406           2 :                 if err := checkConsistency(curVersion, dirname, d.objProvider); err != nil {
     407           1 :                         return nil, err
     408           1 :                 }
     409             :         }
     410             : 
     411           2 :         fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
     412           2 :         d.fileCache = newFileCacheContainer(
     413           2 :                 opts.FileCache, d.cacheID, d.objProvider, d.opts, fileCacheSize,
     414           2 :                 &sstable.CategoryStatsCollector{})
     415           2 :         d.newIters = d.fileCache.newIters
     416           2 :         d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
     417           2 : 
     418           2 :         d.mu.annotators.totalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
     419           1 :                 return true
     420           1 :         })
     421           2 :         d.mu.annotators.remoteSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
     422           1 :                 meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
     423           1 :                 if err != nil {
     424           0 :                         return false
     425           0 :                 }
     426           1 :                 return meta.IsRemote()
     427             :         })
     428           2 :         d.mu.annotators.externalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
     429           1 :                 meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
     430           1 :                 if err != nil {
     431           0 :                         return false
     432           0 :                 }
     433           1 :                 return meta.IsRemote() && meta.Remote.CleanupMethod == objstorage.SharedNoCleanup
     434             :         })
     435             : 
     436           2 :         var previousOptionsFileNum base.DiskFileNum
     437           2 :         var previousOptionsFilename string
     438           2 :         for _, filename := range ls {
     439           2 :                 ft, fn, ok := base.ParseFilename(opts.FS, filename)
     440           2 :                 if !ok {
     441           2 :                         continue
     442             :                 }
     443             : 
     444             :                 // Don't reuse any obsolete file numbers to avoid modifying an
     445             :                 // ingested sstable's original external file.
     446           2 :                 d.mu.versions.markFileNumUsed(fn)
     447           2 : 
     448           2 :                 switch ft {
     449           0 :                 case fileTypeLog:
     450             :                         // Ignore.
     451           2 :                 case fileTypeOptions:
     452           2 :                         if previousOptionsFileNum < fn {
     453           2 :                                 previousOptionsFileNum = fn
     454           2 :                                 previousOptionsFilename = filename
     455           2 :                         }
     456           1 :                 case fileTypeTemp, fileTypeOldTemp:
     457           1 :                         if !d.opts.ReadOnly {
     458           1 :                                 // Some codepaths write to a temporary file and then
     459           1 :                                 // rename it to its final location when complete.  A
     460           1 :                                 // temp file is leftover if a process exits before the
     461           1 :                                 // rename.  Remove it.
     462           1 :                                 err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
     463           1 :                                 if err != nil {
     464           0 :                                         return nil, err
     465           0 :                                 }
     466             :                         }
     467             :                 }
     468             :         }
     469           2 :         if n := len(wals); n > 0 {
     470           2 :                 // Don't reuse any obsolete file numbers to avoid modifying an
     471           2 :                 // ingested sstable's original external file.
     472           2 :                 d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
     473           2 :         }
     474             : 
     475             :         // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
     476             :         // objProvider. This avoids FileNum collisions with obsolete sstables.
     477           2 :         objects := d.objProvider.List()
     478           2 :         for _, obj := range objects {
     479           2 :                 d.mu.versions.markFileNumUsed(obj.DiskFileNum)
     480           2 :         }
     481             : 
     482             :         // Validate the most-recent OPTIONS file, if there is one.
     483           2 :         if previousOptionsFilename != "" {
     484           2 :                 path := opts.FS.PathJoin(dirname, previousOptionsFilename)
     485           2 :                 previousOptions, err := readOptionsFile(opts, path)
     486           2 :                 if err != nil {
     487           0 :                         return nil, err
     488           0 :                 }
     489           2 :                 if err := opts.CheckCompatibility(previousOptions); err != nil {
     490           1 :                         return nil, err
     491           1 :                 }
     492             :         }
     493             : 
      494             : // Replay any log files newer than the ones named in the manifest.
     495           2 :         var replayWALs wal.Logs
     496           2 :         for i, w := range wals {
     497           2 :                 if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
     498           2 :                         replayWALs = wals[i:]
     499           2 :                         break
     500             :                 }
     501             :         }
     502           2 :         var flushableIngests []*ingestedFlushable
     503           2 :         for i, lf := range replayWALs {
     504           2 :                 // WALs other than the last one would have been closed cleanly.
     505           2 :                 //
     506           2 :                 // Note: we used to never require strict WAL tails when reading from older
     507           2 :                 // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
     508           2 :                 // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
     509           2 :                 // compatible Pebble format is newer and guarantees a clean EOF.
     510           2 :                 strictWALTail := i < len(replayWALs)-1
     511           2 :                 fi, maxSeqNum, err := d.replayWAL(jobID, lf, strictWALTail)
     512           2 :                 if err != nil {
     513           1 :                         return nil, err
     514           1 :                 }
     515           2 :                 if len(fi) > 0 {
     516           2 :                         flushableIngests = append(flushableIngests, fi...)
     517           2 :                 }
     518           2 :                 if d.mu.versions.logSeqNum.Load() < maxSeqNum {
     519           2 :                         d.mu.versions.logSeqNum.Store(maxSeqNum)
     520           2 :                 }
     521             :         }
     522           2 :         if d.mu.mem.mutable == nil {
     523           2 :                 // Recreate the mutable memtable if replayWAL got rid of it.
     524           2 :                 var entry *flushableEntry
     525           2 :                 d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
     526           2 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     527           2 :         }
     528           2 :         d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
     529           2 : 
     530           2 :         if !d.opts.ReadOnly {
     531           2 :                 d.maybeScheduleFlush()
     532           2 :                 for d.mu.compact.flushing {
     533           2 :                         d.mu.compact.cond.Wait()
     534           2 :                 }
     535             : 
     536             :                 // Create an empty .log file for the mutable memtable.
     537           2 :                 newLogNum := d.mu.versions.getNextDiskFileNum()
     538           2 :                 d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
     539           2 :                 if err != nil {
     540           1 :                         return nil, err
     541           1 :                 }
     542             : 
     543             :                 // This isn't strictly necessary as we don't use the log number for
     544             :                 // memtables being flushed, only for the next unflushed memtable.
     545           2 :                 d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
     546             :         }
     547           2 :         d.updateReadStateLocked(d.opts.DebugCheck)
     548           2 : 
     549           2 :         if !d.opts.ReadOnly {
     550           2 :                 // If the Options specify a format major version higher than the
     551           2 :                 // loaded database's, upgrade it. If this is a new database, this
     552           2 :                 // code path also performs an initial upgrade from the starting
     553           2 :                 // implicit MinSupported version.
     554           2 :                 //
     555           2 :                 // We ratchet the version this far into Open so that migrations have a read
     556           2 :                 // state available. Note that this also results in creating/updating the
     557           2 :                 // format version marker file.
     558           2 :                 if opts.FormatMajorVersion > d.FormatMajorVersion() {
     559           2 :                         if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
     560           0 :                                 return nil, err
     561           0 :                         }
     562           2 :                 } else if noFormatVersionMarker {
     563           2 :                         // We are creating a new store. Create the format version marker file.
     564           2 :                         if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
     565           1 :                                 return nil, err
     566           1 :                         }
     567             :                 }
     568             : 
     569             :                 // Write the current options to disk.
     570           2 :                 d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
     571           2 :                 tmpPath := base.MakeFilepath(opts.FS, dirname, fileTypeTemp, d.optionsFileNum)
     572           2 :                 optionsPath := base.MakeFilepath(opts.FS, dirname, fileTypeOptions, d.optionsFileNum)
     573           2 : 
     574           2 :                 // Write them to a temporary file first, in case we crash before
     575           2 :                 // we're done. A corrupt options file prevents opening the
     576           2 :                 // database.
     577           2 :                 optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
     578           2 :                 if err != nil {
     579           1 :                         return nil, err
     580           1 :                 }
     581           2 :                 serializedOpts := []byte(opts.String())
     582           2 :                 if _, err := optionsFile.Write(serializedOpts); err != nil {
     583           1 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     584           1 :                 }
     585           2 :                 d.optionsFileSize = uint64(len(serializedOpts))
     586           2 :                 if err := optionsFile.Sync(); err != nil {
     587           1 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     588           1 :                 }
     589           2 :                 if err := optionsFile.Close(); err != nil {
     590           0 :                         return nil, err
     591           0 :                 }
     592             :                 // Atomically rename to the OPTIONS-XXXXXX path. This rename is
     593             :                 // guaranteed to be atomic because the destination path does not
     594             :                 // exist.
     595           2 :                 if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
     596           1 :                         return nil, err
     597           1 :                 }
     598           2 :                 if err := d.dataDir.Sync(); err != nil {
     599           1 :                         return nil, err
     600           1 :                 }
     601             :         }
     602             : 
     603           2 :         if !d.opts.ReadOnly {
     604           2 :                 // Get a fresh list of files, in case some of the earlier flushes/compactions
     605           2 :                 // have deleted some files.
     606           2 :                 ls, err := opts.FS.List(dirname)
     607           2 :                 if err != nil {
     608           1 :                         return nil, err
     609           1 :                 }
     610           2 :                 d.scanObsoleteFiles(ls, flushableIngests)
     611           2 :                 d.deleteObsoleteFiles(jobID)
     612             :         }
     613             :         // Else, nothing is obsolete.
     614             : 
     615           2 :         d.mu.tableStats.cond.L = &d.mu.Mutex
     616           2 :         d.mu.tableValidation.cond.L = &d.mu.Mutex
     617           2 :         if !d.opts.ReadOnly {
     618           2 :                 d.maybeCollectTableStatsLocked()
     619           2 :         }
     620           2 :         d.calculateDiskAvailableBytes()
     621           2 : 
     622           2 :         d.maybeScheduleFlush()
     623           2 :         d.maybeScheduleCompaction()
     624           2 : 
     625           2 :         // Note: this is a no-op if invariants are disabled or race is enabled.
     626           2 :         //
     627           2 :         // Setting a finalizer on *DB causes *DB to never be reclaimed and the
     628           2 :         // finalizer to never be run. The problem is due to this limitation of
      629           2 :         // finalizers mentioned in the SetFinalizer docs:
     630           2 :         //
     631           2 :         //   If a cyclic structure includes a block with a finalizer, that cycle is
     632           2 :         //   not guaranteed to be garbage collected and the finalizer is not
     633           2 :         //   guaranteed to run, because there is no ordering that respects the
     634           2 :         //   dependencies.
     635           2 :         //
     636           2 :         // DB has cycles with several of its internal structures: readState,
      637           2 :         // newIters, fileCache, versions, etc. Each of these individually causes a
      638           2 :         // cycle and prevents the finalizer from being run. But we can work around
      639           2 :         // this finalizer limitation by setting a finalizer on another object that is
     640           2 :         // tied to the lifetime of DB: the DB.closed atomic.Value.
     641           2 :         dPtr := fmt.Sprintf("%p", d)
     642           2 :         invariants.SetFinalizer(d.closed, func(obj interface{}) {
     643           0 :                 v := obj.(*atomic.Value)
     644           0 :                 if err := v.Load(); err == nil {
     645           0 :                         fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
     646           0 :                         os.Exit(1)
     647           0 :                 }
     648             :         })
     649             : 
     650           2 :         return d, nil
     651             : }
     652             : 
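Open is the public entry point exercised by the coverage above. A minimal usage sketch against an in-memory filesystem (vfs.NewMem), so nothing touches disk; for a real store, pass a durable directory and leave Options.FS nil to use the default filesystem:

    package main

    import (
            "fmt"
            "log"

            "github.com/cockroachdb/pebble"
            "github.com/cockroachdb/pebble/vfs"
    )

    func main() {
            // "demo" is an arbitrary directory name inside the in-memory filesystem.
            db, err := pebble.Open("demo", &pebble.Options{FS: vfs.NewMem()})
            if err != nil {
                    log.Fatal(err)
            }
            defer db.Close()

            if err := db.Set([]byte("hello"), []byte("world"), pebble.Sync); err != nil {
                    log.Fatal(err)
            }
            value, closer, err := db.Get([]byte("hello"))
            if err != nil {
                    log.Fatal(err)
            }
            defer closer.Close()
            fmt.Printf("hello=%s\n", value)
    }
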
     653             : // prepareAndOpenDirs opens the directories for the store (and creates them if
     654             : // necessary).
     655             : //
     656             : // Returns an error if ReadOnly is set and the directories don't exist.
     657             : func prepareAndOpenDirs(
     658             :         dirname string, opts *Options,
     659           2 : ) (walDirname string, dataDir vfs.File, err error) {
     660           2 :         walDirname = opts.WALDir
     661           2 :         if opts.WALDir == "" {
     662           2 :                 walDirname = dirname
     663           2 :         }
     664             : 
     665             :         // Create directories if needed.
     666           2 :         if !opts.ReadOnly {
     667           2 :                 f, err := mkdirAllAndSyncParents(opts.FS, dirname)
     668           2 :                 if err != nil {
     669           1 :                         return "", nil, err
     670           1 :                 }
     671           2 :                 f.Close()
     672           2 :                 if walDirname != dirname {
     673           2 :                         f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
     674           2 :                         if err != nil {
     675           0 :                                 return "", nil, err
     676           0 :                         }
     677           2 :                         f.Close()
     678             :                 }
     679           2 :                 if opts.WALFailover != nil {
     680           2 :                         secondary := opts.WALFailover.Secondary
     681           2 :                         f, err := mkdirAllAndSyncParents(secondary.FS, secondary.Dirname)
     682           2 :                         if err != nil {
     683           0 :                                 return "", nil, err
     684           0 :                         }
     685           2 :                         f.Close()
     686             :                 }
     687             :         }
     688             : 
     689           2 :         dataDir, err = opts.FS.OpenDir(dirname)
     690           2 :         if err != nil {
     691           1 :                 if opts.ReadOnly && oserror.IsNotExist(err) {
     692           1 :                         return "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
     693           1 :                 }
     694           1 :                 return "", nil, err
     695             :         }
     696           2 :         if opts.ReadOnly && walDirname != dirname {
     697           1 :                 // Check that the wal dir exists.
     698           1 :                 walDir, err := opts.FS.OpenDir(walDirname)
     699           1 :                 if err != nil {
     700           1 :                         dataDir.Close()
     701           1 :                         return "", nil, err
     702           1 :                 }
     703           1 :                 walDir.Close()
     704             :         }
     705             : 
     706           2 :         return walDirname, dataDir, nil
     707             : }
     708             : 
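prepareAndOpenDirs is what ensures a separate WAL directory (and, when configured, a WAL failover secondary) exists before Open proceeds. A minimal configuration sketch; the paths are hypothetical and only illustrate placing the WAL and the sstables on different devices:

    package main

    import (
            "log"

            "github.com/cockroachdb/pebble"
    )

    func main() {
            opts := &pebble.Options{
                    // Hypothetical path: keep the write-ahead log on a separate, faster
                    // device than the sstables. If WALDir is empty, the WAL lives in
                    // the data directory.
                    WALDir: "/mnt/fast-ssd/demo-wal",
            }
            db, err := pebble.Open("/mnt/bulk-disk/demo-data", opts)
            if err != nil {
                    log.Fatal(err)
            }
            defer db.Close()
    }
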
     709             : // GetVersion returns the engine version string from the latest options
     710             : // file present in dir. Used to check what Pebble or RocksDB version was last
     711             : // used to write to the database stored in this directory. An empty string is
     712             : // returned if no valid OPTIONS file with a version key was found.
     713           1 : func GetVersion(dir string, fs vfs.FS) (string, error) {
     714           1 :         ls, err := fs.List(dir)
     715           1 :         if err != nil {
     716           0 :                 return "", err
     717           0 :         }
     718           1 :         var version string
     719           1 :         lastOptionsSeen := base.DiskFileNum(0)
     720           1 :         for _, filename := range ls {
     721           1 :                 ft, fn, ok := base.ParseFilename(fs, filename)
     722           1 :                 if !ok {
     723           1 :                         continue
     724             :                 }
     725           1 :                 switch ft {
     726           1 :                 case fileTypeOptions:
     727           1 :                         // If this file has a higher number than the last options file
     728           1 :                         // processed, reset version. This is because rocksdb often
     729           1 :                         // writes multiple options files without deleting previous ones.
     730           1 :                         // Otherwise, skip parsing this options file.
     731           1 :                         if fn > lastOptionsSeen {
     732           1 :                                 version = ""
     733           1 :                                 lastOptionsSeen = fn
     734           1 :                         } else {
     735           1 :                                 continue
     736             :                         }
     737           1 :                         f, err := fs.Open(fs.PathJoin(dir, filename))
     738           1 :                         if err != nil {
     739           0 :                                 return "", err
     740           0 :                         }
     741           1 :                         data, err := io.ReadAll(f)
     742           1 :                         f.Close()
     743           1 : 
     744           1 :                         if err != nil {
     745           0 :                                 return "", err
     746           0 :                         }
     747           1 :                         err = parseOptions(string(data), parseOptionsFuncs{
     748           1 :                                 visitKeyValue: func(i, j int, section, key, value string) error {
     749           1 :                                         switch {
     750           1 :                                         case section == "Version":
     751           1 :                                                 switch key {
     752           1 :                                                 case "pebble_version":
     753           1 :                                                         version = value
     754           1 :                                                 case "rocksdb_version":
     755           1 :                                                         version = fmt.Sprintf("rocksdb v%s", value)
     756             :                                                 }
     757             :                                         }
     758           1 :                                         return nil
     759             :                                 },
     760             :                         })
     761           1 :                         if err != nil {
     762           0 :                                 return "", err
     763           0 :                         }
     764             :                 }
     765             :         }
     766           1 :         return version, nil
     767             : }
     768             : 
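GetVersion only lists the directory and parses OPTIONS files, so it can be used without opening the store. A minimal sketch, assuming a hypothetical store at /path/to/db on the default filesystem:

    package main

    import (
            "fmt"
            "log"

            "github.com/cockroachdb/pebble"
            "github.com/cockroachdb/pebble/vfs"
    )

    func main() {
            // Returns "" if no OPTIONS file with a version key is present.
            version, err := pebble.GetVersion("/path/to/db", vfs.Default)
            if err != nil {
                    log.Fatal(err)
            }
            fmt.Println("last written by:", version)
    }
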
     769             : func (d *DB) replayIngestedFlushable(
     770             :         b *Batch, logNum base.DiskFileNum,
     771           2 : ) (entry *flushableEntry, err error) {
     772           2 :         br := b.Reader()
     773           2 :         seqNum := b.SeqNum()
     774           2 : 
     775           2 :         fileNums := make([]base.DiskFileNum, 0, b.Count())
     776           2 :         var exciseSpan KeyRange
     777           2 :         addFileNum := func(encodedFileNum []byte) {
     778           2 :                 fileNum, n := binary.Uvarint(encodedFileNum)
     779           2 :                 if n <= 0 {
     780           0 :                         panic("pebble: ingest sstable file num is invalid")
     781             :                 }
     782           2 :                 fileNums = append(fileNums, base.DiskFileNum(fileNum))
     783             :         }
     784             : 
     785           2 :         for i := 0; i < int(b.Count()); i++ {
     786           2 :                 kind, key, val, ok, err := br.Next()
     787           2 :                 if err != nil {
     788           0 :                         return nil, err
     789           0 :                 }
     790           2 :                 if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise {
     791           0 :                         panic("pebble: invalid batch key kind")
     792             :                 }
     793           2 :                 if !ok {
     794           0 :                         panic("pebble: invalid batch count")
     795             :                 }
     796           2 :                 if kind == base.InternalKeyKindExcise {
     797           2 :                         if exciseSpan.Valid() {
     798           0 :                                 panic("pebble: multiple excise spans in a single batch")
     799             :                         }
     800           2 :                         exciseSpan.Start = slices.Clone(key)
     801           2 :                         exciseSpan.End = slices.Clone(val)
     802           2 :                         continue
     803             :                 }
     804           2 :                 addFileNum(key)
     805             :         }
     806             : 
     807           2 :         if _, _, _, ok, err := br.Next(); err != nil {
     808           0 :                 return nil, err
     809           2 :         } else if ok {
     810           0 :                 panic("pebble: invalid number of entries in batch")
     811             :         }
     812             : 
     813           2 :         meta := make([]*fileMetadata, len(fileNums))
     814           2 :         var lastRangeKey keyspan.Span
     815           2 :         for i, n := range fileNums {
     816           2 :                 readable, err := d.objProvider.OpenForReading(context.TODO(), fileTypeTable, n, objstorage.OpenOptions{MustExist: true})
     817           2 :                 if err != nil {
     818           0 :                         return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files")
     819           0 :                 }
     820             :                 // NB: ingestLoad1 will close readable.
     821           2 :                 meta[i], lastRangeKey, err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(),
     822           2 :                         readable, d.cacheID, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
     823           2 :                 if err != nil {
     824           0 :                         return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
     825           0 :                 }
     826             :         }
     827           2 :         if lastRangeKey.Valid() && d.opts.Comparer.Split.HasSuffix(lastRangeKey.End) {
     828           0 :                 return nil, errors.AssertionFailedf("pebble: last ingest sstable has suffixed range key end %s",
     829           0 :                         d.opts.Comparer.FormatKey(lastRangeKey.End))
     830           0 :         }
     831             : 
     832           2 :         numFiles := len(meta)
     833           2 :         if exciseSpan.Valid() {
     834           2 :                 numFiles++
     835           2 :         }
     836           2 :         if uint32(numFiles) != b.Count() {
     837           0 :                 panic("pebble: couldn't load all files in WAL entry")
     838             :         }
     839             : 
     840           2 :         return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
     841             : }
     842             : 
     843             : // replayWAL replays the edits in the specified WAL. If the DB is in read
     844             : // only mode, then the WALs are replayed into memtables and not flushed. If
     845             : // the DB is not in read only mode, then the contents of the WAL are
     846             : // guaranteed to be flushed when a flush is scheduled after this method is run.
     847             : // Note that this flushing is very important for guaranteeing durability:
     848             : // the application may have had a number of pending
     849             : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
     850             : // happened but the corresponding data may now be readable from the WAL (while
     851             : // sitting in write-back caches in the kernel or the storage device). By
     852             : // reading the WAL (including the non-fsynced data) and then flushing all
     853             : // these changes (flush does fsyncs), we are able to guarantee that the
     854             : // initial state of the DB is durable.
     855             : //
     856             : // This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable and replays
     857             : // WALs into the flushable queue. Flushing of the queue is expected to be handled
     858             : // by callers. A list of flushable ingests (but not memtables) replayed is returned.
     859             : //
     860             : // d.mu must be held when calling this, but the mutex may be dropped and
     861             : // re-acquired during the course of this method.
     862             : func (d *DB) replayWAL(
     863             :         jobID JobID, ll wal.LogicalLog, strictWALTail bool,
     864           2 : ) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
     865           2 :         rr := ll.OpenForRead()
     866           2 :         defer rr.Close()
     867           2 :         var (
     868           2 :                 b               Batch
     869           2 :                 buf             bytes.Buffer
     870           2 :                 mem             *memTable
     871           2 :                 entry           *flushableEntry
     872           2 :                 offset          wal.Offset
     873           2 :                 lastFlushOffset int64
     874           2 :                 keysReplayed    int64 // number of keys replayed
     875           2 :                 batchesReplayed int64 // number of batches replayed
     876           2 :         )
     877           2 : 
     878           2 :         // TODO(jackson): This function is interspersed with panics, in addition to
     879           2 :         // corruption error propagation. Audit them to ensure we're truly only
     880           2 :         // panicking where the error points to Pebble bug and not user or
     881           2 :         // hardware-induced corruption.
     882           2 : 
      883           2 :         // "Flushes" (i.e., closes off) the current memtable, if not nil.
     884           2 :         flushMem := func() {
     885           2 :                 if mem == nil {
     886           2 :                         return
     887           2 :                 }
     888           2 :                 mem.writerUnref()
     889           2 :                 if d.mu.mem.mutable == mem {
     890           2 :                         d.mu.mem.mutable = nil
     891           2 :                 }
     892           2 :                 entry.flushForced = !d.opts.ReadOnly
     893           2 :                 var logSize uint64
     894           2 :                 mergedOffset := offset.Physical + offset.PreviousFilesBytes
     895           2 :                 if mergedOffset >= lastFlushOffset {
     896           2 :                         logSize = uint64(mergedOffset - lastFlushOffset)
     897           2 :                 }
     898             :                 // Else, this was the initial memtable in the read-only case which must have
     899             :                 // been empty, but we need to flush it since we don't want to add to it later.
     900           2 :                 lastFlushOffset = mergedOffset
     901           2 :                 entry.logSize = logSize
     902           2 :                 mem, entry = nil, nil
     903             :         }
     904             : 
     905           2 :         mem = d.mu.mem.mutable
     906           2 :         if mem != nil {
     907           2 :                 entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
     908           2 :                 if !d.opts.ReadOnly {
     909           2 :                         flushMem()
     910           2 :                 }
     911             :         }
     912             : 
     913             :         // Creates a new memtable if there is no current memtable.
     914           2 :         ensureMem := func(seqNum base.SeqNum) {
     915           2 :                 if mem != nil {
     916           2 :                         return
     917           2 :                 }
     918           2 :                 mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
     919           2 :                 d.mu.mem.mutable = mem
     920           2 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     921             :         }
     922             : 
     923           2 :         defer func() {
     924           2 :                 if err != nil {
     925           1 :                         err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
     926           1 :                 }
     927             :         }()
     928             : 
     929           2 :         for {
     930           2 :                 var r io.Reader
     931           2 :                 var err error
     932           2 :                 r, offset, err = rr.NextRecord()
     933           2 :                 if err == nil {
     934           2 :                         _, err = io.Copy(&buf, r)
     935           2 :                 }
     936           2 :                 if err != nil {
     937           2 :                         // It is common to encounter a zeroed or invalid chunk due to WAL
     938           2 :                         // preallocation and WAL recycling. We need to distinguish these
     939           2 :                         // errors from EOF in order to recognize that the record was
     940           2 :                         // truncated and to avoid replaying subsequent WALs, but want
     941           2 :                         // to otherwise treat them like EOF.
     942           2 :                         if err == io.EOF {
     943           2 :                                 break
     944           1 :                         } else if record.IsInvalidRecord(err) && !strictWALTail {
     945           1 :                                 break
     946             :                         }
     947           1 :                         return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
     948             :                 }
     949             : 
     950           2 :                 if buf.Len() < batchrepr.HeaderLen {
     951           0 :                         return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
     952           0 :                                 errors.Safe(base.DiskFileNum(ll.Num)), offset)
     953           0 :                 }
     954             : 
     955           2 :                 if d.opts.ErrorIfNotPristine {
     956           1 :                         return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
     957           1 :                 }
     958             : 
     959             :                 // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
     960             :                 // which is used below.
     961           2 :                 b = Batch{}
     962           2 :                 b.db = d
     963           2 :                 b.SetRepr(buf.Bytes())
     964           2 :                 seqNum := b.SeqNum()
     965           2 :                 maxSeqNum = seqNum + base.SeqNum(b.Count())
     966           2 :                 keysReplayed += int64(b.Count())
     967           2 :                 batchesReplayed++
     968           2 :                 {
     969           2 :                         br := b.Reader()
     970           2 :                         if kind, _, _, ok, err := br.Next(); err != nil {
     971           0 :                                 return nil, 0, err
     972           2 :                         } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
     973           2 :                                 // We're in the flushable ingests (+ possibly excises) case.
     974           2 :                                 //
     975           2 :                                 // Ingests require an up-to-date view of the LSM to determine the target
     976           2 :                                 // level of ingested sstables, and to accurately compute excises. Instead of
     977           2 :                                 // doing an ingest in this function, we just enqueue a flushable ingest
     978           2 :                                 // in the flushables queue and run a regular flush.
     979           2 :                                 flushMem()
     980           2 :                                 // mem is nil here.
     981           2 :                                 entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
     982           2 :                                 if err != nil {
     983           0 :                                         return nil, 0, err
     984           0 :                                 }
     985           2 :                                 fi := entry.flushable.(*ingestedFlushable)
     986           2 :                                 flushableIngests = append(flushableIngests, fi)
     987           2 :                                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     988           2 :                                 // A flushable ingest is always followed by a WAL rotation.
     989           2 :                                 break
     990             :                         }
     991             :                 }
     992             : 
     993           2 :                 if b.memTableSize >= uint64(d.largeBatchThreshold) {
     994           2 :                         flushMem()
     995           2 :                         // Make a copy of the data slice since it is currently owned by buf and will
     996           2 :                         // be reused in the next iteration.
     997           2 :                         b.data = slices.Clone(b.data)
     998           2 :                         b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
     999           2 :                         if err != nil {
    1000           0 :                                 return nil, 0, err
    1001           0 :                         }
    1002           2 :                         entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
    1003           2 :                         // Disable memory accounting by adding a reader ref that will never be
    1004           2 :                         // removed.
    1005           2 :                         entry.readerRefs.Add(1)
    1006           2 :                         d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1007           2 :                 } else {
    1008           2 :                         ensureMem(seqNum)
    1009           2 :                         if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
    1010           0 :                                 return nil, 0, err
    1011           0 :                         }
    1012             :                         // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
    1013             :                         // batch may not initially fit, but will eventually fit (since it is smaller than
    1014             :                         // largeBatchThreshold).
    1015           2 :                         for err == arenaskl.ErrArenaFull {
    1016           2 :                                 flushMem()
    1017           2 :                                 ensureMem(seqNum)
    1018           2 :                                 err = mem.prepare(&b)
    1019           2 :                                 if err != nil && err != arenaskl.ErrArenaFull {
    1020           0 :                                         return nil, 0, err
    1021           0 :                                 }
    1022             :                         }
    1023           2 :                         if err = mem.apply(&b, seqNum); err != nil {
    1024           0 :                                 return nil, 0, err
    1025           0 :                         }
    1026           2 :                         mem.writerUnref()
    1027             :                 }
    1028           2 :                 buf.Reset()
    1029             :         }
    1030             : 
    1031           2 :         d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
    1032           2 :                 jobID, ll.String(), offset, keysReplayed, batchesReplayed)
    1033           2 :         if !d.opts.ReadOnly {
    1034           2 :                 flushMem()
    1035           2 :         }
    1036             : 
    1037             :         // mem is nil here, if !ReadOnly.
    1038           2 :         return flushableIngests, maxSeqNum, err
    1039             : }
    1040             : 
    1041           2 : func readOptionsFile(opts *Options, path string) (string, error) {
    1042           2 :         f, err := opts.FS.Open(path)
    1043           2 :         if err != nil {
    1044           0 :                 return "", err
    1045           0 :         }
    1046           2 :         defer f.Close()
    1047           2 : 
    1048           2 :         data, err := io.ReadAll(f)
    1049           2 :         if err != nil {
    1050           0 :                 return "", err
    1051           0 :         }
    1052           2 :         return string(data), nil
    1053             : }
    1054             : 
    1055             : // DBDesc briefly describes high-level state about a database.
    1056             : type DBDesc struct {
    1057             :         // Exists is true if an existing database was found.
    1058             :         Exists bool
    1059             :         // FormatMajorVersion indicates the database's current format
    1060             :         // version.
    1061             :         FormatMajorVersion FormatMajorVersion
    1062             :         // ManifestFilename is the filename of the current active manifest,
    1063             :         // if the database exists.
    1064             :         ManifestFilename string
    1065             :         // OptionsFilename is the filename of the most recent OPTIONS file, if it
    1066             :         // exists.
    1067             :         OptionsFilename string
    1068             : }
    1069             : 
    1070             : // String implements fmt.Stringer.
    1071           1 : func (d *DBDesc) String() string {
    1072           1 :         if !d.Exists {
    1073           1 :                 return "uninitialized"
    1074           1 :         }
    1075           1 :         var buf bytes.Buffer
    1076           1 :         fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
    1077           1 :         fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
    1078           1 :         fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
    1079           1 :         return buf.String()
    1080             : }
    1081             : 
    1082             : // Peek looks for an existing database in dirname on the provided FS. It
    1083             : // returns a brief description of the database. Peek is read-only and
     1084             : // does not open the database.
    1085           1 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
    1086           1 :         ls, err := fs.List(dirname)
    1087           1 :         if err != nil {
    1088           1 :                 return nil, err
    1089           1 :         }
    1090             : 
    1091           1 :         vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
    1092           1 :         if err != nil {
    1093           0 :                 return nil, err
    1094           0 :         }
    1095             :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1096             :         // PeekMarker variant that avoids opening the directory.
    1097           1 :         if err := versMarker.Close(); err != nil {
    1098           0 :                 return nil, err
    1099           0 :         }
    1100             : 
    1101             :         // Find the currently active manifest, if there is one.
    1102           1 :         manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
    1103           1 :         if err != nil {
    1104           0 :                 return nil, err
    1105           0 :         }
    1106             :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1107             :         // PeekMarker variant that avoids opening the directory.
    1108           1 :         if err := manifestMarker.Close(); err != nil {
    1109           0 :                 return nil, err
    1110           0 :         }
    1111             : 
    1112           1 :         desc := &DBDesc{
    1113           1 :                 Exists:             exists,
    1114           1 :                 FormatMajorVersion: vers,
    1115           1 :         }
    1116           1 : 
    1117           1 :         // Find the OPTIONS file with the highest file number within the list of
    1118           1 :         // directory entries.
    1119           1 :         var previousOptionsFileNum base.DiskFileNum
    1120           1 :         for _, filename := range ls {
    1121           1 :                 ft, fn, ok := base.ParseFilename(fs, filename)
    1122           1 :                 if !ok || ft != fileTypeOptions || fn < previousOptionsFileNum {
    1123           1 :                         continue
    1124             :                 }
    1125           1 :                 previousOptionsFileNum = fn
    1126           1 :                 desc.OptionsFilename = fs.PathJoin(dirname, filename)
    1127             :         }
    1128             : 
    1129           1 :         if exists {
    1130           1 :                 desc.ManifestFilename = base.MakeFilepath(fs, dirname, fileTypeManifest, manifestFileNum)
    1131           1 :         }
    1132           1 :         return desc, nil
    1133             : }
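
As a hedged illustration of the Peek API shown above (this sketch is not part of
open.go): Peek inspects a directory without acquiring the database lock or opening
the database. The path below is a placeholder; the default filesystem is assumed.

        package main

        import (
                "fmt"
                "log"

                "github.com/cockroachdb/pebble"
                "github.com/cockroachdb/pebble/vfs"
        )

        func main() {
                // Peek is read-only: it lists the directory and consults the format
                // version and manifest markers, but never opens the database.
                desc, err := pebble.Peek("/path/to/db", vfs.Default) // placeholder path
                if err != nil {
                        log.Fatal(err)
                }
                // DBDesc.String reports either "uninitialized" or the format major
                // version, manifest filename, and most recent OPTIONS filename.
                fmt.Println(desc)
        }
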
    1134             : 
    1135             : // LockDirectory acquires the database directory lock in the named directory,
    1136             : // preventing another process from opening the database. LockDirectory returns a
    1137             : // handle to the held lock that may be passed to Open through Options.Lock to
     1138             : // subsequently open the database, skipping lock acquisition during Open.
    1139             : //
    1140             : // LockDirectory may be used to expand the critical section protected by the
    1141             : // database lock to include setup before the call to Open.
    1142           2 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
    1143           2 :         fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, fileTypeLock, base.DiskFileNum(0)))
    1144           2 :         if err != nil {
    1145           1 :                 return nil, err
    1146           1 :         }
    1147           2 :         l := &Lock{dirname: dirname, fileLock: fileLock}
    1148           2 :         l.refs.Store(1)
    1149           2 :         invariants.SetFinalizer(l, func(obj interface{}) {
    1150           2 :                 if refs := obj.(*Lock).refs.Load(); refs > 0 {
    1151           0 :                         panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
    1152             :                 }
    1153             :         })
    1154           2 :         return l, nil
    1155             : }
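
A hedged sketch of the pattern the LockDirectory comment describes (not part of
open.go): acquire the directory lock, do setup work under the lock, then hand the
held lock to Open through Options.Lock; the lock is closed only after the database
itself is closed. The path and the setup step are placeholders.

        package main

        import (
                "log"

                "github.com/cockroachdb/pebble"
                "github.com/cockroachdb/pebble/vfs"
        )

        func main() {
                const dirname = "/path/to/db" // placeholder path

                // Acquire the directory lock before Open so that pre-Open setup is
                // covered by the same critical section as the database itself.
                lock, err := pebble.LockDirectory(dirname, vfs.Default)
                if err != nil {
                        log.Fatal(err)
                }
                defer lock.Close() // runs after db.Close below: the lock is released last

                // ... setup that must complete before the database is opened ...

                // Open skips lock acquisition because it is handed the held lock.
                db, err := pebble.Open(dirname, &pebble.Options{Lock: lock})
                if err != nil {
                        log.Fatal(err)
                }
                defer db.Close()

                // ... use db ...
        }
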
    1156             : 
    1157             : // Lock represents a file lock on a directory. It may be passed to Open through
     1158             : // Options.Lock to elide lock acquisition during Open.
    1159             : type Lock struct {
    1160             :         dirname  string
    1161             :         fileLock io.Closer
    1162             :         // refs is a count of the number of handles on the lock. refs must be 0, 1
    1163             :         // or 2.
    1164             :         //
    1165             :         // When acquired by the client and passed to Open, refs = 1 and the Open
    1166             :         // call increments it to 2. When the database is closed, it's decremented to
     1167             : // 1. Finally, when the original caller calls Close on the Lock, it's
     1168             : // decremented to zero and the underlying file lock is released.
    1169             :         //
    1170             :         // When Open acquires the file lock, refs remains at 1 until the database is
    1171             :         // closed.
    1172             :         refs atomic.Int32
    1173             : }
    1174             : 
    1175           1 : func (l *Lock) refForOpen() error {
    1176           1 :         // During Open, when a user passed in a lock, the reference count must be
    1177           1 :         // exactly 1. If it's zero, the lock is no longer held and is invalid. If
    1178           1 :         // it's 2, the lock is already in use by another database within the
    1179           1 :         // process.
    1180           1 :         if !l.refs.CompareAndSwap(1, 2) {
    1181           1 :                 return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
    1182           1 :         }
    1183           1 :         return nil
    1184             : }
    1185             : 
    1186             : // Close releases the lock, permitting another process to lock and open the
    1187             : // database. Close must not be called until after a database using the Lock has
    1188             : // been closed.
    1189           2 : func (l *Lock) Close() error {
    1190           2 :         if l.refs.Add(-1) > 0 {
    1191           1 :                 return nil
    1192           1 :         }
    1193           2 :         defer func() { l.fileLock = nil }()
    1194           2 :         return l.fileLock.Close()
    1195             : }
    1196             : 
    1197           1 : func (l *Lock) pathMatches(dirname string) error {
    1198           1 :         if dirname == l.dirname {
    1199           1 :                 return nil
    1200           1 :         }
    1201             :         // Check for relative paths, symlinks, etc. This isn't ideal because we're
    1202             :         // circumventing the vfs.FS interface here.
    1203             :         //
    1204             :         // TODO(jackson): We could add support for retrieving file inodes through Stat
    1205             :         // calls in the VFS interface on platforms where it's available and use that
    1206             :         // to differentiate.
    1207           1 :         dirStat, err1 := os.Stat(dirname)
    1208           1 :         lockDirStat, err2 := os.Stat(l.dirname)
    1209           1 :         if err1 == nil && err2 == nil && os.SameFile(dirStat, lockDirStat) {
    1210           1 :                 return nil
    1211           1 :         }
    1212           0 :         return errors.Join(
    1213           0 :                 errors.Newf("pebble: opts.Lock acquired in %q not %q", l.dirname, dirname),
    1214           0 :                 err1, err2)
    1215             : }
    1216             : 
    1217             : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
    1218             : // does not exist.
    1219             : //
    1220             : // Note that errors can be wrapped with more details; use errors.Is().
    1221             : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
    1222             : 
    1223             : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
    1224             : // already exists.
    1225             : //
    1226             : // Note that errors can be wrapped with more details; use errors.Is().
    1227             : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
    1228             : 
    1229             : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
    1230             : // already exists and is not pristine.
    1231             : //
    1232             : // Note that errors can be wrapped with more details; use errors.Is().
    1233             : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
    1234             : 
    1235             : // IsCorruptionError returns true if the given error indicates database
    1236             : // corruption.
    1237           1 : func IsCorruptionError(err error) bool {
    1238           1 :         return errors.Is(err, base.ErrCorruption)
    1239           1 : }
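
A hedged sketch of how the sentinel errors and IsCorruptionError above are typically
consumed (not part of open.go). It assumes ErrorIfExists is set in the options and
uses a placeholder path; ErrDBDoesNotExist and ErrDBNotPristine are checked the same
way when ErrorIfNotExists or ErrorIfNotPristine is set.

        package main

        import (
                "errors"
                "log"

                "github.com/cockroachdb/pebble"
        )

        func main() {
                opts := &pebble.Options{ErrorIfExists: true} // fail if a DB already exists
                db, err := pebble.Open("/path/to/db", opts)  // placeholder path
                switch {
                case err == nil:
                        defer db.Close()
                case errors.Is(err, pebble.ErrDBAlreadyExists):
                        log.Print("directory already contains a database")
                case pebble.IsCorruptionError(err):
                        log.Print("database is corrupted")
                default:
                        log.Fatal(err)
                }
        }
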
    1240             : 
    1241           2 : func checkConsistency(v *manifest.Version, dirname string, objProvider objstorage.Provider) error {
    1242           2 :         var errs []error
    1243           2 :         dedup := make(map[base.DiskFileNum]struct{})
    1244           2 :         for level, files := range v.Levels {
    1245           2 :                 iter := files.Iter()
    1246           2 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1247           2 :                         backingState := f.FileBacking
    1248           2 :                         if _, ok := dedup[backingState.DiskFileNum]; ok {
    1249           2 :                                 continue
    1250             :                         }
    1251           2 :                         dedup[backingState.DiskFileNum] = struct{}{}
    1252           2 :                         fileNum := backingState.DiskFileNum
    1253           2 :                         fileSize := backingState.Size
    1254           2 :                         // We skip over remote objects; those are instead checked asynchronously
    1255           2 :                         // by the table stats loading job.
    1256           2 :                         meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
    1257           2 :                         var size int64
    1258           2 :                         if err == nil {
    1259           2 :                                 if meta.IsRemote() {
    1260           2 :                                         continue
    1261             :                                 }
    1262           2 :                                 size, err = objProvider.Size(meta)
    1263             :                         }
    1264           2 :                         if err != nil {
    1265           1 :                                 errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
    1266           1 :                                 continue
    1267             :                         }
    1268             : 
    1269           2 :                         if size != int64(fileSize) {
    1270           0 :                                 errs = append(errs, errors.Errorf(
    1271           0 :                                         "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
    1272           0 :                                         errors.Safe(level), fileNum, objProvider.Path(meta),
    1273           0 :                                         errors.Safe(size), errors.Safe(fileSize)))
    1274           0 :                                 continue
    1275             :                         }
    1276             :                 }
    1277             :         }
    1278           2 :         return errors.Join(errs...)
    1279             : }
    1280             : 
    1281             : type walEventListenerAdaptor struct {
    1282             :         l *EventListener
    1283             : }
    1284             : 
    1285           2 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
    1286           2 :         // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
    1287           2 :         // is insufficient to infer whether primary or secondary.
    1288           2 :         wci := WALCreateInfo{
    1289           2 :                 JobID:           ci.JobID,
    1290           2 :                 Path:            ci.Path,
    1291           2 :                 FileNum:         base.DiskFileNum(ci.Num),
    1292           2 :                 RecycledFileNum: ci.RecycledFileNum,
    1293           2 :                 Err:             ci.Err,
    1294           2 :         }
    1295           2 :         l.l.WALCreated(wci)
    1296           2 : }

Generated by: LCOV version 1.14