LCOV - code coverage report
Current view: top level - pebble - open.go (source / functions) Coverage Total Hit
Test: 2025-02-09 08:17Z 12f37e44 - tests + meta.lcov Lines: 88.9 % 903 803
Test Date: 2025-02-09 08:18:31 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "context"
      10              :         "encoding/binary"
      11              :         "fmt"
      12              :         "io"
      13              :         "math"
      14              :         "os"
      15              :         "slices"
      16              :         "sync/atomic"
      17              :         "time"
      18              : 
      19              :         "github.com/cockroachdb/crlib/crtime"
      20              :         "github.com/cockroachdb/errors"
      21              :         "github.com/cockroachdb/errors/oserror"
      22              :         "github.com/cockroachdb/pebble/batchrepr"
      23              :         "github.com/cockroachdb/pebble/internal/arenaskl"
      24              :         "github.com/cockroachdb/pebble/internal/base"
      25              :         "github.com/cockroachdb/pebble/internal/cache"
      26              :         "github.com/cockroachdb/pebble/internal/constants"
      27              :         "github.com/cockroachdb/pebble/internal/invariants"
      28              :         "github.com/cockroachdb/pebble/internal/keyspan"
      29              :         "github.com/cockroachdb/pebble/internal/manifest"
      30              :         "github.com/cockroachdb/pebble/internal/manual"
      31              :         "github.com/cockroachdb/pebble/objstorage"
      32              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      33              :         "github.com/cockroachdb/pebble/objstorage/remote"
      34              :         "github.com/cockroachdb/pebble/record"
      35              :         "github.com/cockroachdb/pebble/vfs"
      36              :         "github.com/cockroachdb/pebble/wal"
      37              :         "github.com/prometheus/client_golang/prometheus"
      38              : )
      39              : 
      40              : const (
      41              :         initialMemTableSize = 256 << 10 // 256 KB
      42              : 
      43              :         // The max batch size is limited by the uint32 offsets stored in
      44              :         // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
      45              :         //
      46              :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      47              :         // end of an allocation fits in uint32.
      48              :         //
      49              :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      50              :         // 2GB).
      51              :         maxBatchSize = constants.MaxUint32OrInt
      52              : 
      53              :         // The max memtable size is limited by the uint32 offsets stored in
      54              :         // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
      55              :         //
      56              :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      57              :         // end of an allocation fits in uint32.
      58              :         //
      59              :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      60              :         // 2GB).
      61              :         maxMemTableSize = constants.MaxUint32OrInt
      62              : )
      63              : 
      64              : // FileCacheSize can be used to determine the file
      65              : // cache size for a single db, given the maximum open
      66              : // files which can be used by a file cache which is
      67              : // only used by a single db.
      68            2 : func FileCacheSize(maxOpenFiles int) int {
      69            2 :         fileCacheSize := maxOpenFiles - numNonFileCacheFiles
      70            2 :         if fileCacheSize < minFileCacheSize {
      71            1 :                 fileCacheSize = minFileCacheSize
      72            1 :         }
      73            2 :         return fileCacheSize
      74              : }
      75              : 
      76              : // Open opens a DB whose files live in the given directory.
      77            2 : func Open(dirname string, opts *Options) (db *DB, err error) {
      78            2 :         // Make a copy of the options so that we don't mutate the passed in options.
      79            2 :         opts = opts.Clone()
      80            2 :         opts.EnsureDefaults()
      81            2 :         if err := opts.Validate(); err != nil {
      82            0 :                 return nil, err
      83            0 :         }
      84            2 :         if opts.LoggerAndTracer == nil {
      85            2 :                 opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
      86            2 :         } else {
      87            1 :                 opts.Logger = opts.LoggerAndTracer
      88            1 :         }
      89              : 
      90            2 :         if invariants.Sometimes(5) {
      91            2 :                 assertComparer := base.MakeAssertComparer(*opts.Comparer)
      92            2 :                 opts.Comparer = &assertComparer
      93            2 :         }
      94              : 
      95              :         // In all error cases, we return db = nil; this is used by various
      96              :         // deferred cleanups.
      97              : 
      98              :         // Open the database and WAL directories first.
      99            2 :         walDirname, dataDir, err := prepareAndOpenDirs(dirname, opts)
     100            2 :         if err != nil {
     101            1 :                 return nil, errors.Wrapf(err, "error opening database at %q", dirname)
     102            1 :         }
     103            2 :         defer func() {
     104            2 :                 if db == nil {
     105            1 :                         dataDir.Close()
     106            1 :                 }
     107              :         }()
     108              : 
     109              :         // Lock the database directory.
     110            2 :         var fileLock *Lock
     111            2 :         if opts.Lock != nil {
     112            1 :                 // The caller already acquired the database lock. Ensure that the
     113            1 :                 // directory matches.
     114            1 :                 if err := opts.Lock.pathMatches(dirname); err != nil {
     115            0 :                         return nil, err
     116            0 :                 }
     117            1 :                 if err := opts.Lock.refForOpen(); err != nil {
     118            1 :                         return nil, err
     119            1 :                 }
     120            1 :                 fileLock = opts.Lock
     121            2 :         } else {
     122            2 :                 fileLock, err = LockDirectory(dirname, opts.FS)
     123            2 :                 if err != nil {
     124            1 :                         return nil, err
     125            1 :                 }
     126              :         }
     127            2 :         defer func() {
     128            2 :                 if db == nil {
     129            1 :                         fileLock.Close()
     130            1 :                 }
     131              :         }()
     132              : 
     133              :         // List the directory contents. This also happens to include WAL log files, if
     134              :         // they are in the same dir, but we will ignore those below. The provider is
     135              :         // also given this list, but it ignores non sstable files.
     136            2 :         ls, err := opts.FS.List(dirname)
     137            2 :         if err != nil {
     138            1 :                 return nil, err
     139            1 :         }
     140              : 
     141              :         // Establish the format major version.
     142            2 :         formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
     143            2 :         if err != nil {
     144            1 :                 return nil, err
     145            1 :         }
     146            2 :         defer func() {
     147            2 :                 if db == nil {
     148            1 :                         formatVersionMarker.Close()
     149            1 :                 }
     150              :         }()
     151              : 
     152            2 :         noFormatVersionMarker := formatVersion == FormatDefault
     153            2 :         if noFormatVersionMarker {
     154            2 :                 // We will initialize the store at the minimum possible format, then upgrade
     155            2 :                 // the format to the desired one. This helps test the format upgrade code.
     156            2 :                 formatVersion = FormatMinSupported
     157            2 :                 if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
     158            2 :                         formatVersion = FormatMinForSharedObjects
     159            2 :                 }
     160              :                 // There is no format version marker file. There are three cases:
     161              :                 //  - we are trying to open an existing store that was created at
     162              :                 //    FormatMostCompatible (the only one without a version marker file)
     163              :                 //  - we are creating a new store;
     164              :                 //  - we are retrying a failed creation.
     165              :                 //
     166              :                 // To error in the first case, we set ErrorIfNotPristine.
     167            2 :                 opts.ErrorIfNotPristine = true
     168            2 :                 defer func() {
     169            2 :                         if err != nil && errors.Is(err, ErrDBNotPristine) {
     170            0 :                                 // We must be trying to open an existing store at FormatMostCompatible.
     171            0 :                                 // Correct the error in this case -we
     172            0 :                                 err = errors.Newf(
     173            0 :                                         "pebble: database %q written in format major version 1 which is no longer supported",
     174            0 :                                         dirname)
     175            0 :                         }
     176              :                 }()
     177            2 :         } else {
     178            2 :                 if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone && formatVersion < FormatMinForSharedObjects {
     179            0 :                         return nil, errors.Newf(
     180            0 :                                 "pebble: database %q configured with shared objects but written in too old format major version %d",
     181            0 :                                 dirname, formatVersion)
     182            0 :                 }
     183              :         }
     184              : 
     185              :         // Find the currently active manifest, if there is one.
     186            2 :         manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
     187            2 :         if err != nil {
     188            1 :                 return nil, errors.Wrapf(err, "pebble: database %q", dirname)
     189            1 :         }
     190            2 :         defer func() {
     191            2 :                 if db == nil {
     192            1 :                         manifestMarker.Close()
     193            1 :                 }
     194              :         }()
     195              : 
     196              :         // Atomic markers may leave behind obsolete files if there's a crash
     197              :         // mid-update. Clean these up if we're not in read-only mode.
     198            2 :         if !opts.ReadOnly {
     199            2 :                 if err := formatVersionMarker.RemoveObsolete(); err != nil {
     200            0 :                         return nil, err
     201            0 :                 }
     202            2 :                 if err := manifestMarker.RemoveObsolete(); err != nil {
     203            0 :                         return nil, err
     204            0 :                 }
     205              :         }
     206              : 
     207            2 :         if opts.Cache == nil {
     208            2 :                 opts.Cache = cache.New(opts.CacheSize)
     209            2 :         } else {
     210            1 :                 opts.Cache.Ref()
     211            1 :         }
     212              : 
     213            2 :         d := &DB{
     214            2 :                 cacheID:             opts.Cache.NewID(),
     215            2 :                 dirname:             dirname,
     216            2 :                 opts:                opts,
     217            2 :                 cmp:                 opts.Comparer.Compare,
     218            2 :                 equal:               opts.Comparer.Equal,
     219            2 :                 merge:               opts.Merger.Merge,
     220            2 :                 split:               opts.Comparer.Split,
     221            2 :                 abbreviatedKey:      opts.Comparer.AbbreviatedKey,
     222            2 :                 largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
     223            2 :                 fileLock:            fileLock,
     224            2 :                 dataDir:             dataDir,
     225            2 :                 closed:              new(atomic.Value),
     226            2 :                 closedCh:            make(chan struct{}),
     227            2 :         }
     228            2 :         d.mu.versions = &versionSet{}
     229            2 :         d.diskAvailBytes.Store(math.MaxUint64)
     230            2 : 
     231            2 :         defer func() {
     232            2 :                 // If an error or panic occurs during open, attempt to release the manually
     233            2 :                 // allocated memory resources. Note that rather than look for an error, we
     234            2 :                 // look for the return of a nil DB pointer.
     235            2 :                 if r := recover(); db == nil {
     236            1 :                         // If there's an unused, recycled memtable, we need to release its memory.
     237            1 :                         if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
     238            1 :                                 d.freeMemTable(obsoleteMemTable)
     239            1 :                         }
     240              : 
     241              :                         // Release our references to the Cache. Note that both the DB, and
     242              :                         // fileCache have a reference. When we release the reference to
     243              :                         // the fileCache, and if there are no other references to
     244              :                         // the fileCache, then the fileCache will also release its
     245              :                         // reference to the cache.
     246            1 :                         opts.Cache.Unref()
     247            1 : 
     248            1 :                         if d.fileCache != nil {
     249            1 :                                 _ = d.fileCache.Close()
     250            1 :                         }
     251              : 
     252            1 :                         for _, mem := range d.mu.mem.queue {
     253            1 :                                 switch t := mem.flushable.(type) {
     254            1 :                                 case *memTable:
     255            1 :                                         manual.Free(manual.MemTable, t.arenaBuf)
     256            1 :                                         t.arenaBuf = manual.Buf{}
     257              :                                 }
     258              :                         }
     259            1 :                         if d.cleanupManager != nil {
     260            1 :                                 d.cleanupManager.Close()
     261            1 :                         }
     262            1 :                         if d.objProvider != nil {
     263            1 :                                 d.objProvider.Close()
     264            1 :                         }
     265            1 :                         if r != nil {
     266            1 :                                 panic(r)
     267              :                         }
     268              :                 }
     269              :         }()
     270              : 
     271            2 :         d.commit = newCommitPipeline(commitEnv{
     272            2 :                 logSeqNum:     &d.mu.versions.logSeqNum,
     273            2 :                 visibleSeqNum: &d.mu.versions.visibleSeqNum,
     274            2 :                 apply:         d.commitApply,
     275            2 :                 write:         d.commitWrite,
     276            2 :         })
     277            2 :         d.mu.nextJobID = 1
     278            2 :         d.mu.mem.nextSize = opts.MemTableSize
     279            2 :         if d.mu.mem.nextSize > initialMemTableSize {
     280            2 :                 d.mu.mem.nextSize = initialMemTableSize
     281            2 :         }
     282            2 :         d.mu.compact.cond.L = &d.mu.Mutex
     283            2 :         d.mu.compact.inProgress = make(map[*compaction]struct{})
     284            2 :         d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
     285            2 :         d.mu.snapshots.init()
     286            2 :         // logSeqNum is the next sequence number that will be assigned.
     287            2 :         // Start assigning sequence numbers from base.SeqNumStart to leave
     288            2 :         // room for reserved sequence numbers (see comments around
     289            2 :         // SeqNumStart).
     290            2 :         d.mu.versions.logSeqNum.Store(base.SeqNumStart)
     291            2 :         d.mu.formatVers.vers.Store(uint64(formatVersion))
     292            2 :         d.mu.formatVers.marker = formatVersionMarker
     293            2 : 
     294            2 :         d.timeNow = time.Now
     295            2 :         d.openedAt = d.timeNow()
     296            2 : 
     297            2 :         d.mu.Lock()
     298            2 :         defer d.mu.Unlock()
     299            2 : 
     300            2 :         jobID := d.newJobIDLocked()
     301            2 : 
     302            2 :         providerSettings := objstorageprovider.Settings{
     303            2 :                 Logger:              opts.Logger,
     304            2 :                 FS:                  opts.FS,
     305            2 :                 FSDirName:           dirname,
     306            2 :                 FSDirInitialListing: ls,
     307            2 :                 FSCleaner:           opts.Cleaner,
     308            2 :                 NoSyncOnClose:       opts.NoSyncOnClose,
     309            2 :                 BytesPerSync:        opts.BytesPerSync,
     310            2 :         }
     311            2 :         providerSettings.Local.ReadaheadConfig = opts.Local.ReadaheadConfig
     312            2 :         providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
     313            2 :         providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
     314            2 :         providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
     315            2 :         providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
     316            2 : 
     317            2 :         d.objProvider, err = objstorageprovider.Open(providerSettings)
     318            2 :         if err != nil {
     319            1 :                 return nil, err
     320            1 :         }
     321              : 
     322            2 :         if !manifestExists {
     323            2 :                 // DB does not exist.
     324            2 :                 if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
     325            1 :                         return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
     326            1 :                 }
     327              : 
     328              :                 // Create the DB.
     329            2 :                 if err := d.mu.versions.create(
     330            2 :                         jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
     331            1 :                         return nil, err
     332            1 :                 }
     333            2 :         } else {
     334            2 :                 if opts.ErrorIfExists {
     335            1 :                         return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
     336            1 :                 }
     337              :                 // Load the version set.
     338            2 :                 if err := d.mu.versions.load(
     339            2 :                         dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
     340            1 :                         return nil, err
     341            1 :                 }
     342            2 :                 if opts.ErrorIfNotPristine {
     343            1 :                         liveFileNums := make(map[base.DiskFileNum]struct{})
     344            1 :                         d.mu.versions.addLiveFileNums(liveFileNums)
     345            1 :                         if len(liveFileNums) != 0 {
     346            1 :                                 return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
     347            1 :                         }
     348              :                 }
     349              :         }
     350              : 
     351              :         // In read-only mode, we replay directly into the mutable memtable but never
     352              :         // flush it. We need to delay creation of the memtable until we know the
     353              :         // sequence number of the first batch that will be inserted.
     354            2 :         if !d.opts.ReadOnly {
     355            2 :                 var entry *flushableEntry
     356            2 :                 d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
     357            2 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     358            2 :         }
     359              : 
     360            2 :         d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     361            2 :                 Buckets: FsyncLatencyBuckets,
     362            2 :         })
     363            2 :         walOpts := wal.Options{
     364            2 :                 Primary:              wal.Dir{FS: opts.FS, Dirname: walDirname},
     365            2 :                 Secondary:            wal.Dir{},
     366            2 :                 MinUnflushedWALNum:   wal.NumWAL(d.mu.versions.minUnflushedLogNum),
     367            2 :                 MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
     368            2 :                 NoSyncOnClose:        opts.NoSyncOnClose,
     369            2 :                 BytesPerSync:         opts.WALBytesPerSync,
     370            2 :                 PreallocateSize:      d.walPreallocateSize,
     371            2 :                 MinSyncInterval:      opts.WALMinSyncInterval,
     372            2 :                 FsyncLatency:         d.mu.log.metrics.fsyncLatency,
     373            2 :                 QueueSemChan:         d.commit.logSyncQSem,
     374            2 :                 Logger:               opts.Logger,
     375            2 :                 EventListener:        walEventListenerAdaptor{l: opts.EventListener},
     376            2 :         }
     377            2 :         if opts.WALFailover != nil {
     378            2 :                 walOpts.Secondary = opts.WALFailover.Secondary
     379            2 :                 walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
     380            2 :                 walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     381            2 :                         Buckets: FsyncLatencyBuckets,
     382            2 :                 })
     383            2 :         }
     384            2 :         walDirs := append(walOpts.Dirs(), opts.WALRecoveryDirs...)
     385            2 :         wals, err := wal.Scan(walDirs...)
     386            2 :         if err != nil {
     387            1 :                 return nil, err
     388            1 :         }
     389            2 :         walManager, err := wal.Init(walOpts, wals)
     390            2 :         if err != nil {
     391            1 :                 return nil, err
     392            1 :         }
     393            2 :         defer func() {
     394            2 :                 if db == nil {
     395            1 :                         walManager.Close()
     396            1 :                 }
     397              :         }()
     398              : 
     399            2 :         d.mu.log.manager = walManager
     400            2 : 
     401            2 :         d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
     402            2 : 
     403            2 :         if manifestExists && !opts.DisableConsistencyCheck {
     404            2 :                 curVersion := d.mu.versions.currentVersion()
     405            2 :                 if err := checkConsistency(curVersion, d.objProvider); err != nil {
     406            1 :                         return nil, err
     407            1 :                 }
     408              :         }
     409              : 
     410            2 :         fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
     411            2 :         if opts.FileCache == nil {
     412            2 :                 opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
     413            2 :                 defer opts.FileCache.Unref()
     414            2 :         }
     415            2 :         d.fileCache = opts.FileCache.newHandle(d.cacheID, d.objProvider, d.opts)
     416            2 :         d.newIters = d.fileCache.newIters
     417            2 :         d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
     418            2 : 
     419            2 :         d.mu.annotators.totalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
     420            1 :                 return true
     421            1 :         })
     422            2 :         d.mu.annotators.remoteSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
     423            1 :                 meta, err := d.objProvider.Lookup(base.FileTypeTable, f.FileBacking.DiskFileNum)
     424            1 :                 if err != nil {
     425            0 :                         return false
     426            0 :                 }
     427            1 :                 return meta.IsRemote()
     428              :         })
     429            2 :         d.mu.annotators.externalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
     430            1 :                 meta, err := d.objProvider.Lookup(base.FileTypeTable, f.FileBacking.DiskFileNum)
     431            1 :                 if err != nil {
     432            0 :                         return false
     433            0 :                 }
     434            1 :                 return meta.IsRemote() && meta.Remote.CleanupMethod == objstorage.SharedNoCleanup
     435              :         })
     436              : 
     437            2 :         var previousOptionsFileNum base.DiskFileNum
     438            2 :         var previousOptionsFilename string
     439            2 :         for _, filename := range ls {
     440            2 :                 ft, fn, ok := base.ParseFilename(opts.FS, filename)
     441            2 :                 if !ok {
     442            2 :                         continue
     443              :                 }
     444              : 
     445              :                 // Don't reuse any obsolete file numbers to avoid modifying an
     446              :                 // ingested sstable's original external file.
     447            2 :                 d.mu.versions.markFileNumUsed(fn)
     448            2 : 
     449            2 :                 switch ft {
     450            0 :                 case base.FileTypeLog:
     451              :                         // Ignore.
     452            2 :                 case base.FileTypeOptions:
     453            2 :                         if previousOptionsFileNum < fn {
     454            2 :                                 previousOptionsFileNum = fn
     455            2 :                                 previousOptionsFilename = filename
     456            2 :                         }
     457            1 :                 case base.FileTypeTemp, base.FileTypeOldTemp:
     458            1 :                         if !d.opts.ReadOnly {
     459            1 :                                 // Some codepaths write to a temporary file and then
     460            1 :                                 // rename it to its final location when complete.  A
     461            1 :                                 // temp file is leftover if a process exits before the
     462            1 :                                 // rename.  Remove it.
     463            1 :                                 err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
     464            1 :                                 if err != nil {
     465            0 :                                         return nil, err
     466            0 :                                 }
     467              :                         }
     468              :                 }
     469              :         }
     470            2 :         if n := len(wals); n > 0 {
     471            2 :                 // Don't reuse any obsolete file numbers to avoid modifying an
     472            2 :                 // ingested sstable's original external file.
     473            2 :                 d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
     474            2 :         }
     475              : 
     476              :         // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
     477              :         // objProvider. This avoids FileNum collisions with obsolete sstables.
     478            2 :         objects := d.objProvider.List()
     479            2 :         for _, obj := range objects {
     480            2 :                 d.mu.versions.markFileNumUsed(obj.DiskFileNum)
     481            2 :         }
     482              : 
     483              :         // Validate the most-recent OPTIONS file, if there is one.
     484            2 :         if previousOptionsFilename != "" {
     485            2 :                 path := opts.FS.PathJoin(dirname, previousOptionsFilename)
     486            2 :                 previousOptions, err := readOptionsFile(opts, path)
     487            2 :                 if err != nil {
     488            0 :                         return nil, err
     489            0 :                 }
     490            2 :                 if err := opts.CheckCompatibility(previousOptions); err != nil {
     491            1 :                         return nil, err
     492            1 :                 }
     493              :         }
     494              : 
     495              :         // Replay any newer log files than the ones named in the manifest.
     496            2 :         var replayWALs wal.Logs
     497            2 :         for i, w := range wals {
     498            2 :                 if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
     499            2 :                         replayWALs = wals[i:]
     500            2 :                         break
     501              :                 }
     502              :         }
     503            2 :         var flushableIngests []*ingestedFlushable
     504            2 :         for i, lf := range replayWALs {
     505            2 :                 // WALs other than the last one would have been closed cleanly.
     506            2 :                 //
     507            2 :                 // Note: we used to never require strict WAL tails when reading from older
     508            2 :                 // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
     509            2 :                 // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
     510            2 :                 // compatible Pebble format is newer and guarantees a clean EOF.
     511            2 :                 strictWALTail := i < len(replayWALs)-1
     512            2 :                 fi, maxSeqNum, err := d.replayWAL(jobID, lf, strictWALTail)
     513            2 :                 if err != nil {
     514            1 :                         return nil, err
     515            1 :                 }
     516            2 :                 if len(fi) > 0 {
     517            2 :                         flushableIngests = append(flushableIngests, fi...)
     518            2 :                 }
     519            2 :                 if d.mu.versions.logSeqNum.Load() < maxSeqNum {
     520            2 :                         d.mu.versions.logSeqNum.Store(maxSeqNum)
     521            2 :                 }
     522              :         }
     523            2 :         if d.mu.mem.mutable == nil {
     524            2 :                 // Recreate the mutable memtable if replayWAL got rid of it.
     525            2 :                 var entry *flushableEntry
     526            2 :                 d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
     527            2 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     528            2 :         }
     529            2 :         d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
     530            2 : 
     531            2 :         if !d.opts.ReadOnly {
     532            2 :                 d.maybeScheduleFlush()
     533            2 :                 for d.mu.compact.flushing {
     534            2 :                         d.mu.compact.cond.Wait()
     535            2 :                 }
     536              : 
     537              :                 // Create an empty .log file for the mutable memtable.
     538            2 :                 newLogNum := d.mu.versions.getNextDiskFileNum()
     539            2 :                 d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
     540            2 :                 if err != nil {
     541            1 :                         return nil, err
     542            1 :                 }
     543              : 
     544              :                 // This isn't strictly necessary as we don't use the log number for
     545              :                 // memtables being flushed, only for the next unflushed memtable.
     546            2 :                 d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
     547              :         }
     548            2 :         d.updateReadStateLocked(d.opts.DebugCheck)
     549            2 : 
     550            2 :         if !d.opts.ReadOnly {
     551            2 :                 // If the Options specify a format major version higher than the
     552            2 :                 // loaded database's, upgrade it. If this is a new database, this
     553            2 :                 // code path also performs an initial upgrade from the starting
     554            2 :                 // implicit MinSupported version.
     555            2 :                 //
     556            2 :                 // We ratchet the version this far into Open so that migrations have a read
     557            2 :                 // state available. Note that this also results in creating/updating the
     558            2 :                 // format version marker file.
     559            2 :                 if opts.FormatMajorVersion > d.FormatMajorVersion() {
     560            2 :                         if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
     561            0 :                                 return nil, err
     562            0 :                         }
     563            2 :                 } else if noFormatVersionMarker {
     564            2 :                         // We are creating a new store. Create the format version marker file.
     565            2 :                         if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
     566            1 :                                 return nil, err
     567            1 :                         }
     568              :                 }
     569              : 
     570              :                 // Write the current options to disk.
     571            2 :                 d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
     572            2 :                 tmpPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeTemp, d.optionsFileNum)
     573            2 :                 optionsPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeOptions, d.optionsFileNum)
     574            2 : 
     575            2 :                 // Write them to a temporary file first, in case we crash before
     576            2 :                 // we're done. A corrupt options file prevents opening the
     577            2 :                 // database.
     578            2 :                 optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
     579            2 :                 if err != nil {
     580            1 :                         return nil, err
     581            1 :                 }
     582            2 :                 serializedOpts := []byte(opts.String())
     583            2 :                 if _, err := optionsFile.Write(serializedOpts); err != nil {
     584            1 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     585            1 :                 }
     586            2 :                 d.optionsFileSize = uint64(len(serializedOpts))
     587            2 :                 if err := optionsFile.Sync(); err != nil {
     588            1 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     589            1 :                 }
     590            2 :                 if err := optionsFile.Close(); err != nil {
     591            0 :                         return nil, err
     592            0 :                 }
     593              :                 // Atomically rename to the OPTIONS-XXXXXX path. This rename is
     594              :                 // guaranteed to be atomic because the destination path does not
     595              :                 // exist.
     596            2 :                 if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
     597            1 :                         return nil, err
     598            1 :                 }
     599            2 :                 if err := d.dataDir.Sync(); err != nil {
     600            1 :                         return nil, err
     601            1 :                 }
     602              :         }
     603              : 
     604            2 :         if !d.opts.ReadOnly {
     605            2 :                 // Get a fresh list of files, in case some of the earlier flushes/compactions
     606            2 :                 // have deleted some files.
     607            2 :                 ls, err := opts.FS.List(dirname)
     608            2 :                 if err != nil {
     609            1 :                         return nil, err
     610            1 :                 }
     611            2 :                 d.scanObsoleteFiles(ls, flushableIngests)
     612            2 :                 d.deleteObsoleteFiles(jobID)
     613              :         }
     614              :         // Else, nothing is obsolete.
     615              : 
     616            2 :         d.mu.tableStats.cond.L = &d.mu.Mutex
     617            2 :         d.mu.tableValidation.cond.L = &d.mu.Mutex
     618            2 :         if !d.opts.ReadOnly {
     619            2 :                 d.maybeCollectTableStatsLocked()
     620            2 :         }
     621            2 :         d.calculateDiskAvailableBytes()
     622            2 : 
     623            2 :         d.maybeScheduleFlush()
     624            2 :         d.maybeScheduleCompaction()
     625            2 : 
     626            2 :         // Note: this is a no-op if invariants are disabled or race is enabled.
     627            2 :         //
     628            2 :         // Setting a finalizer on *DB causes *DB to never be reclaimed and the
     629            2 :         // finalizer to never be run. The problem is due to this limitation of
     630            2 :         // finalizers mention in the SetFinalizer docs:
     631            2 :         //
     632            2 :         //   If a cyclic structure includes a block with a finalizer, that cycle is
     633            2 :         //   not guaranteed to be garbage collected and the finalizer is not
     634            2 :         //   guaranteed to run, because there is no ordering that respects the
     635            2 :         //   dependencies.
     636            2 :         //
     637            2 :         // DB has cycles with several of its internal structures: readState,
     638            2 :         // newIters, fileCache, versions, etc. Each of this individually cause a
     639            2 :         // cycle and prevent the finalizer from being run. But we can workaround this
     640            2 :         // finializer limitation by setting a finalizer on another object that is
     641            2 :         // tied to the lifetime of DB: the DB.closed atomic.Value.
     642            2 :         dPtr := fmt.Sprintf("%p", d)
     643            2 :         invariants.SetFinalizer(d.closed, func(obj interface{}) {
     644            0 :                 v := obj.(*atomic.Value)
     645            0 :                 if err := v.Load(); err == nil {
     646            0 :                         fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
     647            0 :                         os.Exit(1)
     648            0 :                 }
     649              :         })
     650              : 
     651            2 :         return d, nil
     652              : }
     653              : 
     654              : // prepareAndOpenDirs opens the directories for the store (and creates them if
     655              : // necessary).
     656              : //
     657              : // Returns an error if ReadOnly is set and the directories don't exist.
     658              : func prepareAndOpenDirs(
     659              :         dirname string, opts *Options,
     660            2 : ) (walDirname string, dataDir vfs.File, err error) {
     661            2 :         walDirname = opts.WALDir
     662            2 :         if opts.WALDir == "" {
     663            2 :                 walDirname = dirname
     664            2 :         }
     665              : 
     666              :         // Create directories if needed.
     667            2 :         if !opts.ReadOnly {
     668            2 :                 f, err := mkdirAllAndSyncParents(opts.FS, dirname)
     669            2 :                 if err != nil {
     670            1 :                         return "", nil, err
     671            1 :                 }
     672            2 :                 f.Close()
     673            2 :                 if walDirname != dirname {
     674            2 :                         f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
     675            2 :                         if err != nil {
     676            0 :                                 return "", nil, err
     677            0 :                         }
     678            2 :                         f.Close()
     679              :                 }
     680            2 :                 if opts.WALFailover != nil {
     681            2 :                         secondary := opts.WALFailover.Secondary
     682            2 :                         f, err := mkdirAllAndSyncParents(secondary.FS, secondary.Dirname)
     683            2 :                         if err != nil {
     684            0 :                                 return "", nil, err
     685            0 :                         }
     686            2 :                         f.Close()
     687              :                 }
     688              :         }
     689              : 
     690            2 :         dataDir, err = opts.FS.OpenDir(dirname)
     691            2 :         if err != nil {
     692            1 :                 if opts.ReadOnly && oserror.IsNotExist(err) {
     693            1 :                         return "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
     694            1 :                 }
     695            1 :                 return "", nil, err
     696              :         }
     697            2 :         if opts.ReadOnly && walDirname != dirname {
     698            1 :                 // Check that the wal dir exists.
     699            1 :                 walDir, err := opts.FS.OpenDir(walDirname)
     700            1 :                 if err != nil {
     701            1 :                         dataDir.Close()
     702            1 :                         return "", nil, err
     703            1 :                 }
     704            1 :                 walDir.Close()
     705              :         }
     706              : 
     707            2 :         return walDirname, dataDir, nil
     708              : }
     709              : 
     710              : // GetVersion returns the engine version string from the latest options
     711              : // file present in dir. Used to check what Pebble or RocksDB version was last
     712              : // used to write to the database stored in this directory. An empty string is
     713              : // returned if no valid OPTIONS file with a version key was found.
     714            1 : func GetVersion(dir string, fs vfs.FS) (string, error) {
     715            1 :         ls, err := fs.List(dir)
     716            1 :         if err != nil {
     717            0 :                 return "", err
     718            0 :         }
     719            1 :         var version string
     720            1 :         lastOptionsSeen := base.DiskFileNum(0)
     721            1 :         for _, filename := range ls {
     722            1 :                 ft, fn, ok := base.ParseFilename(fs, filename)
     723            1 :                 if !ok {
     724            1 :                         continue
     725              :                 }
     726            1 :                 switch ft {
     727            1 :                 case base.FileTypeOptions:
     728            1 :                         // If this file has a higher number than the last options file
     729            1 :                         // processed, reset version. This is because rocksdb often
     730            1 :                         // writes multiple options files without deleting previous ones.
     731            1 :                         // Otherwise, skip parsing this options file.
     732            1 :                         if fn > lastOptionsSeen {
     733            1 :                                 version = ""
     734            1 :                                 lastOptionsSeen = fn
     735            1 :                         } else {
     736            0 :                                 continue
     737              :                         }
     738            1 :                         f, err := fs.Open(fs.PathJoin(dir, filename))
     739            1 :                         if err != nil {
     740            0 :                                 return "", err
     741            0 :                         }
     742            1 :                         data, err := io.ReadAll(f)
     743            1 :                         f.Close()
     744            1 : 
     745            1 :                         if err != nil {
     746            0 :                                 return "", err
     747            0 :                         }
     748            1 :                         err = parseOptions(string(data), parseOptionsFuncs{
     749            1 :                                 visitKeyValue: func(i, j int, section, key, value string) error {
     750            1 :                                         switch {
     751            1 :                                         case section == "Version":
     752            1 :                                                 switch key {
     753            1 :                                                 case "pebble_version":
     754            1 :                                                         version = value
     755            1 :                                                 case "rocksdb_version":
     756            1 :                                                         version = fmt.Sprintf("rocksdb v%s", value)
     757              :                                                 }
     758              :                                         }
     759            1 :                                         return nil
     760              :                                 },
     761              :                         })
     762            1 :                         if err != nil {
     763            0 :                                 return "", err
     764            0 :                         }
     765              :                 }
     766              :         }
     767            1 :         return version, nil
     768              : }
     769              : 
     770              : func (d *DB) replayIngestedFlushable(
     771              :         b *Batch, logNum base.DiskFileNum,
     772            2 : ) (entry *flushableEntry, err error) {
     773            2 :         br := b.Reader()
     774            2 :         seqNum := b.SeqNum()
     775            2 : 
     776            2 :         fileNums := make([]base.DiskFileNum, 0, b.Count())
     777            2 :         var exciseSpan KeyRange
     778            2 :         addFileNum := func(encodedFileNum []byte) {
     779            2 :                 fileNum, n := binary.Uvarint(encodedFileNum)
     780            2 :                 if n <= 0 {
     781            0 :                         panic("pebble: ingest sstable file num is invalid")
     782              :                 }
     783            2 :                 fileNums = append(fileNums, base.DiskFileNum(fileNum))
     784              :         }
     785              : 
     786            2 :         for i := 0; i < int(b.Count()); i++ {
     787            2 :                 kind, key, val, ok, err := br.Next()
     788            2 :                 if err != nil {
     789            0 :                         return nil, err
     790            0 :                 }
     791            2 :                 if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise {
     792            0 :                         panic("pebble: invalid batch key kind")
     793              :                 }
     794            2 :                 if !ok {
     795            0 :                         panic("pebble: invalid batch count")
     796              :                 }
     797            2 :                 if kind == base.InternalKeyKindExcise {
     798            2 :                         if exciseSpan.Valid() {
     799            0 :                                 panic("pebble: multiple excise spans in a single batch")
     800              :                         }
     801            2 :                         exciseSpan.Start = slices.Clone(key)
     802            2 :                         exciseSpan.End = slices.Clone(val)
     803            2 :                         continue
     804              :                 }
     805            2 :                 addFileNum(key)
     806              :         }
     807              : 
     808            2 :         if _, _, _, ok, err := br.Next(); err != nil {
     809            0 :                 return nil, err
     810            2 :         } else if ok {
     811            0 :                 panic("pebble: invalid number of entries in batch")
     812              :         }
     813              : 
     814            2 :         meta := make([]*fileMetadata, len(fileNums))
     815            2 :         var lastRangeKey keyspan.Span
     816            2 :         for i, n := range fileNums {
     817            2 :                 readable, err := d.objProvider.OpenForReading(context.TODO(), base.FileTypeTable, n,
     818            2 :                         objstorage.OpenOptions{MustExist: true})
     819            2 :                 if err != nil {
     820            0 :                         return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files")
     821            0 :                 }
     822              :                 // NB: ingestLoad1 will close readable.
     823            2 :                 meta[i], lastRangeKey, err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(),
     824            2 :                         readable, d.cacheID, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
     825            2 :                 if err != nil {
     826            0 :                         return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
     827            0 :                 }
     828              :         }
     829            2 :         if lastRangeKey.Valid() && d.opts.Comparer.Split.HasSuffix(lastRangeKey.End) {
     830            0 :                 return nil, errors.AssertionFailedf("pebble: last ingest sstable has suffixed range key end %s",
     831            0 :                         d.opts.Comparer.FormatKey(lastRangeKey.End))
     832            0 :         }
     833              : 
     834            2 :         numFiles := len(meta)
     835            2 :         if exciseSpan.Valid() {
     836            2 :                 numFiles++
     837            2 :         }
     838            2 :         if uint32(numFiles) != b.Count() {
     839            0 :                 panic("pebble: couldn't load all files in WAL entry")
     840              :         }
     841              : 
     842            2 :         return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
     843              : }
     844              : 
     845              : // replayWAL replays the edits in the specified WAL. If the DB is in read
     846              : // only mode, then the WALs are replayed into memtables and not flushed. If
     847              : // the DB is not in read only mode, then the contents of the WAL are
     848              : // guaranteed to be flushed when a flush is scheduled after this method is run.
     849              : // Note that this flushing is very important for guaranteeing durability:
     850              : // the application may have had a number of pending
     851              : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
     852              : // happened but the corresponding data may now be readable from the WAL (while
     853              : // sitting in write-back caches in the kernel or the storage device). By
     854              : // reading the WAL (including the non-fsynced data) and then flushing all
     855              : // these changes (flush does fsyncs), we are able to guarantee that the
     856              : // initial state of the DB is durable.
     857              : //
     858              : // This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable and replays
     859              : // WALs into the flushable queue. Flushing of the queue is expected to be handled
     860              : // by callers. A list of flushable ingests (but not memtables) replayed is returned.
     861              : //
     862              : // d.mu must be held when calling this, but the mutex may be dropped and
     863              : // re-acquired during the course of this method.
     864              : func (d *DB) replayWAL(
     865              :         jobID JobID, ll wal.LogicalLog, strictWALTail bool,
     866            2 : ) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
     867            2 :         rr := ll.OpenForRead()
     868            2 :         defer rr.Close()
     869            2 :         var (
     870            2 :                 b               Batch
     871            2 :                 buf             bytes.Buffer
     872            2 :                 mem             *memTable
     873            2 :                 entry           *flushableEntry
     874            2 :                 offset          wal.Offset
     875            2 :                 lastFlushOffset int64
     876            2 :                 keysReplayed    int64 // number of keys replayed
     877            2 :                 batchesReplayed int64 // number of batches replayed
     878            2 :         )
     879            2 : 
     880            2 :         // TODO(jackson): This function is interspersed with panics, in addition to
     881            2 :         // corruption error propagation. Audit them to ensure we're truly only
     882            2 :         // panicking where the error points to Pebble bug and not user or
     883            2 :         // hardware-induced corruption.
     884            2 : 
     885            2 :         // "Flushes" (ie. closes off) the current memtable, if not nil.
     886            2 :         flushMem := func() {
     887            2 :                 if mem == nil {
     888            2 :                         return
     889            2 :                 }
     890            2 :                 mem.writerUnref()
     891            2 :                 if d.mu.mem.mutable == mem {
     892            2 :                         d.mu.mem.mutable = nil
     893            2 :                 }
     894            2 :                 entry.flushForced = !d.opts.ReadOnly
     895            2 :                 var logSize uint64
     896            2 :                 mergedOffset := offset.Physical + offset.PreviousFilesBytes
     897            2 :                 if mergedOffset >= lastFlushOffset {
     898            2 :                         logSize = uint64(mergedOffset - lastFlushOffset)
     899            2 :                 }
     900              :                 // Else, this was the initial memtable in the read-only case which must have
     901              :                 // been empty, but we need to flush it since we don't want to add to it later.
     902            2 :                 lastFlushOffset = mergedOffset
     903            2 :                 entry.logSize = logSize
     904            2 :                 mem, entry = nil, nil
     905              :         }
     906              : 
     907            2 :         mem = d.mu.mem.mutable
     908            2 :         if mem != nil {
     909            2 :                 entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
     910            2 :                 if !d.opts.ReadOnly {
     911            2 :                         flushMem()
     912            2 :                 }
     913              :         }
     914              : 
     915              :         // Creates a new memtable if there is no current memtable.
     916            2 :         ensureMem := func(seqNum base.SeqNum) {
     917            2 :                 if mem != nil {
     918            2 :                         return
     919            2 :                 }
     920            2 :                 mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
     921            2 :                 d.mu.mem.mutable = mem
     922            2 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     923              :         }
     924              : 
     925            2 :         defer func() {
     926            2 :                 if err != nil {
     927            1 :                         err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
     928            1 :                 }
     929              :         }()
     930              : 
     931            2 :         for {
     932            2 :                 var r io.Reader
     933            2 :                 var err error
     934            2 :                 r, offset, err = rr.NextRecord()
     935            2 :                 if err == nil {
     936            2 :                         _, err = io.Copy(&buf, r)
     937            2 :                 }
     938            2 :                 if err != nil {
     939            2 :                         // It is common to encounter a zeroed or invalid chunk due to WAL
     940            2 :                         // preallocation and WAL recycling. We need to distinguish these
     941            2 :                         // errors from EOF in order to recognize that the record was
     942            2 :                         // truncated and to avoid replaying subsequent WALs, but want
     943            2 :                         // to otherwise treat them like EOF.
     944            2 :                         if err == io.EOF {
     945            2 :                                 break
     946            1 :                         } else if record.IsInvalidRecord(err) {
     947            1 :                                 if !strictWALTail {
     948            1 :                                         break
     949              :                                 }
     950            1 :                                 err = errors.Mark(err, ErrCorruption)
     951              :                         }
     952            1 :                         return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
     953              :                 }
     954              : 
     955            2 :                 if buf.Len() < batchrepr.HeaderLen {
     956            0 :                         return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
     957            0 :                                 errors.Safe(base.DiskFileNum(ll.Num)), offset)
     958            0 :                 }
     959              : 
     960            2 :                 if d.opts.ErrorIfNotPristine {
     961            1 :                         return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
     962            1 :                 }
     963              : 
     964              :                 // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
     965              :                 // which is used below.
     966            2 :                 b = Batch{}
     967            2 :                 b.db = d
     968            2 :                 b.SetRepr(buf.Bytes())
     969            2 :                 seqNum := b.SeqNum()
     970            2 :                 maxSeqNum = seqNum + base.SeqNum(b.Count())
     971            2 :                 keysReplayed += int64(b.Count())
     972            2 :                 batchesReplayed++
     973            2 :                 {
     974            2 :                         br := b.Reader()
     975            2 :                         if kind, _, _, ok, err := br.Next(); err != nil {
     976            0 :                                 return nil, 0, err
     977            2 :                         } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
     978            2 :                                 // We're in the flushable ingests (+ possibly excises) case.
     979            2 :                                 //
     980            2 :                                 // Ingests require an up-to-date view of the LSM to determine the target
     981            2 :                                 // level of ingested sstables, and to accurately compute excises. Instead of
     982            2 :                                 // doing an ingest in this function, we just enqueue a flushable ingest
     983            2 :                                 // in the flushables queue and run a regular flush.
     984            2 :                                 flushMem()
     985            2 :                                 // mem is nil here.
     986            2 :                                 entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
     987            2 :                                 if err != nil {
     988            0 :                                         return nil, 0, err
     989            0 :                                 }
     990            2 :                                 fi := entry.flushable.(*ingestedFlushable)
     991            2 :                                 flushableIngests = append(flushableIngests, fi)
     992            2 :                                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     993            2 :                                 // A flushable ingest is always followed by a WAL rotation.
     994            2 :                                 break
     995              :                         }
     996              :                 }
     997              : 
     998            2 :                 if b.memTableSize >= uint64(d.largeBatchThreshold) {
     999            2 :                         flushMem()
    1000            2 :                         // Make a copy of the data slice since it is currently owned by buf and will
    1001            2 :                         // be reused in the next iteration.
    1002            2 :                         b.data = slices.Clone(b.data)
    1003            2 :                         b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
    1004            2 :                         if err != nil {
    1005            0 :                                 return nil, 0, err
    1006            0 :                         }
    1007            2 :                         entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
    1008            2 :                         // Disable memory accounting by adding a reader ref that will never be
    1009            2 :                         // removed.
    1010            2 :                         entry.readerRefs.Add(1)
    1011            2 :                         d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1012            2 :                 } else {
    1013            2 :                         ensureMem(seqNum)
    1014            2 :                         if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
    1015            0 :                                 return nil, 0, err
    1016            0 :                         }
    1017              :                         // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
    1018              :                         // batch may not initially fit, but will eventually fit (since it is smaller than
    1019              :                         // largeBatchThreshold).
    1020            2 :                         for err == arenaskl.ErrArenaFull {
    1021            2 :                                 flushMem()
    1022            2 :                                 ensureMem(seqNum)
    1023            2 :                                 err = mem.prepare(&b)
    1024            2 :                                 if err != nil && err != arenaskl.ErrArenaFull {
    1025            0 :                                         return nil, 0, err
    1026            0 :                                 }
    1027              :                         }
    1028            2 :                         if err = mem.apply(&b, seqNum); err != nil {
    1029            0 :                                 return nil, 0, err
    1030            0 :                         }
    1031            2 :                         mem.writerUnref()
    1032              :                 }
    1033            2 :                 buf.Reset()
    1034              :         }
    1035              : 
    1036            2 :         d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
    1037            2 :                 jobID, ll.String(), offset, keysReplayed, batchesReplayed)
    1038            2 :         if !d.opts.ReadOnly {
    1039            2 :                 flushMem()
    1040            2 :         }
    1041              : 
    1042              :         // mem is nil here, if !ReadOnly.
    1043            2 :         return flushableIngests, maxSeqNum, err
    1044              : }
    1045              : 
    1046            2 : func readOptionsFile(opts *Options, path string) (string, error) {
    1047            2 :         f, err := opts.FS.Open(path)
    1048            2 :         if err != nil {
    1049            0 :                 return "", err
    1050            0 :         }
    1051            2 :         defer f.Close()
    1052            2 : 
    1053            2 :         data, err := io.ReadAll(f)
    1054            2 :         if err != nil {
    1055            0 :                 return "", err
    1056            0 :         }
    1057            2 :         return string(data), nil
    1058              : }
    1059              : 
// DBDesc briefly describes high-level state about a database, as reported by
// Peek.
type DBDesc struct {
	// Exists is true if an existing database was found (i.e. a current
	// manifest was located in the directory).
	Exists bool
	// FormatMajorVersion indicates the database's current format
	// version.
	FormatMajorVersion FormatMajorVersion
	// ManifestFilename is the path of the current active manifest, if the
	// database exists. When populated by Peek it is joined with the database
	// directory. It is empty if the database does not exist.
	ManifestFilename string
	// OptionsFilename is the path of the OPTIONS file with the highest file
	// number, if any OPTIONS file exists. When populated by Peek it is joined
	// with the database directory.
	OptionsFilename string
}
    1074              : 
    1075              : // String implements fmt.Stringer.
    1076            1 : func (d *DBDesc) String() string {
    1077            1 :         if !d.Exists {
    1078            1 :                 return "uninitialized"
    1079            1 :         }
    1080            1 :         var buf bytes.Buffer
    1081            1 :         fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
    1082            1 :         fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
    1083            1 :         fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
    1084            1 :         return buf.String()
    1085              : }
    1086              : 
    1087              : // Peek looks for an existing database in dirname on the provided FS. It
    1088              : // returns a brief description of the database. Peek is read-only and
    1089              : // does not open the database
    1090            1 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
    1091            1 :         ls, err := fs.List(dirname)
    1092            1 :         if err != nil {
    1093            1 :                 return nil, err
    1094            1 :         }
    1095              : 
    1096            1 :         vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
    1097            1 :         if err != nil {
    1098            0 :                 return nil, err
    1099            0 :         }
    1100              :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1101              :         // PeekMarker variant that avoids opening the directory.
    1102            1 :         if err := versMarker.Close(); err != nil {
    1103            0 :                 return nil, err
    1104            0 :         }
    1105              : 
    1106              :         // Find the currently active manifest, if there is one.
    1107            1 :         manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
    1108            1 :         if err != nil {
    1109            0 :                 return nil, err
    1110            0 :         }
    1111              :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1112              :         // PeekMarker variant that avoids opening the directory.
    1113            1 :         if err := manifestMarker.Close(); err != nil {
    1114            0 :                 return nil, err
    1115            0 :         }
    1116              : 
    1117            1 :         desc := &DBDesc{
    1118            1 :                 Exists:             exists,
    1119            1 :                 FormatMajorVersion: vers,
    1120            1 :         }
    1121            1 : 
    1122            1 :         // Find the OPTIONS file with the highest file number within the list of
    1123            1 :         // directory entries.
    1124            1 :         var previousOptionsFileNum base.DiskFileNum
    1125            1 :         for _, filename := range ls {
    1126            1 :                 ft, fn, ok := base.ParseFilename(fs, filename)
    1127            1 :                 if !ok || ft != base.FileTypeOptions || fn < previousOptionsFileNum {
    1128            1 :                         continue
    1129              :                 }
    1130            1 :                 previousOptionsFileNum = fn
    1131            1 :                 desc.OptionsFilename = fs.PathJoin(dirname, filename)
    1132              :         }
    1133              : 
    1134            1 :         if exists {
    1135            1 :                 desc.ManifestFilename = base.MakeFilepath(fs, dirname, base.FileTypeManifest, manifestFileNum)
    1136            1 :         }
    1137            1 :         return desc, nil
    1138              : }
    1139              : 
    1140              : // LockDirectory acquires the database directory lock in the named directory,
    1141              : // preventing another process from opening the database. LockDirectory returns a
    1142              : // handle to the held lock that may be passed to Open through Options.Lock to
    1143              : // subsequently open the database, skipping lock acquistion during Open.
    1144              : //
    1145              : // LockDirectory may be used to expand the critical section protected by the
    1146              : // database lock to include setup before the call to Open.
    1147            2 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
    1148            2 :         fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, base.FileTypeLock, base.DiskFileNum(0)))
    1149            2 :         if err != nil {
    1150            1 :                 return nil, err
    1151            1 :         }
    1152            2 :         l := &Lock{dirname: dirname, fileLock: fileLock}
    1153            2 :         l.refs.Store(1)
    1154            2 :         invariants.SetFinalizer(l, func(obj interface{}) {
    1155            2 :                 if refs := obj.(*Lock).refs.Load(); refs > 0 {
    1156            0 :                         panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
    1157              :                 }
    1158              :         })
    1159            2 :         return l, nil
    1160              : }
    1161              : 
// Lock represents a file lock on a directory. It may be passed to Open through
// Options.Lock to elide lock acquisition during Open.
type Lock struct {
	// dirname is the directory the lock was acquired in; pathMatches compares
	// it against the directory passed to Open.
	dirname string
	// fileLock is the underlying held file lock. Close sets it to nil once the
	// last reference is released.
	fileLock io.Closer
	// refs is a count of the number of handles on the lock. refs must be 0, 1
	// or 2.
	//
	// When acquired by the client and passed to Open, refs = 1 and the Open
	// call increments it to 2. When the database is closed, it's decremented to
	// 1. Finally, when the original caller calls Close on the Lock, it's
	// decremented to zero and the underlying file lock is released.
	//
	// When Open acquires the file lock, refs remains at 1 until the database is
	// closed.
	refs atomic.Int32
}
    1179              : 
    1180            1 : func (l *Lock) refForOpen() error {
    1181            1 :         // During Open, when a user passed in a lock, the reference count must be
    1182            1 :         // exactly 1. If it's zero, the lock is no longer held and is invalid. If
    1183            1 :         // it's 2, the lock is already in use by another database within the
    1184            1 :         // process.
    1185            1 :         if !l.refs.CompareAndSwap(1, 2) {
    1186            1 :                 return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
    1187            1 :         }
    1188            1 :         return nil
    1189              : }
    1190              : 
    1191              : // Close releases the lock, permitting another process to lock and open the
    1192              : // database. Close must not be called until after a database using the Lock has
    1193              : // been closed.
    1194            2 : func (l *Lock) Close() error {
    1195            2 :         if l.refs.Add(-1) > 0 {
    1196            1 :                 return nil
    1197            1 :         }
    1198            2 :         defer func() { l.fileLock = nil }()
    1199            2 :         return l.fileLock.Close()
    1200              : }
    1201              : 
    1202            1 : func (l *Lock) pathMatches(dirname string) error {
    1203            1 :         if dirname == l.dirname {
    1204            1 :                 return nil
    1205            1 :         }
    1206              :         // Check for relative paths, symlinks, etc. This isn't ideal because we're
    1207              :         // circumventing the vfs.FS interface here.
    1208              :         //
    1209              :         // TODO(jackson): We could add support for retrieving file inodes through Stat
    1210              :         // calls in the VFS interface on platforms where it's available and use that
    1211              :         // to differentiate.
    1212            1 :         dirStat, err1 := os.Stat(dirname)
    1213            1 :         lockDirStat, err2 := os.Stat(l.dirname)
    1214            1 :         if err1 == nil && err2 == nil && os.SameFile(dirStat, lockDirStat) {
    1215            1 :                 return nil
    1216            1 :         }
    1217            0 :         return errors.Join(
    1218            0 :                 errors.Newf("pebble: opts.Lock acquired in %q not %q", l.dirname, dirname),
    1219            0 :                 err1, err2)
    1220              : }
    1221              : 
    1222              : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
    1223              : // does not exist.
    1224              : //
    1225              : // Note that errors can be wrapped with more details; use errors.Is().
    1226              : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
    1227              : 
    1228              : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
    1229              : // already exists.
    1230              : //
    1231              : // Note that errors can be wrapped with more details; use errors.Is().
    1232              : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
    1233              : 
    1234              : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
    1235              : // already exists and is not pristine.
    1236              : //
    1237              : // Note that errors can be wrapped with more details; use errors.Is().
    1238              : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
    1239              : 
    1240              : // IsCorruptionError returns true if the given error indicates database
    1241              : // corruption.
    1242            1 : func IsCorruptionError(err error) bool {
    1243            1 :         return errors.Is(err, base.ErrCorruption)
    1244            1 : }
    1245              : 
    1246            2 : func checkConsistency(v *manifest.Version, objProvider objstorage.Provider) error {
    1247            2 :         var errs []error
    1248            2 :         dedup := make(map[base.DiskFileNum]struct{})
    1249            2 :         for level, files := range v.Levels {
    1250            2 :                 iter := files.Iter()
    1251            2 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1252            2 :                         backingState := f.FileBacking
    1253            2 :                         if _, ok := dedup[backingState.DiskFileNum]; ok {
    1254            2 :                                 continue
    1255              :                         }
    1256            2 :                         dedup[backingState.DiskFileNum] = struct{}{}
    1257            2 :                         fileNum := backingState.DiskFileNum
    1258            2 :                         fileSize := backingState.Size
    1259            2 :                         // We skip over remote objects; those are instead checked asynchronously
    1260            2 :                         // by the table stats loading job.
    1261            2 :                         meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
    1262            2 :                         var size int64
    1263            2 :                         if err == nil {
    1264            2 :                                 if meta.IsRemote() {
    1265            2 :                                         continue
    1266              :                                 }
    1267            2 :                                 size, err = objProvider.Size(meta)
    1268              :                         }
    1269            2 :                         if err != nil {
    1270            1 :                                 errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
    1271            1 :                                 continue
    1272              :                         }
    1273              : 
    1274            2 :                         if size != int64(fileSize) {
    1275            0 :                                 errs = append(errs, errors.Errorf(
    1276            0 :                                         "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
    1277            0 :                                         errors.Safe(level), fileNum, objProvider.Path(meta),
    1278            0 :                                         errors.Safe(size), errors.Safe(fileSize)))
    1279            0 :                                 continue
    1280              :                         }
    1281              :                 }
    1282              :         }
    1283            2 :         return errors.Join(errs...)
    1284              : }
    1285              : 
// walEventListenerAdaptor forwards events reported by the wal package (e.g.
// wal.CreateInfo) to a pebble *EventListener, translating them into the
// corresponding pebble event types.
type walEventListenerAdaptor struct {
	// l is the pebble event listener that receives the translated events.
	l *EventListener
}
    1289              : 
    1290            2 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
    1291            2 :         // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
    1292            2 :         // is insufficient to infer whether primary or secondary.
    1293            2 :         wci := WALCreateInfo{
    1294            2 :                 JobID:           ci.JobID,
    1295            2 :                 Path:            ci.Path,
    1296            2 :                 FileNum:         base.DiskFileNum(ci.Num),
    1297            2 :                 RecycledFileNum: ci.RecycledFileNum,
    1298            2 :                 Err:             ci.Err,
    1299            2 :         }
    1300            2 :         l.l.WALCreated(wci)
    1301            2 : }
        

Generated by: LCOV version 2.0-1