LCOV - code coverage report
Current view: top level - pebble - open.go (source / functions) Hit Total Coverage
Test: 2023-10-29 08:16Z ed45a776 - tests + meta.lcov Lines: 749 834 89.8 %
Date: 2023-10-29 08:18:00 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "encoding/binary"
      11             :         "fmt"
      12             :         "io"
      13             :         "math"
      14             :         "os"
      15             :         "sort"
      16             :         "sync/atomic"
      17             :         "time"
      18             : 
      19             :         "github.com/cockroachdb/errors"
      20             :         "github.com/cockroachdb/errors/oserror"
      21             :         "github.com/cockroachdb/pebble/internal/arenaskl"
      22             :         "github.com/cockroachdb/pebble/internal/base"
      23             :         "github.com/cockroachdb/pebble/internal/cache"
      24             :         "github.com/cockroachdb/pebble/internal/constants"
      25             :         "github.com/cockroachdb/pebble/internal/invariants"
      26             :         "github.com/cockroachdb/pebble/internal/manifest"
      27             :         "github.com/cockroachdb/pebble/internal/manual"
      28             :         "github.com/cockroachdb/pebble/objstorage"
      29             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      30             :         "github.com/cockroachdb/pebble/record"
      31             :         "github.com/cockroachdb/pebble/sstable"
      32             :         "github.com/cockroachdb/pebble/vfs"
      33             :         "github.com/prometheus/client_golang/prometheus"
      34             : )
      35             : 
      36             : const (
      37             :         initialMemTableSize = 256 << 10 // 256 KB
      38             : 
      39             :         // The max batch size is limited by the uint32 offsets stored in
      40             :         // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
      41             :         //
      42             :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      43             :         // end of an allocation fits in uint32.
      44             :         //
      45             :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      46             :         // 2GB).
      47             :         maxBatchSize = constants.MaxUint32OrInt
      48             : 
      49             :         // The max memtable size is limited by the uint32 offsets stored in
      50             :         // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
      51             :         //
      52             :         // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
      53             :         // end of an allocation fits in uint32.
      54             :         //
      55             :         // On 32-bit systems, slices are naturally limited to MaxInt (just short of
      56             :         // 2GB).
      57             :         maxMemTableSize = constants.MaxUint32OrInt
      58             : )
      59             : 
      60             : // TableCacheSize can be used to determine the table
      61             : // cache size for a single db, given the maximum open
      62             : // files which can be used by a table cache which is
      63             : // only used by a single db.
      64           2 : func TableCacheSize(maxOpenFiles int) int {
      65           2 :         tableCacheSize := maxOpenFiles - numNonTableCacheFiles
      66           2 :         if tableCacheSize < minTableCacheSize {
      67           1 :                 tableCacheSize = minTableCacheSize
      68           1 :         }
      69           2 :         return tableCacheSize
      70             : }
      71             : 
      72             : // Open opens a DB whose files live in the given directory.
      73           2 : func Open(dirname string, opts *Options) (db *DB, _ error) {
      74           2 :         // Make a copy of the options so that we don't mutate the passed in options.
      75           2 :         opts = opts.Clone()
      76           2 :         opts = opts.EnsureDefaults()
      77           2 :         if err := opts.Validate(); err != nil {
      78           0 :                 return nil, err
      79           0 :         }
      80           2 :         if opts.LoggerAndTracer == nil {
      81           2 :                 opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
      82           2 :         } else {
      83           1 :                 opts.Logger = opts.LoggerAndTracer
      84           1 :         }
      85             : 
      86             :         // In all error cases, we return db = nil; this is used by various
      87             :         // deferred cleanups.
      88             : 
      89             :         // Open the database and WAL directories first.
      90           2 :         walDirname, dataDir, walDir, err := prepareAndOpenDirs(dirname, opts)
      91           2 :         if err != nil {
      92           1 :                 return nil, errors.Wrapf(err, "error opening database at %q", dirname)
      93           1 :         }
      94           2 :         defer func() {
      95           2 :                 if db == nil {
      96           1 :                         if walDir != dataDir {
      97           0 :                                 walDir.Close()
      98           0 :                         }
      99           1 :                         dataDir.Close()
     100             :                 }
     101             :         }()
     102             : 
     103             :         // Lock the database directory.
     104           2 :         var fileLock *Lock
     105           2 :         if opts.Lock != nil {
     106           1 :                 // The caller already acquired the database lock. Ensure that the
     107           1 :                 // directory matches.
     108           1 :                 if dirname != opts.Lock.dirname {
     109           0 :                         return nil, errors.Newf("pebble: opts.Lock acquired in %q not %q", opts.Lock.dirname, dirname)
     110           0 :                 }
     111           1 :                 if err := opts.Lock.refForOpen(); err != nil {
     112           1 :                         return nil, err
     113           1 :                 }
     114           1 :                 fileLock = opts.Lock
     115           2 :         } else {
     116           2 :                 fileLock, err = LockDirectory(dirname, opts.FS)
     117           2 :                 if err != nil {
     118           1 :                         return nil, err
     119           1 :                 }
     120             :         }
     121           2 :         defer func() {
     122           2 :                 if db == nil {
     123           1 :                         fileLock.Close()
     124           1 :                 }
     125             :         }()
     126             : 
     127             :         // Establish the format major version.
     128           2 :         formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname)
     129           2 :         if err != nil {
     130           1 :                 return nil, err
     131           1 :         }
     132           2 :         defer func() {
     133           2 :                 if db == nil {
     134           1 :                         formatVersionMarker.Close()
     135           1 :                 }
     136             :         }()
     137             : 
     138             :         // Find the currently active manifest, if there is one.
     139           2 :         manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(formatVersion, opts.FS, dirname)
     140           2 :         if err != nil {
     141           1 :                 return nil, errors.Wrapf(err, "pebble: database %q", dirname)
     142           1 :         }
     143           2 :         defer func() {
     144           2 :                 if db == nil {
     145           1 :                         manifestMarker.Close()
     146           1 :                 }
     147             :         }()
     148             : 
     149             :         // Atomic markers may leave behind obsolete files if there's a crash
     150             :         // mid-update. Clean these up if we're not in read-only mode.
     151           2 :         if !opts.ReadOnly {
     152           2 :                 if err := formatVersionMarker.RemoveObsolete(); err != nil {
     153           0 :                         return nil, err
     154           0 :                 }
     155           2 :                 if err := manifestMarker.RemoveObsolete(); err != nil {
     156           0 :                         return nil, err
     157           0 :                 }
     158             :         }
     159             : 
     160           2 :         if opts.Cache == nil {
     161           1 :                 opts.Cache = cache.New(cacheDefaultSize)
     162           2 :         } else {
     163           2 :                 opts.Cache.Ref()
     164           2 :         }
     165             : 
     166           2 :         d := &DB{
     167           2 :                 cacheID:             opts.Cache.NewID(),
     168           2 :                 dirname:             dirname,
     169           2 :                 walDirname:          walDirname,
     170           2 :                 opts:                opts,
     171           2 :                 cmp:                 opts.Comparer.Compare,
     172           2 :                 equal:               opts.equal(),
     173           2 :                 merge:               opts.Merger.Merge,
     174           2 :                 split:               opts.Comparer.Split,
     175           2 :                 abbreviatedKey:      opts.Comparer.AbbreviatedKey,
     176           2 :                 largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
     177           2 :                 fileLock:            fileLock,
     178           2 :                 dataDir:             dataDir,
     179           2 :                 walDir:              walDir,
     180           2 :                 logRecycler:         logRecycler{limit: opts.MemTableStopWritesThreshold + 1},
     181           2 :                 closed:              new(atomic.Value),
     182           2 :                 closedCh:            make(chan struct{}),
     183           2 :         }
     184           2 :         d.mu.versions = &versionSet{}
     185           2 :         d.diskAvailBytes.Store(math.MaxUint64)
     186           2 : 
     187           2 :         defer func() {
     188           2 :                 // If an error or panic occurs during open, attempt to release the manually
     189           2 :                 // allocated memory resources. Note that rather than look for an error, we
     190           2 :                 // look for the return of a nil DB pointer.
     191           2 :                 if r := recover(); db == nil {
     192           1 :                         // Release our references to the Cache. Note that both the DB, and
     193           1 :                         // tableCache have a reference. When we release the reference to
     194           1 :                         // the tableCache, and if there are no other references to
     195           1 :                         // the tableCache, then the tableCache will also release its
     196           1 :                         // reference to the cache.
     197           1 :                         opts.Cache.Unref()
     198           1 : 
     199           1 :                         if d.tableCache != nil {
     200           1 :                                 _ = d.tableCache.close()
     201           1 :                         }
     202             : 
     203           1 :                         for _, mem := range d.mu.mem.queue {
     204           1 :                                 switch t := mem.flushable.(type) {
     205           1 :                                 case *memTable:
     206           1 :                                         manual.Free(t.arenaBuf)
     207           1 :                                         t.arenaBuf = nil
     208             :                                 }
     209             :                         }
     210           1 :                         if d.cleanupManager != nil {
     211           1 :                                 d.cleanupManager.Close()
     212           1 :                         }
     213           1 :                         if d.objProvider != nil {
     214           1 :                                 d.objProvider.Close()
     215           1 :                         }
     216           1 :                         if r != nil {
     217           1 :                                 panic(r)
     218             :                         }
     219             :                 }
     220             :         }()
     221             : 
     222           2 :         d.commit = newCommitPipeline(commitEnv{
     223           2 :                 logSeqNum:     &d.mu.versions.logSeqNum,
     224           2 :                 visibleSeqNum: &d.mu.versions.visibleSeqNum,
     225           2 :                 apply:         d.commitApply,
     226           2 :                 write:         d.commitWrite,
     227           2 :         })
     228           2 :         d.mu.nextJobID = 1
     229           2 :         d.mu.mem.nextSize = opts.MemTableSize
     230           2 :         if d.mu.mem.nextSize > initialMemTableSize {
     231           2 :                 d.mu.mem.nextSize = initialMemTableSize
     232           2 :         }
     233           2 :         d.mu.compact.cond.L = &d.mu.Mutex
     234           2 :         d.mu.compact.inProgress = make(map[*compaction]struct{})
     235           2 :         d.mu.compact.noOngoingFlushStartTime = time.Now()
     236           2 :         d.mu.snapshots.init()
     237           2 :         // logSeqNum is the next sequence number that will be assigned.
     238           2 :         // Start assigning sequence numbers from base.SeqNumStart to leave
     239           2 :         // room for reserved sequence numbers (see comments around
     240           2 :         // SeqNumStart).
     241           2 :         d.mu.versions.logSeqNum.Store(base.SeqNumStart)
     242           2 :         d.mu.formatVers.vers.Store(uint64(formatVersion))
     243           2 :         d.mu.formatVers.marker = formatVersionMarker
     244           2 : 
     245           2 :         d.timeNow = time.Now
     246           2 :         d.openedAt = d.timeNow()
     247           2 : 
     248           2 :         d.mu.Lock()
     249           2 :         defer d.mu.Unlock()
     250           2 : 
     251           2 :         jobID := d.mu.nextJobID
     252           2 :         d.mu.nextJobID++
     253           2 : 
     254           2 :         setCurrent := setCurrentFunc(d.FormatMajorVersion(), manifestMarker, opts.FS, dirname, d.dataDir)
     255           2 : 
     256           2 :         if !manifestExists {
     257           2 :                 // DB does not exist.
     258           2 :                 if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
     259           1 :                         return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
     260           1 :                 }
     261             : 
     262             :                 // Create the DB.
     263           2 :                 if err := d.mu.versions.create(jobID, dirname, opts, manifestMarker, setCurrent, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
     264           1 :                         return nil, err
     265           1 :                 }
     266           2 :         } else {
     267           2 :                 if opts.ErrorIfExists {
     268           1 :                         return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
     269           1 :                 }
     270             :                 // Load the version set.
     271           2 :                 if err := d.mu.versions.load(dirname, opts, manifestFileNum, manifestMarker, setCurrent, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
     272           1 :                         return nil, err
     273           1 :                 }
     274           2 :                 if opts.ErrorIfNotPristine {
     275           1 :                         liveFileNums := make(map[base.DiskFileNum]struct{})
     276           1 :                         d.mu.versions.addLiveFileNums(liveFileNums)
     277           1 :                         if len(liveFileNums) != 0 {
     278           1 :                                 return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
     279           1 :                         }
     280             :                 }
     281             :         }
     282             : 
     283             :         // In read-only mode, we replay directly into the mutable memtable but never
     284             :         // flush it. We need to delay creation of the memtable until we know the
     285             :         // sequence number of the first batch that will be inserted.
     286           2 :         if !d.opts.ReadOnly {
     287           2 :                 var entry *flushableEntry
     288           2 :                 d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load())
     289           2 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     290           2 :         }
     291             : 
     292             :         // List the objects
     293           2 :         ls, err := opts.FS.List(d.walDirname)
     294           2 :         if err != nil {
     295           1 :                 return nil, err
     296           1 :         }
     297           2 :         if d.dirname != d.walDirname {
     298           2 :                 ls2, err := opts.FS.List(d.dirname)
     299           2 :                 if err != nil {
     300           0 :                         return nil, err
     301           0 :                 }
     302           2 :                 ls = append(ls, ls2...)
     303             :         }
     304           2 :         providerSettings := objstorageprovider.Settings{
     305           2 :                 Logger:              opts.Logger,
     306           2 :                 FS:                  opts.FS,
     307           2 :                 FSDirName:           dirname,
     308           2 :                 FSDirInitialListing: ls,
     309           2 :                 FSCleaner:           opts.Cleaner,
     310           2 :                 NoSyncOnClose:       opts.NoSyncOnClose,
     311           2 :                 BytesPerSync:        opts.BytesPerSync,
     312           2 :         }
     313           2 :         providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
     314           2 :         providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
     315           2 :         providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
     316           2 :         providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
     317           2 : 
     318           2 :         d.objProvider, err = objstorageprovider.Open(providerSettings)
     319           2 :         if err != nil {
     320           1 :                 return nil, err
     321           1 :         }
     322             : 
     323           2 :         d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
     324           2 : 
     325           2 :         if manifestExists {
     326           2 :                 curVersion := d.mu.versions.currentVersion()
     327           2 :                 if err := checkConsistency(curVersion, dirname, d.objProvider); err != nil {
     328           0 :                         return nil, err
     329           0 :                 }
     330             :         }
     331             : 
     332           2 :         tableCacheSize := TableCacheSize(opts.MaxOpenFiles)
     333           2 :         d.tableCache = newTableCacheContainer(opts.TableCache, d.cacheID, d.objProvider, d.opts, tableCacheSize)
     334           2 :         d.newIters = d.tableCache.newIters
     335           2 :         d.tableNewRangeKeyIter = d.tableCache.newRangeKeyIter
     336           2 : 
     337           2 :         // Replay any newer log files than the ones named in the manifest.
     338           2 :         type fileNumAndName struct {
     339           2 :                 num  base.DiskFileNum
     340           2 :                 name string
     341           2 :         }
     342           2 :         var logFiles []fileNumAndName
     343           2 :         var previousOptionsFileNum FileNum
     344           2 :         var previousOptionsFilename string
     345           2 :         for _, filename := range ls {
     346           2 :                 ft, fn, ok := base.ParseFilename(opts.FS, filename)
     347           2 :                 if !ok {
     348           2 :                         continue
     349             :                 }
     350             : 
     351             :                 // Don't reuse any obsolete file numbers to avoid modifying an
     352             :                 // ingested sstable's original external file.
     353           2 :                 if d.mu.versions.nextFileNum <= uint64(fn.FileNum()) {
     354           2 :                         d.mu.versions.nextFileNum = uint64(fn.FileNum()) + 1
     355           2 :                 }
     356             : 
     357           2 :                 switch ft {
     358           2 :                 case fileTypeLog:
     359           2 :                         if fn >= d.mu.versions.minUnflushedLogNum {
     360           2 :                                 logFiles = append(logFiles, fileNumAndName{fn, filename})
     361           2 :                         }
     362           2 :                         if d.logRecycler.minRecycleLogNum <= fn.FileNum() {
     363           2 :                                 d.logRecycler.minRecycleLogNum = fn.FileNum() + 1
     364           2 :                         }
     365           2 :                 case fileTypeOptions:
     366           2 :                         if previousOptionsFileNum < fn.FileNum() {
     367           2 :                                 previousOptionsFileNum = fn.FileNum()
     368           2 :                                 previousOptionsFilename = filename
     369           2 :                         }
     370           1 :                 case fileTypeTemp, fileTypeOldTemp:
     371           1 :                         if !d.opts.ReadOnly {
     372           1 :                                 // Some codepaths write to a temporary file and then
     373           1 :                                 // rename it to its final location when complete.  A
     374           1 :                                 // temp file is leftover if a process exits before the
     375           1 :                                 // rename.  Remove it.
     376           1 :                                 err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
     377           1 :                                 if err != nil {
     378           0 :                                         return nil, err
     379           0 :                                 }
     380             :                         }
     381             :                 }
     382             :         }
     383             : 
     384             :         // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
     385             :         // objProvider. This avoids FileNum collisions with obsolete sstables.
     386           2 :         objects := d.objProvider.List()
     387           2 :         for _, obj := range objects {
     388           2 :                 if d.mu.versions.nextFileNum <= uint64(obj.DiskFileNum) {
     389           1 :                         d.mu.versions.nextFileNum = uint64(obj.DiskFileNum) + 1
     390           1 :                 }
     391             :         }
     392             : 
     393             :         // Validate the most-recent OPTIONS file, if there is one.
     394           2 :         var strictWALTail bool
     395           2 :         if previousOptionsFilename != "" {
     396           2 :                 path := opts.FS.PathJoin(dirname, previousOptionsFilename)
     397           2 :                 strictWALTail, err = checkOptions(opts, path)
     398           2 :                 if err != nil {
     399           1 :                         return nil, err
     400           1 :                 }
     401             :         }
     402             : 
     403           2 :         sort.Slice(logFiles, func(i, j int) bool {
     404           2 :                 return logFiles[i].num < logFiles[j].num
     405           2 :         })
     406             : 
     407           2 :         var ve versionEdit
     408           2 :         var toFlush flushableList
     409           2 :         for i, lf := range logFiles {
     410           2 :                 lastWAL := i == len(logFiles)-1
     411           2 :                 flush, maxSeqNum, err := d.replayWAL(jobID, &ve, opts.FS,
     412           2 :                         opts.FS.PathJoin(d.walDirname, lf.name), lf.num, strictWALTail && !lastWAL)
     413           2 :                 if err != nil {
     414           1 :                         return nil, err
     415           1 :                 }
     416           2 :                 toFlush = append(toFlush, flush...)
     417           2 :                 d.mu.versions.markFileNumUsed(lf.num)
     418           2 :                 if d.mu.versions.logSeqNum.Load() < maxSeqNum {
     419           2 :                         d.mu.versions.logSeqNum.Store(maxSeqNum)
     420           2 :                 }
     421             :         }
     422           2 :         d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
     423           2 : 
     424           2 :         if !d.opts.ReadOnly {
     425           2 :                 // Create an empty .log file.
     426           2 :                 newLogNum := d.mu.versions.getNextDiskFileNum()
     427           2 : 
     428           2 :                 // This logic is slightly different than RocksDB's. Specifically, RocksDB
     429           2 :                 // sets MinUnflushedLogNum to max-recovered-log-num + 1. We set it to the
     430           2 :                 // newLogNum. There should be no difference in using either value.
     431           2 :                 ve.MinUnflushedLogNum = newLogNum
     432           2 : 
     433           2 :                 // Create the manifest with the updated MinUnflushedLogNum before
     434           2 :                 // creating the new log file. If we created the log file first, a
     435           2 :                 // crash before the manifest is synced could leave two WALs with
     436           2 :                 // unclean tails.
     437           2 :                 d.mu.versions.logLock()
     438           2 :                 if err := d.mu.versions.logAndApply(jobID, &ve, newFileMetrics(ve.NewFiles), false /* forceRotation */, func() []compactionInfo {
     439           2 :                         return nil
     440           2 :                 }); err != nil {
     441           1 :                         return nil, err
     442           1 :                 }
     443             : 
     444           2 :                 for _, entry := range toFlush {
     445           2 :                         entry.readerUnrefLocked(true)
     446           2 :                 }
     447             : 
     448           2 :                 newLogName := base.MakeFilepath(opts.FS, d.walDirname, fileTypeLog, newLogNum)
     449           2 :                 d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum, fileSize: 0})
     450           2 :                 logFile, err := opts.FS.Create(newLogName)
     451           2 :                 if err != nil {
     452           1 :                         return nil, err
     453           1 :                 }
     454           2 :                 if err := d.walDir.Sync(); err != nil {
     455           1 :                         return nil, err
     456           1 :                 }
     457           2 :                 d.opts.EventListener.WALCreated(WALCreateInfo{
     458           2 :                         JobID:   jobID,
     459           2 :                         Path:    newLogName,
     460           2 :                         FileNum: newLogNum,
     461           2 :                 })
     462           2 :                 // This isn't strictly necessary as we don't use the log number for
     463           2 :                 // memtables being flushed, only for the next unflushed memtable.
     464           2 :                 d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
     465           2 : 
     466           2 :                 logFile = vfs.NewSyncingFile(logFile, vfs.SyncingFileOptions{
     467           2 :                         NoSyncOnClose:   d.opts.NoSyncOnClose,
     468           2 :                         BytesPerSync:    d.opts.WALBytesPerSync,
     469           2 :                         PreallocateSize: d.walPreallocateSize(),
     470           2 :                 })
     471           2 :                 d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
     472           2 :                         Buckets: FsyncLatencyBuckets,
     473           2 :                 })
     474           2 : 
     475           2 :                 logWriterConfig := record.LogWriterConfig{
     476           2 :                         WALMinSyncInterval: d.opts.WALMinSyncInterval,
     477           2 :                         WALFsyncLatency:    d.mu.log.metrics.fsyncLatency,
     478           2 :                         QueueSemChan:       d.commit.logSyncQSem,
     479           2 :                 }
     480           2 :                 d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum, logWriterConfig)
     481           2 :                 d.mu.versions.metrics.WAL.Files++
     482             :         }
     483           2 :         d.updateReadStateLocked(d.opts.DebugCheck)
     484           2 : 
     485           2 :         // If the Options specify a format major version higher than the
     486           2 :         // loaded database's, upgrade it. If this is a new database, this
     487           2 :         // code path also performs an initial upgrade from the starting
     488           2 :         // implicit MostCompatible version.
     489           2 :         //
     490           2 :         // We ratchet the version this far into Open so that migrations have a read
     491           2 :         // state available.
     492           2 :         if !d.opts.ReadOnly && opts.FormatMajorVersion > d.FormatMajorVersion() {
     493           2 :                 if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
     494           0 :                         return nil, err
     495           0 :                 }
     496             :         }
     497             : 
     498           2 :         if !d.opts.ReadOnly {
     499           2 :                 // Write the current options to disk.
     500           2 :                 d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
     501           2 :                 tmpPath := base.MakeFilepath(opts.FS, dirname, fileTypeTemp, d.optionsFileNum)
     502           2 :                 optionsPath := base.MakeFilepath(opts.FS, dirname, fileTypeOptions, d.optionsFileNum)
     503           2 : 
     504           2 :                 // Write them to a temporary file first, in case we crash before
     505           2 :                 // we're done. A corrupt options file prevents opening the
     506           2 :                 // database.
     507           2 :                 optionsFile, err := opts.FS.Create(tmpPath)
     508           2 :                 if err != nil {
     509           1 :                         return nil, err
     510           1 :                 }
     511           2 :                 serializedOpts := []byte(opts.String())
     512           2 :                 if _, err := optionsFile.Write(serializedOpts); err != nil {
     513           1 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     514           1 :                 }
     515           2 :                 d.optionsFileSize = uint64(len(serializedOpts))
     516           2 :                 if err := optionsFile.Sync(); err != nil {
     517           1 :                         return nil, errors.CombineErrors(err, optionsFile.Close())
     518           1 :                 }
     519           2 :                 if err := optionsFile.Close(); err != nil {
     520           0 :                         return nil, err
     521           0 :                 }
     522             :                 // Atomically rename to the OPTIONS-XXXXXX path. This rename is
     523             :                 // guaranteed to be atomic because the destination path does not
     524             :                 // exist.
     525           2 :                 if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
     526           1 :                         return nil, err
     527           1 :                 }
     528           2 :                 if err := d.dataDir.Sync(); err != nil {
     529           1 :                         return nil, err
     530           1 :                 }
     531             :         }
     532             : 
     533           2 :         if !d.opts.ReadOnly {
     534           2 :                 d.scanObsoleteFiles(ls)
     535           2 :                 d.deleteObsoleteFiles(jobID)
     536           2 :         } else {
     537           1 :                 // All the log files are obsolete.
     538           1 :                 d.mu.versions.metrics.WAL.Files = int64(len(logFiles))
     539           1 :         }
     540           2 :         d.mu.tableStats.cond.L = &d.mu.Mutex
     541           2 :         d.mu.tableValidation.cond.L = &d.mu.Mutex
     542           2 :         if !d.opts.ReadOnly {
     543           2 :                 d.maybeCollectTableStatsLocked()
     544           2 :         }
     545           2 :         d.calculateDiskAvailableBytes()
     546           2 : 
     547           2 :         d.maybeScheduleFlush()
     548           2 :         d.maybeScheduleCompaction()
     549           2 : 
     550           2 :         // Note: this is a no-op if invariants are disabled or race is enabled.
     551           2 :         //
     552           2 :         // Setting a finalizer on *DB causes *DB to never be reclaimed and the
     553           2 :         // finalizer to never be run. The problem is due to this limitation of
     554           2 :         // finalizers mentioned in the SetFinalizer docs:
     555           2 :         //
     556           2 :         //   If a cyclic structure includes a block with a finalizer, that cycle is
     557           2 :         //   not guaranteed to be garbage collected and the finalizer is not
     558           2 :         //   guaranteed to run, because there is no ordering that respects the
     559           2 :         //   dependencies.
     560           2 :         //
     561           2 :         // DB has cycles with several of its internal structures: readState,
     562           2 :         // newIters, tableCache, versions, etc. Each of these individually causes a
     563           2 :         // cycle and prevents the finalizer from being run. But we can work around this
     564           2 :         // finalizer limitation by setting a finalizer on another object that is
     565           2 :         // tied to the lifetime of DB: the DB.closed atomic.Value.
     566           2 :         dPtr := fmt.Sprintf("%p", d)
     567           2 :         invariants.SetFinalizer(d.closed, func(obj interface{}) {
     568           0 :                 v := obj.(*atomic.Value)
     569           0 :                 if err := v.Load(); err == nil {
     570           0 :                         fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
     571           0 :                         os.Exit(1)
     572           0 :                 }
     573             :         })
     574             : 
     575           2 :         return d, nil
     576             : }
     577             : 
     578             : // prepareAndOpenDirs opens the directories for the store (and creates them if
     579             : // necessary).
     580             : //
     581             : // Returns an error if ReadOnly is set and the directories don't exist.
     582             : func prepareAndOpenDirs(
     583             :         dirname string, opts *Options,
     584           2 : ) (walDirname string, dataDir vfs.File, walDir vfs.File, err error) {
     585           2 :         walDirname = opts.WALDir
     586           2 :         if opts.WALDir == "" {
     587           2 :                 walDirname = dirname
     588           2 :         }
     589             : 
     590             :         // Create directories if needed.
     591           2 :         if !opts.ReadOnly {
     592           2 :                 if err := opts.FS.MkdirAll(dirname, 0755); err != nil {
     593           1 :                         return "", nil, nil, err
     594           1 :                 }
     595           2 :                 if walDirname != dirname {
     596           2 :                         if err := opts.FS.MkdirAll(walDirname, 0755); err != nil {
     597           0 :                                 return "", nil, nil, err
     598           0 :                         }
     599             :                 }
     600             :         }
     601             : 
     602           2 :         dataDir, err = opts.FS.OpenDir(dirname)
     603           2 :         if err != nil {
     604           1 :                 if opts.ReadOnly && oserror.IsNotExist(err) {
     605           1 :                         return "", nil, nil, errors.Errorf("pebble: database %q does not exist", dirname)
     606           1 :                 }
     607           1 :                 return "", nil, nil, err
     608             :         }
     609             : 
     610           2 :         if walDirname == dirname {
     611           2 :                 walDir = dataDir
     612           2 :         } else {
     613           2 :                 walDir, err = opts.FS.OpenDir(walDirname)
     614           2 :                 if err != nil {
     615           1 :                         dataDir.Close()
     616           1 :                         return "", nil, nil, err
     617           1 :                 }
     618             :         }
     619           2 :         return walDirname, dataDir, walDir, nil
     620             : }
     621             : 
     622             : // GetVersion returns the engine version string from the latest options
     623             : // file present in dir. Used to check what Pebble or RocksDB version was last
     624             : // used to write to the database stored in this directory. An empty string is
     625             : // returned if no valid OPTIONS file with a version key was found.
     626           1 : func GetVersion(dir string, fs vfs.FS) (string, error) {
     627           1 :         ls, err := fs.List(dir)
     628           1 :         if err != nil {
     629           0 :                 return "", err
     630           0 :         }
     631           1 :         var version string
     632           1 :         lastOptionsSeen := FileNum(0)
     633           1 :         for _, filename := range ls {
     634           1 :                 ft, fn, ok := base.ParseFilename(fs, filename)
     635           1 :                 if !ok {
     636           1 :                         continue
     637             :                 }
     638           1 :                 switch ft {
     639           1 :                 case fileTypeOptions:
     640           1 :                         // If this file has a higher number than the last options file
     641           1 :                         // processed, reset version. This is because rocksdb often
     642           1 :                         // writes multiple options files without deleting previous ones.
     643           1 :                         // Otherwise, skip parsing this options file.
     644           1 :                         if fn.FileNum() > lastOptionsSeen {
     645           1 :                                 version = ""
     646           1 :                                 lastOptionsSeen = fn.FileNum()
     647           1 :                         } else {
     648           0 :                                 continue
     649             :                         }
     650           1 :                         f, err := fs.Open(fs.PathJoin(dir, filename))
     651           1 :                         if err != nil {
     652           0 :                                 return "", err
     653           0 :                         }
     654           1 :                         data, err := io.ReadAll(f)
     655           1 :                         f.Close()
     656           1 : 
     657           1 :                         if err != nil {
     658           0 :                                 return "", err
     659           0 :                         }
     660           1 :                         err = parseOptions(string(data), func(section, key, value string) error {
     661           1 :                                 switch {
     662           1 :                                 case section == "Version":
     663           1 :                                         switch key {
     664           1 :                                         case "pebble_version":
     665           1 :                                                 version = value
     666           1 :                                         case "rocksdb_version":
     667           1 :                                                 version = fmt.Sprintf("rocksdb v%s", value)
     668             :                                         }
     669             :                                 }
     670           1 :                                 return nil
     671             :                         })
     672           1 :                         if err != nil {
     673           0 :                                 return "", err
     674           0 :                         }
     675             :                 }
     676             :         }
     677           1 :         return version, nil
     678             : }
     679             : 
     680             : // replayWAL replays the edits in the specified log file. If the DB is in
     681             : // read only mode, then the WALs are replayed into memtables and not flushed. If
     682             : // the DB is not in read only mode, then the contents of the WAL are guaranteed
     683             : // to be flushed.
     684             : //
     685             : // The toFlush return value is a list of flushables associated with the WAL
     686             : // being replayed which will be flushed. Once the version edit has been applied
     687             : // to the manifest, it is up to the caller of replayWAL to unreference the
     688             : // toFlush flushables returned by replayWAL.
     689             : //
     690             : // d.mu must be held when calling this, but the mutex may be dropped and
     691             : // re-acquired during the course of this method.
     692             : func (d *DB) replayWAL(
     693             :         jobID int,
     694             :         ve *versionEdit,
     695             :         fs vfs.FS,
     696             :         filename string,
     697             :         logNum base.DiskFileNum,
     698             :         strictWALTail bool,
     699           2 : ) (toFlush flushableList, maxSeqNum uint64, err error) {
     700           2 :         file, err := fs.Open(filename)
     701           2 :         if err != nil {
     702           0 :                 return nil, 0, err
     703           0 :         }
     704           2 :         defer file.Close()
     705           2 :         var (
     706           2 :                 b               Batch
     707           2 :                 buf             bytes.Buffer
     708           2 :                 mem             *memTable
     709           2 :                 entry           *flushableEntry
     710           2 :                 rr              = record.NewReader(file, logNum)
     711           2 :                 offset          int64 // byte offset in rr
     712           2 :                 lastFlushOffset int64
     713           2 :                 keysReplayed    int64 // number of keys replayed
     714           2 :                 batchesReplayed int64 // number of batches replayed
     715           2 :         )
     716           2 : 
     717           2 :         if d.opts.ReadOnly {
     718           1 :                 // In read-only mode, we replay directly into the mutable memtable which will
     719           1 :                 // never be flushed.
     720           1 :                 mem = d.mu.mem.mutable
     721           1 :                 if mem != nil {
     722           1 :                         entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
     723           1 :                 }
     724             :         }
     725             : 
     726             :         // Flushes the current memtable, if not nil.
     727           2 :         flushMem := func() {
     728           2 :                 if mem == nil {
     729           2 :                         return
     730           2 :                 }
     731           2 :                 var logSize uint64
     732           2 :                 if offset >= lastFlushOffset {
     733           2 :                         logSize = uint64(offset - lastFlushOffset)
     734           2 :                 }
     735             :                 // Else, this was the initial memtable in the read-only case which must have
     736             :                 // been empty, but we need to flush it since we don't want to add to it later.
     737           2 :                 lastFlushOffset = offset
     738           2 :                 entry.logSize = logSize
     739           2 :                 if !d.opts.ReadOnly {
     740           2 :                         toFlush = append(toFlush, entry)
     741           2 :                 }
     742           2 :                 mem, entry = nil, nil
     743             :         }
     744             :         // Creates a new memtable if there is no current memtable.
     745           2 :         ensureMem := func(seqNum uint64) {
     746           2 :                 if mem != nil {
     747           2 :                         return
     748           2 :                 }
     749           2 :                 mem, entry = d.newMemTable(logNum, seqNum)
     750           2 :                 if d.opts.ReadOnly {
     751           1 :                         d.mu.mem.mutable = mem
     752           1 :                         d.mu.mem.queue = append(d.mu.mem.queue, entry)
     753           1 :                 }
     754             :         }
     755             : 
     756             :         // updateVE is used to update ve with information about new files created
     757             :         // during the flush of any flushable not of type ingestedFlushable. For the
     758             :         // flushable of type ingestedFlushable we use custom handling below.
     759           2 :         updateVE := func() error {
     760           2 :                 // TODO(bananabrick): See if we can use the actual base level here,
     761           2 :                 // instead of using 1.
     762           2 :                 c := newFlush(d.opts, d.mu.versions.currentVersion(),
     763           2 :                         1 /* base level */, toFlush, d.timeNow())
     764           2 :                 newVE, _, _, err := d.runCompaction(jobID, c)
     765           2 :                 if err != nil {
     766           0 :                         return errors.Wrapf(err, "running compaction during WAL replay")
     767           0 :                 }
     768           2 :                 ve.NewFiles = append(ve.NewFiles, newVE.NewFiles...)
     769           2 :                 return nil
     770             :         }
     771             : 
     772           2 :         for {
     773           2 :                 offset = rr.Offset()
     774           2 :                 r, err := rr.Next()
     775           2 :                 if err == nil {
     776           2 :                         _, err = io.Copy(&buf, r)
     777           2 :                 }
     778           2 :                 if err != nil {
     779           2 :                         // It is common to encounter a zeroed or invalid chunk due to WAL
     780           2 :                         // preallocation and WAL recycling. We need to distinguish these
     781           2 :                         // errors from EOF in order to recognize that the record was
     782           2 :                         // truncated and to avoid replaying subsequent WALs, but want
     783           2 :                         // to otherwise treat them like EOF.
     784           2 :                         if err == io.EOF {
     785           2 :                                 break
     786           1 :                         } else if record.IsInvalidRecord(err) && !strictWALTail {
     787           1 :                                 break
     788             :                         }
     789           1 :                         return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
     790             :                 }
     791             : 
     792           2 :                 if buf.Len() < batchHeaderLen {
     793           0 :                         return nil, 0, base.CorruptionErrorf("pebble: corrupt log file %q (num %s)",
     794           0 :                                 filename, errors.Safe(logNum))
     795           0 :                 }
     796             : 
     797           2 :                 if d.opts.ErrorIfNotPristine {
     798           1 :                         return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
     799           1 :                 }
     800             : 
     801             :                 // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
     802             :                 // which is used below.
     803           2 :                 b = Batch{db: d}
     804           2 :                 b.SetRepr(buf.Bytes())
     805           2 :                 seqNum := b.SeqNum()
     806           2 :                 maxSeqNum = seqNum + uint64(b.Count())
     807           2 :                 keysReplayed += int64(b.Count())
     808           2 :                 batchesReplayed++
     809           2 :                 {
     810           2 :                         br := b.Reader()
     811           2 :                         if kind, encodedFileNum, _, _ := br.Next(); kind == InternalKeyKindIngestSST {
     812           2 :                                 fileNums := make([]base.DiskFileNum, 0, b.Count())
     813           2 :                                 addFileNum := func(encodedFileNum []byte) {
     814           2 :                                         fileNum, n := binary.Uvarint(encodedFileNum)
     815           2 :                                         if n <= 0 {
     816           0 :                                                 panic("pebble: ingest sstable file num is invalid.")
     817             :                                         }
     818           2 :                                         fileNums = append(fileNums, base.FileNum(fileNum).DiskFileNum())
     819             :                                 }
     820           2 :                                 addFileNum(encodedFileNum)
     821           2 : 
     822           2 :                                 for i := 1; i < int(b.Count()); i++ {
     823           1 :                                         kind, encodedFileNum, _, ok := br.Next()
     824           1 :                                         if kind != InternalKeyKindIngestSST {
     825           0 :                                                 panic("pebble: invalid batch key kind.")
     826             :                                         }
     827           1 :                                         if !ok {
     828           0 :                                                 panic("pebble: invalid batch count.")
     829             :                                         }
     830           1 :                                         addFileNum(encodedFileNum)
     831             :                                 }
     832             : 
     833           2 :                                 if _, _, _, ok := br.Next(); ok {
     834           0 :                                         panic("pebble: invalid number of entries in batch.")
     835             :                                 }
     836             : 
     837           2 :                                 meta := make([]*fileMetadata, len(fileNums))
     838           2 :                                 for i, n := range fileNums {
     839           2 :                                         var readable objstorage.Readable
     840           2 :                                         objMeta, err := d.objProvider.Lookup(fileTypeTable, n)
     841           2 :                                         if err != nil {
     842           0 :                                                 return nil, 0, errors.Wrap(err, "pebble: error when looking up ingested SSTs")
     843           0 :                                         }
     844           2 :                                         if objMeta.IsRemote() {
     845           1 :                                                 readable, err = d.objProvider.OpenForReading(context.TODO(), fileTypeTable, n, objstorage.OpenOptions{MustExist: true})
     846           1 :                                                 if err != nil {
     847           0 :                                                         return nil, 0, errors.Wrap(err, "pebble: error when opening flushable ingest files")
     848           0 :                                                 }
     849           2 :                                         } else {
     850           2 :                                                 path := base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, n)
     851           2 :                                                 f, err := d.opts.FS.Open(path)
     852           2 :                                                 if err != nil {
     853           0 :                                                         return nil, 0, err
     854           0 :                                                 }
     855             : 
     856           2 :                                                 readable, err = sstable.NewSimpleReadable(f)
     857           2 :                                                 if err != nil {
     858           0 :                                                         return nil, 0, err
     859           0 :                                                 }
     860             :                                         }
     861             :                                         // NB: ingestLoad1 will close readable.
     862           2 :                                         meta[i], err = ingestLoad1(d.opts, d.FormatMajorVersion(), readable, d.cacheID, n)
     863           2 :                                         if err != nil {
     864           0 :                                                 return nil, 0, errors.Wrap(err, "pebble: error when loading flushable ingest files")
     865           0 :                                         }
     866             :                                 }
     867             : 
     868           2 :                                 if uint32(len(meta)) != b.Count() {
     869           0 :                                         panic("pebble: couldn't load all files in WAL entry.")
     870             :                                 }
     871             : 
     872           2 :                                 entry, err = d.newIngestedFlushableEntry(
     873           2 :                                         meta, seqNum, logNum,
     874           2 :                                 )
     875           2 :                                 if err != nil {
     876           0 :                                         return nil, 0, err
     877           0 :                                 }
     878             : 
     879           2 :                                 if d.opts.ReadOnly {
     880           1 :                                         d.mu.mem.queue = append(d.mu.mem.queue, entry)
     881           1 :                                         // We added the IngestSST flushable to the queue. But there
     882           1 :                                         // must be at least one WAL entry waiting to be replayed. We
     883           1 :                                         // have to ensure this newer WAL entry isn't replayed into
     884           1 :                                         // the current value of d.mu.mem.mutable because the current
     885           1 :                                         // mutable memtable exists before this flushable entry in
     886           1 :                                         // the memtable queue. To ensure this, we just need to unset
     887           1 :                                         // d.mu.mem.mutable. When a newer WAL is replayed, we will
     888           1 :                                         // set d.mu.mem.mutable to a newer value.
     889           1 :                                         d.mu.mem.mutable = nil
     890           2 :                                 } else {
     891           2 :                                         toFlush = append(toFlush, entry)
     892           2 :                                         // During WAL replay, the lsm only has L0, hence, the
     893           2 :                                         // baseLevel is 1. For the sake of simplicity, we place the
     894           2 :                                         // ingested files in L0 here, instead of finding their
     895           2 :                                         // target levels. This is a simplification for the sake of
     896           2 :                                         // simpler code. It is expected that WAL replay should be
     897           2 :                                         // rare, and that flushables of type ingestedFlushable
     898           2 :                                         // should also be rare. So, placing the ingested files in L0
     899           2 :                                         // is alright.
     900           2 :                                         //
     901           2 :                                         // TODO(bananabrick): Maybe refactor this function to allow
     902           2 :                                         // us to easily place ingested files in levels as low as
     903           2 :                                         // possible during WAL replay. It would require breaking up
     904           2 :                                         // the application of ve to the manifest into chunks and is
     905           2 :                                         // not pretty w/o a refactor to this function and how it's
     906           2 :                                         // used.
     907           2 :                                         c := newFlush(
     908           2 :                                                 d.opts, d.mu.versions.currentVersion(),
     909           2 :                                                 1, /* base level */
     910           2 :                                                 []*flushableEntry{entry},
     911           2 :                                                 d.timeNow(),
     912           2 :                                         )
     913           2 :                                         for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
     914           2 :                                                 ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file.FileMetadata})
     915           2 :                                         }
     916             :                                 }
     917           2 :                                 return toFlush, maxSeqNum, nil
     918             :                         }
     919             :                 }
     920             : 
     921           2 :                 if b.memTableSize >= uint64(d.largeBatchThreshold) {
     922           1 :                         flushMem()
     923           1 :                         // Make a copy of the data slice since it is currently owned by buf and will
     924           1 :                         // be reused in the next iteration.
     925           1 :                         b.data = append([]byte(nil), b.data...)
     926           1 :                         b.flushable = newFlushableBatch(&b, d.opts.Comparer)
     927           1 :                         entry := d.newFlushableEntry(b.flushable, logNum, b.SeqNum())
     928           1 :                         // Disable memory accounting by adding a reader ref that will never be
     929           1 :                         // removed.
     930           1 :                         entry.readerRefs.Add(1)
     931           1 :                         if d.opts.ReadOnly {
     932           1 :                                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     933           1 :                                 // We added the flushable batch to the flushable queue.
     934           1 :                                 // But there must be at least one WAL entry waiting to be
     935           1 :                                 // replayed. We have to ensure this newer WAL entry isn't
     936           1 :                                 // replayed into the current value of d.mu.mem.mutable because
     937           1 :                                 // the current mutable memtable exists before this flushable
     938           1 :                                 // entry in the memtable queue. To ensure this, we just need to
     939           1 :                                 // unset d.mu.mem.mutable. When a newer WAL is replayed, we will
     940           1 :                                 // set d.mu.mem.mutable to a newer value.
     941           1 :                                 d.mu.mem.mutable = nil
     942           1 :                         } else {
     943           1 :                                 toFlush = append(toFlush, entry)
     944           1 :                         }
     945           2 :                 } else {
     946           2 :                         ensureMem(seqNum)
     947           2 :                         if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
     948           0 :                                 return nil, 0, err
     949           0 :                         }
     950             :                         // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
     951             :                         // batch may not initially fit, but will eventually fit (since it is smaller than
     952             :                         // largeBatchThreshold).
     953           2 :                         for err == arenaskl.ErrArenaFull {
     954           2 :                                 flushMem()
     955           2 :                                 ensureMem(seqNum)
     956           2 :                                 err = mem.prepare(&b)
     957           2 :                                 if err != nil && err != arenaskl.ErrArenaFull {
     958           0 :                                         return nil, 0, err
     959           0 :                                 }
     960             :                         }
     961           2 :                         if err = mem.apply(&b, seqNum); err != nil {
     962           0 :                                 return nil, 0, err
     963           0 :                         }
     964           2 :                         mem.writerUnref()
     965             :                 }
     966           2 :                 buf.Reset()
     967             :         }
     968             : 
     969           2 :         d.opts.Logger.Infof("[JOB %d] WAL file %s with log number %s stopped reading at offset: %d; replayed %d keys in %d batches", jobID, filename, logNum.String(), offset, keysReplayed, batchesReplayed)
     970           2 :         flushMem()
     971           2 : 
     972           2 :         // mem is nil here.
     973           2 :         if !d.opts.ReadOnly {
     974           2 :                 err = updateVE()
     975           2 :                 if err != nil {
     976           0 :                         return nil, 0, err
     977           0 :                 }
     978             :         }
     979           2 :         return toFlush, maxSeqNum, err
     980             : }
     981             : 
     982           2 : func checkOptions(opts *Options, path string) (strictWALTail bool, err error) {
     983           2 :         f, err := opts.FS.Open(path)
     984           2 :         if err != nil {
     985           0 :                 return false, err
     986           0 :         }
     987           2 :         defer f.Close()
     988           2 : 
     989           2 :         data, err := io.ReadAll(f)
     990           2 :         if err != nil {
     991           0 :                 return false, err
     992           0 :         }
     993           2 :         return opts.checkOptions(string(data))
     994             : }
     995             : 
     996             : // DBDesc briefly describes high-level state about a database. It is the value returned by Peek.
     997             : type DBDesc struct {
     998             :         // Exists is true if an existing database was found.
     999             :         Exists bool
    1000             :         // FormatMajorVersion indicates the database's current format
    1001             :         // version.
    1002             :         FormatMajorVersion FormatMajorVersion
    1003             :         // ManifestFilename is the filename of the current active manifest,
    1004             :         // if the database exists. Peek leaves it empty when Exists is false.
    1005             :         ManifestFilename string
    1006             : }
    1007             : 
    1008             : // Peek looks for an existing database in dirname on the provided FS. It
    1009             : // returns a brief description of the database. Peek is read-only and
    1010             : // does not open the database
    1011           1 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
    1012           1 :         vers, versMarker, err := lookupFormatMajorVersion(fs, dirname)
    1013           1 :         if err != nil {
    1014           1 :                 return nil, err
    1015           1 :         }
    1016             :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1017             :         // PeekMarker variant that avoids opening the directory.
    1018           1 :         if err := versMarker.Close(); err != nil {
    1019           0 :                 return nil, err
    1020           0 :         }
    1021             : 
    1022             :         // Find the currently active manifest, if there is one.
    1023           1 :         manifestMarker, manifestFileNum, exists, err := findCurrentManifest(vers, fs, dirname)
    1024           1 :         if err != nil {
    1025           0 :                 return nil, err
    1026           0 :         }
    1027             :         // TODO(jackson): Immediately closing the marker is clunky. Add a
    1028             :         // PeekMarker variant that avoids opening the directory.
    1029           1 :         if err := manifestMarker.Close(); err != nil {
    1030           0 :                 return nil, err
    1031           0 :         }
    1032             : 
    1033           1 :         desc := &DBDesc{
    1034           1 :                 Exists:             exists,
    1035           1 :                 FormatMajorVersion: vers,
    1036           1 :         }
    1037           1 :         if exists {
    1038           1 :                 desc.ManifestFilename = base.MakeFilepath(fs, dirname, fileTypeManifest, manifestFileNum)
    1039           1 :         }
    1040           1 :         return desc, nil
    1041             : }
    1042             : 
    1043             : // LockDirectory acquires the database directory lock in the named directory,
    1044             : // preventing another process from opening the database. LockDirectory returns a
    1045             : // handle to the held lock that may be passed to Open through Options.Lock to
    1046             : // subsequently open the database, skipping lock acquistion during Open.
    1047             : //
    1048             : // LockDirectory may be used to expand the critical section protected by the
    1049             : // database lock to include setup before the call to Open.
    1050           2 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
    1051           2 :         fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, fileTypeLock, base.FileNum(0).DiskFileNum()))
    1052           2 :         if err != nil {
    1053           1 :                 return nil, err
    1054           1 :         }
    1055           2 :         l := &Lock{dirname: dirname, fileLock: fileLock}
    1056           2 :         l.refs.Store(1)
    1057           2 :         invariants.SetFinalizer(l, func(obj interface{}) {
    1058           2 :                 if refs := obj.(*Lock).refs.Load(); refs > 0 {
    1059           0 :                         panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
    1060             :                 }
    1061             :         })
    1062           2 :         return l, nil
    1063             : }
    1064             : 
    1065             : // Lock represents a file lock on a directory. It may be passed to Open through
    1066             : // Options.Lock to elide lock acquisition during Open.
    1067             : type Lock struct {
    1068             :         dirname  string
    1069             :         fileLock io.Closer
    1070             :         // refs is a count of the number of handles on the lock. refs must be 0, 1
    1071             :         // or 2.
    1072             :         //
    1073             :         // When acquired by the client and passed to Open, refs = 1 and the Open
    1074             :         // call increments it to 2. When the database is closed, it's decremented to
    1075             :         // 1. Finally, when the original caller calls Close on the Lock, it's
    1076             :         // decremented to zero and the underlying file lock is released.
    1077             :         //
    1078             :         // When Open acquires the file lock, refs remains at 1 until the database is
    1079             :         // closed.
    1080             :         refs atomic.Int32
    1081             : }
    1082             : 
    1083           1 : func (l *Lock) refForOpen() error {
    1084           1 :         // During Open, when a user passed in a lock, the reference count must be
    1085           1 :         // exactly 1. If it's zero, the lock is no longer held and is invalid. If
    1086           1 :         // it's 2, the lock is already in use by another database within the
    1087           1 :         // process.
    1088           1 :         if !l.refs.CompareAndSwap(1, 2) {
    1089           1 :                 return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
    1090           1 :         }
    1091           1 :         return nil
    1092             : }
    1093             : 
    1094             : // Close releases the lock, permitting another process to lock and open the
    1095             : // database. Close must not be called until after a database using the Lock has
    1096             : // been closed.
    1097           2 : func (l *Lock) Close() error {
    1098           2 :         if l.refs.Add(-1) > 0 {
    1099           1 :                 return nil
    1100           1 :         }
    1101           2 :         defer func() { l.fileLock = nil }()
    1102           2 :         return l.fileLock.Close()
    1103             : }
    1104             : 
    1105             : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
    1106             : // does not exist.
    1107             : //
    1108             : // Note that errors can be wrapped with more details; use errors.Is().
    1109             : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
    1110             : 
    1111             : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
    1112             : // already exists.
    1113             : //
    1114             : // Note that errors can be wrapped with more details; use errors.Is().
    1115             : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
    1116             : 
    1117             : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
    1118             : // already exists and is not pristine.
    1119             : //
    1120             : // Note that errors can be wrapped with more details; use errors.Is().
    1121             : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
    1122             : 
    1123             : // IsCorruptionError returns true if the given error indicates database
    1124             : // corruption.
    1125           1 : func IsCorruptionError(err error) bool {
    1126           1 :         return errors.Is(err, base.ErrCorruption)
    1127           1 : }
    1128             : 
    1129           2 : func checkConsistency(v *manifest.Version, dirname string, objProvider objstorage.Provider) error {
    1130           2 :         var errs []error
    1131           2 :         dedup := make(map[base.DiskFileNum]struct{})
    1132           2 :         for level, files := range v.Levels {
    1133           2 :                 iter := files.Iter()
    1134           2 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1135           2 :                         backingState := f.FileBacking
    1136           2 :                         if _, ok := dedup[backingState.DiskFileNum]; ok {
    1137           2 :                                 continue
    1138             :                         }
    1139           2 :                         dedup[backingState.DiskFileNum] = struct{}{}
    1140           2 :                         fileNum := backingState.DiskFileNum
    1141           2 :                         fileSize := backingState.Size
    1142           2 :                         // We allow foreign objects to have a mismatch between sizes. This is
    1143           2 :                         // because we might skew the backing size stored by our objprovider
    1144           2 :                         // to prevent us from over-prioritizing this file for compaction.
    1145           2 :                         meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
    1146           2 :                         var size int64
    1147           2 :                         if err == nil {
    1148           2 :                                 if objProvider.IsSharedForeign(meta) {
    1149           0 :                                         continue
    1150             :                                 }
    1151           2 :                                 size, err = objProvider.Size(meta)
    1152             :                         }
    1153           2 :                         if err != nil {
    1154           1 :                                 errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
    1155           1 :                                 continue
    1156             :                         }
    1157             : 
    1158           2 :                         if size != int64(fileSize) {
    1159           0 :                                 errs = append(errs, errors.Errorf(
    1160           0 :                                         "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
    1161           0 :                                         errors.Safe(level), fileNum, objProvider.Path(meta),
    1162           0 :                                         errors.Safe(size), errors.Safe(fileSize)))
    1163           0 :                                 continue
    1164             :                         }
    1165             :                 }
    1166             :         }
    1167           2 :         return errors.Join(errs...)
    1168             : }

Generated by: LCOV version 1.14