LCOV - code coverage report
Current view: top level - pebble - recovery.go (source / functions) Coverage Total Hit
Test: 2025-10-19 08:18Z 3dd8e6f3 - tests only.lcov Lines: 90.3 % 216 195
Test Date: 2025-10-19 08:18:50 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "io"
       9              : 
      10              :         "github.com/cockroachdb/errors"
      11              :         "github.com/cockroachdb/pebble/internal/base"
      12              :         "github.com/cockroachdb/pebble/internal/invariants"
      13              :         "github.com/cockroachdb/pebble/internal/manifest"
      14              :         "github.com/cockroachdb/pebble/objstorage"
      15              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      16              :         "github.com/cockroachdb/pebble/record"
      17              :         "github.com/cockroachdb/pebble/vfs"
      18              :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      19              : )
      20              : 
      21              : // recoverState reads the named database directory and recovers the set of files
      22              : // encoding the database state at the moment the previous process exited.
      23              : // recoverState is read only and does not mutate the on-disk state.
      24            1 : func recoverState(opts *Options, dirname string) (s *recoveredState, err error) {
      25            1 :         rs := &recoveredState{
      26            1 :                 dirname: dirname,
      27            1 :                 fs:      opts.FS,
      28            1 :         }
      29            1 :         if err := rs.init(opts, dirname); err != nil {
      30            1 :                 return nil, errors.CombineErrors(err, rs.Close())
      31            1 :         }
      32            1 :         return rs, nil
      33              : }
      34              : 
      35            1 : func (rs *recoveredState) init(opts *Options, dirname string) error {
      36            1 :         // List the directory contents. This also happens to include WAL log files,
      37            1 :         // if they are in the same dir.
      38            1 :         var err error
      39            1 :         if rs.ls, err = opts.FS.List(dirname); err != nil {
      40            1 :                 return errors.Wrapf(err, "pebble: database %q", dirname)
      41            1 :         }
      42              :         // Find the current format major version and active manifest.
      43            1 :         rs.fmv, rs.fmvMarker, err = lookupFormatMajorVersion(opts.FS, dirname, rs.ls)
      44            1 :         if err != nil {
      45            1 :                 return errors.Wrapf(err, "pebble: database %q", dirname)
      46            1 :         }
      47              : 
      48              :         // Open the object storage provider.
      49            1 :         providerSettings := opts.MakeObjStorageProviderSettings(dirname)
      50            1 :         providerSettings.FSDirInitialListing = rs.ls
      51            1 :         rs.objProvider, err = objstorageprovider.Open(providerSettings)
      52            1 :         if err != nil {
      53            1 :                 return errors.Wrapf(err, "pebble: database %q", dirname)
      54            1 :         }
      55              : 
      56              :         // Determine which manifest is current, and if one exists, replay it to
      57              :         // recover the current Version of the LSM.
      58            1 :         var manifestExists bool
      59            1 :         rs.manifestMarker, rs.manifestFileNum, manifestExists, err = findCurrentManifest(opts.FS, dirname, rs.ls)
      60            1 :         if err != nil {
      61            1 :                 return errors.Wrapf(err, "pebble: database %q", dirname)
      62            1 :         }
      63            1 :         if manifestExists {
      64            1 :                 recoveredVersion, err := recoverVersion(opts, dirname, rs.objProvider, rs.manifestFileNum)
      65            1 :                 if err != nil {
      66            1 :                         return err
      67            1 :                 }
      68            1 :                 if !opts.DisableConsistencyCheck {
      69            1 :                         if err := checkConsistency(recoveredVersion.version, rs.objProvider); err != nil {
      70            1 :                                 return err
      71            1 :                         }
      72              :                 }
      73            1 :                 rs.recoveredVersion = recoveredVersion
      74              :         }
      75              : 
      76              :         // Identify the maximal file number in the directory. We do not want to
      77              :         // reuse any existing file numbers, even obsolete ones, in order to avoid
      78              :         // modifying an ingested sstable's original external file.
      79              :         //
      80              :         // We also identify the most recent OPTIONS file, so we can validate our
      81              :         // configured Options against the previous options, and we collect any
      82              :         // orphaned temporary files that should be removed.
      83            1 :         var previousOptionsFileNum base.DiskFileNum
      84            1 :         for _, filename := range rs.ls {
      85            1 :                 ft, fn, ok := base.ParseFilename(opts.FS, filename)
      86            1 :                 if !ok {
      87            1 :                         continue
      88              :                 }
      89            1 :                 rs.maxFilenumUsed = max(rs.maxFilenumUsed, fn)
      90            1 :                 switch ft {
      91            0 :                 case base.FileTypeLog:
      92              :                         // Ignore.
      93            1 :                 case base.FileTypeOptions:
      94            1 :                         if previousOptionsFileNum < fn {
      95            1 :                                 previousOptionsFileNum = fn
      96            1 :                                 rs.previousOptionsFilename = filename
      97            1 :                         }
      98            1 :                 case base.FileTypeTemp, base.FileTypeOldTemp:
      99            1 :                         rs.obsoleteTempFilenames = append(rs.obsoleteTempFilenames, filename)
     100              :                 }
     101              :         }
     102            1 :         return nil
     103              : }
     104              : 
     105              : // recoveredState encapsulates state recovered from reading the database
     106              : // directory.
     107              : type recoveredState struct {
     108              :         dirname                 string
     109              :         fmv                     FormatMajorVersion
     110              :         fmvMarker               *atomicfs.Marker
     111              :         fs                      vfs.FS
     112              :         ls                      []string
     113              :         manifestMarker          *atomicfs.Marker
     114              :         manifestFileNum         base.DiskFileNum
     115              :         maxFilenumUsed          base.DiskFileNum
     116              :         obsoleteTempFilenames   []string
     117              :         objProvider             objstorage.Provider
     118              :         previousOptionsFilename string
     119              :         recoveredVersion        *recoveredVersion
     120              : }
     121              : 
     122              : // RemoveObsolete removes obsolete files uncovered during recovery.
     123            1 : func (rs *recoveredState) RemoveObsolete() error {
     124            1 :         var err error
     125            1 :         // Atomic markers may leave behind obsolete files if there's a crash
     126            1 :         // mid-update.
     127            1 :         if rs.fmvMarker != nil {
     128            1 :                 err = errors.CombineErrors(err, rs.fmvMarker.RemoveObsolete())
     129            1 :         }
     130            1 :         if rs.manifestMarker != nil {
     131            1 :                 err = errors.CombineErrors(err, rs.manifestMarker.RemoveObsolete())
     132            1 :         }
     133              :         // Some codepaths write to a temporary file and then rename it to its final
     134              :         // location when complete. A temp file is left over if a process exits
     135              :         // before the rename. Remove any that were found.
     136            1 :         for _, filename := range rs.obsoleteTempFilenames {
     137            1 :                 err = errors.CombineErrors(err, rs.fs.Remove(rs.fs.PathJoin(rs.dirname, filename)))
     138            1 :         }
     139            1 :         return err
     140              : }
     141              : 
     142              : // Close closes resources held by the recoveredState, including open file
     143              : // descriptors.
     144            1 : func (rs *recoveredState) Close() error {
     145            1 :         var err error
     146            1 :         if rs.fmvMarker != nil {
     147            1 :                 err = errors.CombineErrors(err, rs.fmvMarker.Close())
     148            1 :         }
     149            1 :         if rs.manifestMarker != nil {
     150            1 :                 err = errors.CombineErrors(err, rs.manifestMarker.Close())
     151            1 :         }
     152            1 :         if rs.objProvider != nil {
     153            1 :                 err = errors.CombineErrors(err, rs.objProvider.Close())
     154            1 :         }
     155            1 :         return err
     156              : }
     157              : 
     158              : // recoveredVersion describes the latest Version of the LSM recovered by
     159              : // replaying a manifest file.
     160              : type recoveredVersion struct {
     161              :         manifestFileNum    base.DiskFileNum
     162              :         minUnflushedLogNum base.DiskFileNum
     163              :         nextFileNum        base.DiskFileNum
     164              :         logSeqNum          base.SeqNum
     165              :         latest             *latestVersionState
     166              :         metrics            Metrics
     167              :         version            *manifest.Version
     168              : }
     169              : 
     170              : // recoverVersion replays the named manifest file to recover the latest version
     171              : // of the LSM from persisted state.
     172              : func recoverVersion(
     173              :         opts *Options, dirname string, provider objstorage.Provider, manifestFileNum base.DiskFileNum,
     174            1 : ) (*recoveredVersion, error) {
     175            1 :         vs := &recoveredVersion{
     176            1 :                 manifestFileNum: manifestFileNum,
     177            1 :                 nextFileNum:     1,
     178            1 :                 logSeqNum:       base.SeqNumStart,
     179            1 :                 latest: &latestVersionState{
     180            1 :                         l0Organizer:     manifest.NewL0Organizer(opts.Comparer, opts.FlushSplitBytes),
     181            1 :                         virtualBackings: manifest.MakeVirtualBackings(),
     182            1 :                 },
     183            1 :         }
     184            1 :         manifestPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeManifest, vs.manifestFileNum)
     185            1 :         manifestFilename := opts.FS.PathBase(manifestPath)
     186            1 : 
     187            1 :         // Read the versionEdits in the manifest file.
     188            1 :         var bve manifest.BulkVersionEdit
     189            1 :         bve.AllAddedTables = make(map[base.TableNum]*manifest.TableMetadata)
     190            1 :         manifestFile, err := opts.FS.Open(manifestPath)
     191            1 :         if err != nil {
     192            0 :                 return nil, errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
     193            0 :                         errors.Safe(manifestFilename), dirname)
     194            0 :         }
     195            1 :         defer manifestFile.Close()
     196            1 :         rr := record.NewReader(manifestFile, 0 /* logNum */)
     197            1 :         for {
     198            1 :                 r, err := rr.Next()
     199            1 :                 if err == io.EOF || record.IsInvalidRecord(err) {
     200            1 :                         break
     201              :                 }
     202            1 :                 if err != nil {
     203            0 :                         return nil, errors.Wrapf(err, "pebble: error when loading manifest file %q",
     204            0 :                                 errors.Safe(manifestFilename))
     205            0 :                 }
     206            1 :                 var ve manifest.VersionEdit
     207            1 :                 err = ve.Decode(r)
     208            1 :                 if err != nil {
     209            0 :                         // Break instead of returning an error if the record is corrupted
     210            0 :                         // or invalid.
     211            0 :                         if err == io.EOF || record.IsInvalidRecord(err) {
     212            0 :                                 break
     213              :                         }
     214            0 :                         return nil, err
     215              :                 }
     216            1 :                 if ve.ComparerName != "" {
     217            1 :                         if ve.ComparerName != opts.Comparer.Name {
     218            1 :                                 return nil, errors.Errorf("pebble: manifest file %q for DB %q: "+
     219            1 :                                         "comparer name from file %q != comparer name from Options %q",
     220            1 :                                         errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(opts.Comparer.Name))
     221            1 :                         }
     222              :                 }
     223            1 :                 if err := bve.Accumulate(&ve); err != nil {
     224            0 :                         return nil, err
     225            0 :                 }
     226            1 :                 if ve.MinUnflushedLogNum != 0 {
     227            1 :                         vs.minUnflushedLogNum = ve.MinUnflushedLogNum
     228            1 :                 }
     229            1 :                 if ve.NextFileNum != 0 {
     230            1 :                         vs.nextFileNum = base.DiskFileNum(ve.NextFileNum)
     231            1 :                 }
     232            1 :                 if ve.LastSeqNum != 0 {
     233            1 :                         // logSeqNum is the _next_ sequence number that will be assigned,
     234            1 :                         // while LastSeqNum is the last assigned sequence number. Note that
     235            1 :                         // this behaviour mimics that in RocksDB; the first sequence number
     236            1 :                         // assigned is one greater than the one present in the manifest
     237            1 :                         // (assuming no WALs contain higher sequence numbers than the
     238            1 :                         // manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
     239            1 :                         // next sequence number that will be assigned.
     240            1 :                         //
     241            1 :                         // If LastSeqNum is less than SeqNumStart, increase it to at least
     242            1 :                         // SeqNumStart to leave ample room for reserved sequence numbers.
     243            1 :                         vs.logSeqNum = max(ve.LastSeqNum+1, base.SeqNumStart)
     244            1 :                 }
     245              :         }
     246              : 
     247              :         // We have already set vs.nextFileNum=1 at the beginning of the function and
     248              :         // could have only updated it to some other non-zero value, so it cannot be
     249              :         // 0 here.
     250            1 :         if vs.minUnflushedLogNum == 0 {
     251            1 :                 if vs.nextFileNum >= 2 {
     252            1 :                         // We either have a freshly created DB, or a DB created by RocksDB
     253            1 :                         // that has not had a single flushed SSTable yet. This is because
     254            1 :                         // RocksDB bumps up nextFileNum in this case without bumping up
     255            1 :                         // minUnflushedLogNum, even if WALs with non-zero file numbers are
     256            1 :                         // present in the directory.
     257            1 :                 } else {
     258            0 :                         return nil, base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
     259            0 :                                 errors.Safe(manifestFilename), dirname)
     260            0 :                 }
     261              :         }
     262            1 :         vs.nextFileNum = max(vs.nextFileNum, vs.minUnflushedLogNum+1)
     263            1 : 
     264            1 :         // Populate the virtual backings for virtual sstables since we have finished
     265            1 :         // version edit accumulation.
     266            1 :         for _, b := range bve.AddedFileBacking {
     267            1 :                 isLocal := objstorage.IsLocalTable(provider, b.DiskFileNum)
     268            1 :                 vs.latest.virtualBackings.AddAndRef(b, isLocal)
     269            1 :         }
     270            1 :         for l, addedLevel := range bve.AddedTables {
     271            1 :                 for _, m := range addedLevel {
     272            1 :                         if m.Virtual {
     273            1 :                                 vs.latest.virtualBackings.AddTable(m, l)
     274            1 :                         }
     275              :                 }
     276              :         }
     277              : 
     278            1 :         if invariants.Enabled {
     279            1 :                 // There should be no deleted tables or backings, since we're starting
     280            1 :                 // from an empty state.
     281            1 :                 for _, deletedLevel := range bve.DeletedTables {
     282            1 :                         if len(deletedLevel) != 0 {
     283            0 :                                 panic("deleted files after manifest replay")
     284              :                         }
     285              :                 }
     286            1 :                 if len(bve.RemovedFileBacking) > 0 {
     287            0 :                         panic("deleted backings after manifest replay")
     288              :                 }
     289              :         }
     290              : 
     291            1 :         emptyVersion := manifest.NewInitialVersion(opts.Comparer)
     292            1 :         newVersion, err := bve.Apply(emptyVersion, opts.Experimental.ReadCompactionRate)
     293            1 :         if err != nil {
     294            0 :                 return nil, err
     295            0 :         }
     296            1 :         vs.latest.l0Organizer.PerformUpdate(vs.latest.l0Organizer.PrepareUpdate(&bve, newVersion), newVersion)
     297            1 :         vs.latest.l0Organizer.InitCompactingFileInfo(nil /* in-progress compactions */)
     298            1 :         vs.latest.blobFiles.Init(&bve, manifest.BlobRewriteHeuristic{
     299            1 :                 CurrentTime: opts.private.timeNow,
     300            1 :                 MinimumAge:  opts.Experimental.ValueSeparationPolicy().RewriteMinimumAge,
     301            1 :         })
     302            1 :         vs.version = newVersion
     303            1 : 
     304            1 :         for i := range vs.metrics.Levels {
     305            1 :                 l := &vs.metrics.Levels[i]
     306            1 :                 l.TablesCount = int64(newVersion.Levels[i].Len())
     307            1 :                 files := newVersion.Levels[i].Slice()
     308            1 :                 l.TablesSize = int64(files.TableSizeSum())
     309            1 :         }
     310            1 :         for _, l := range newVersion.Levels {
     311            1 :                 for f := range l.All() {
     312            1 :                         if !f.Virtual {
     313            1 :                                 isLocal, localSize := sizeIfLocal(f.TableBacking, provider)
     314            1 :                                 vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
     315            1 :                                 if isLocal {
     316            1 :                                         vs.metrics.Table.Local.LiveCount++
     317            1 :                                 }
     318              :                         }
     319              :                 }
     320              :         }
     321            1 :         for backing := range vs.latest.virtualBackings.All() {
     322            1 :                 isLocal, localSize := sizeIfLocal(backing, provider)
     323            1 :                 vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
     324            1 :                 if isLocal {
     325            1 :                         vs.metrics.Table.Local.LiveCount++
     326            1 :                 }
     327              :         }
     328            1 :         return vs, nil
     329              : }
        

Generated by: LCOV version 2.0-1