LCOV - code coverage report
Current view: top level - pebble - recovery.go (source / functions) Coverage Total Hit
Test: 2025-11-23 08:19Z d7ce913e - meta test only.lcov Lines: 74.4 % 469 349
Test Date: 2025-11-23 08:20:36 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "context"
      10              :         "encoding/binary"
      11              :         "io"
      12              :         "slices"
      13              : 
      14              :         "github.com/cockroachdb/errors"
      15              :         "github.com/cockroachdb/pebble/batchrepr"
      16              :         "github.com/cockroachdb/pebble/internal/arenaskl"
      17              :         "github.com/cockroachdb/pebble/internal/base"
      18              :         "github.com/cockroachdb/pebble/internal/invariants"
      19              :         "github.com/cockroachdb/pebble/internal/keyspan"
      20              :         "github.com/cockroachdb/pebble/internal/manifest"
      21              :         "github.com/cockroachdb/pebble/objstorage"
      22              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      23              :         "github.com/cockroachdb/pebble/record"
      24              :         "github.com/cockroachdb/pebble/vfs"
      25              :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      26              :         "github.com/cockroachdb/pebble/wal"
      27              : )
      28              : 
      29              : // recoverState reads the named database directory and recovers the set of files
      30              : // encoding the database state at the moment the previous process exited.
      31              : // recoverState is read only and does not mutate the on-disk state.
      32            1 : func recoverState(opts *Options, dirname string) (s *recoveredState, err error) {
      33            1 :         rs := &recoveredState{
      34            1 :                 dirname: dirname,
      35            1 :                 fs:      opts.FS,
      36            1 :         }
      37            1 :         if err := rs.init(opts, dirname); err != nil {
      38            0 :                 return nil, errors.CombineErrors(err, rs.Close())
      39            0 :         }
      40            1 :         return rs, nil
      41              : }
      42              : 
      43            1 : func (rs *recoveredState) init(opts *Options, dirname string) error {
      44            1 :         dirs, err := prepareOpenAndLockDirs(dirname, opts)
      45            1 :         if err != nil {
      46            0 :                 err = errors.Wrapf(err, "error opening database at %q", dirname)
      47            0 :                 err = errors.CombineErrors(err, dirs.Close())
      48            0 :                 return err
      49            0 :         }
      50            1 :         rs.dirs = dirs
      51            1 : 
      52            1 :         // List the directory contents. This also happens to include WAL log files,
      53            1 :         // if they are in the same dir.
      54            1 :         if rs.ls, err = opts.FS.List(dirname); err != nil {
      55            0 :                 return errors.Wrapf(err, "pebble: database %q", dirname)
      56            0 :         }
      57              :         // Find the currently format major version and active manifest.
      58            1 :         rs.fmv, rs.fmvMarker, err = lookupFormatMajorVersion(opts.FS, dirname, rs.ls)
      59            1 :         if err != nil {
      60            0 :                 return errors.Wrapf(err, "pebble: database %q", dirname)
      61            0 :         }
      62              : 
      63              :         // Open the object storage provider.
      64            1 :         providerSettings := opts.MakeObjStorageProviderSettings(dirname)
      65            1 :         providerSettings.Local.FSDirInitialListing = rs.ls
      66            1 :         rs.objProvider, err = objstorageprovider.Open(providerSettings)
      67            1 :         if err != nil {
      68            0 :                 return errors.Wrapf(err, "pebble: database %q", dirname)
      69            0 :         }
      70              : 
      71              :         // Determine which manifest is current, and if one exists, replay it to
      72              :         // recover the current Version of the LSM.
      73            1 :         var manifestExists bool
      74            1 :         rs.manifestMarker, rs.manifestFileNum, manifestExists, err = findCurrentManifest(opts.FS, dirname, rs.ls)
      75            1 :         if err != nil {
      76            0 :                 return errors.Wrapf(err, "pebble: database %q", dirname)
      77            0 :         }
      78            1 :         if manifestExists {
      79            1 :                 recoveredVersion, err := recoverVersion(opts, dirname, rs.objProvider, rs.manifestFileNum)
      80            1 :                 if err != nil {
      81            0 :                         return err
      82            0 :                 }
      83            1 :                 if !opts.DisableConsistencyCheck {
      84            1 :                         if err := checkConsistency(recoveredVersion.version, rs.objProvider); err != nil {
      85            0 :                                 return err
      86            0 :                         }
      87              :                 }
      88            1 :                 rs.recoveredVersion = recoveredVersion
      89              :         }
      90              : 
      91              :         // Identify the maximal file number in the directory. We do not want to
      92              :         // reuse any existing file numbers even if they are obsolete file numbers to
      93              :         // avoid modifying an ingested sstable's original external file.
      94              :         //
      95              :         // We also identify the most recent OPTIONS file, so we can validate our
      96              :         // configured Options against the previous options, and we collect any
      97              :         // orphaned temporary files that should be removed.
      98            1 :         var previousOptionsFileNum base.DiskFileNum
      99            1 :         for _, filename := range rs.ls {
     100            1 :                 ft, fn, ok := base.ParseFilename(opts.FS, filename)
     101            1 :                 if !ok {
     102            1 :                         continue
     103              :                 }
     104            1 :                 rs.maxFilenumUsed = max(rs.maxFilenumUsed, fn)
     105            1 :                 switch ft {
     106            0 :                 case base.FileTypeLog:
     107              :                         // Ignore.
     108            1 :                 case base.FileTypeOptions:
     109            1 :                         if previousOptionsFileNum < fn {
     110            1 :                                 previousOptionsFileNum = fn
     111            1 :                                 rs.previousOptionsFilename = filename
     112            1 :                         }
     113            0 :                 case base.FileTypeTemp, base.FileTypeOldTemp:
     114            0 :                         rs.obsoleteTempFilenames = append(rs.obsoleteTempFilenames, filename)
     115              :                 }
     116              :         }
     117              : 
     118              :         // Validate the most-recent OPTIONS file, if there is one.
     119            1 :         if rs.previousOptionsFilename != "" {
     120            1 :                 path := opts.FS.PathJoin(dirname, rs.previousOptionsFilename)
     121            1 :                 previousOptions, err := readOptionsFile(opts, path)
     122            1 :                 if err != nil {
     123            0 :                         return err
     124            0 :                 }
     125            1 :                 if err := opts.CheckCompatibility(dirname, previousOptions); err != nil {
     126            0 :                         return err
     127            0 :                 }
     128              :         }
     129              : 
     130              :         // Ratchet rs.maxFilenumUsed ahead of all known objects in the objProvider.
     131              :         // This avoids FileNum collisions with obsolete sstables.
     132            1 :         objects := rs.objProvider.List()
     133            1 :         for _, obj := range objects {
     134            1 :                 rs.maxFilenumUsed = max(rs.maxFilenumUsed, obj.DiskFileNum)
     135            1 :         }
     136              : 
     137              :         // Find all the WAL files across the various WAL directories.
     138            1 :         wals, err := wal.Scan(rs.dirs.WALDirs()...)
     139            1 :         if err != nil {
     140            0 :                 return err
     141            0 :         }
     142            1 :         for _, w := range wals {
     143            1 :                 // Don't reuse any obsolete file numbers to avoid modifying an ingested
     144            1 :                 // sstable's original external file.
     145            1 :                 rs.maxFilenumUsed = max(rs.maxFilenumUsed, base.DiskFileNum(w.Num))
     146            1 :                 if rs.recoveredVersion == nil || base.DiskFileNum(w.Num) >= rs.recoveredVersion.minUnflushedLogNum {
     147            1 :                         rs.walsReplay = append(rs.walsReplay, w)
     148            1 :                 } else {
     149            1 :                         rs.walsObsolete = append(rs.walsObsolete, w)
     150            1 :                 }
     151              :         }
     152            1 :         return nil
     153              : }
     154              : 
// recoveredState encapsulates state recovered from reading the database
// directory. It is populated by init and holds open resources (markers, the
// object provider, directories) that are released by Close.
type recoveredState struct {
	dirname                 string              // database directory being recovered
	dirs                    *resolvedDirs       // directories returned by prepareOpenAndLockDirs
	fmv                     FormatMajorVersion  // format major version from lookupFormatMajorVersion
	fmvMarker               *atomicfs.Marker    // marker returned alongside fmv; may be nil
	fs                      vfs.FS              // opts.FS at the time of recovery
	ls                      []string            // listing of dirname
	manifestMarker          *atomicfs.Marker    // marker returned by findCurrentManifest; may be nil
	manifestFileNum         base.DiskFileNum    // current manifest's file number, per findCurrentManifest
	maxFilenumUsed          base.DiskFileNum    // largest file number seen in ls, objProvider or the WALs
	obsoleteTempFilenames   []string            // leftover temporary files found in ls
	objProvider             objstorage.Provider // provider opened by objstorageprovider.Open
	previousOptionsFilename string              // highest-numbered OPTIONS file in ls; "" if none
	recoveredVersion        *recoveredVersion   // result of manifest replay; nil if no manifest exists
	walsObsolete            wal.Logs            // WALs numbered below the manifest's minUnflushedLogNum
	walsReplay              wal.Logs            // all other WALs; these are the ones to replay
}
     174              : 
     175              : // RemoveObsolete removes obsolete files uncovered during recovery.
     176            1 : func (rs *recoveredState) RemoveObsolete(opts *Options) error {
     177            1 :         var err error
     178            1 :         // Atomic markers may leave behind obsolete files if there's a crash
     179            1 :         // mid-update.
     180            1 :         if rs.fmvMarker != nil {
     181            1 :                 err = errors.CombineErrors(err, rs.fmvMarker.RemoveObsolete())
     182            1 :         }
     183            1 :         if rs.manifestMarker != nil {
     184            1 :                 err = errors.CombineErrors(err, rs.manifestMarker.RemoveObsolete())
     185            1 :         }
     186              :         // Some codepaths write to a temporary file and then rename it to its final
     187              :         // location when complete.  A temp file is leftover if a process exits
     188              :         // before the rename. Remove any that were found.
     189            1 :         for _, filename := range rs.obsoleteTempFilenames {
     190            0 :                 err = errors.CombineErrors(err, rs.fs.Remove(rs.fs.PathJoin(rs.dirname, filename)))
     191            0 :         }
     192              :         // Remove any WAL files that are already obsolete. Pebble keeps some old WAL
     193              :         // files around for recycling.
     194            1 :         for _, w := range rs.walsObsolete {
     195            1 :                 for i := range w.NumSegments() {
     196            1 :                         fs, path := w.SegmentLocation(i)
     197            1 :                         rmErr := fs.Remove(path)
     198            1 :                         opts.EventListener.WALDeleted(WALDeleteInfo{
     199            1 :                                 JobID:   0,
     200            1 :                                 Path:    path,
     201            1 :                                 FileNum: base.DiskFileNum(w.Num),
     202            1 :                                 Err:     rmErr,
     203            1 :                         })
     204            1 :                 }
     205              :         }
     206            1 :         return err
     207              : }
     208              : 
     209              : // Close closes resources held by the RecoveredState, including open file
     210              : // descriptors.
     211            0 : func (rs *recoveredState) Close() error {
     212            0 :         var err error
     213            0 :         if rs.fmvMarker != nil {
     214            0 :                 err = errors.CombineErrors(err, rs.fmvMarker.Close())
     215            0 :         }
     216            0 :         if rs.manifestMarker != nil {
     217            0 :                 err = errors.CombineErrors(err, rs.manifestMarker.Close())
     218            0 :         }
     219            0 :         if rs.objProvider != nil {
     220            0 :                 err = errors.CombineErrors(err, rs.objProvider.Close())
     221            0 :         }
     222            0 :         if rs.dirs != nil {
     223            0 :                 err = errors.CombineErrors(err, rs.dirs.Close())
     224            0 :         }
     225            0 :         return err
     226              : }
     227              : 
// recoveredVersion describes the latest Version of the LSM recovered by
// replaying a manifest file. It is produced by recoverVersion.
type recoveredVersion struct {
	manifestFileNum    base.DiskFileNum    // file number of the manifest that was replayed
	minUnflushedLogNum base.DiskFileNum    // last non-zero MinUnflushedLogNum seen in the edits
	nextFileNum        base.DiskFileNum    // at least minUnflushedLogNum+1 after replay
	logSeqNum          base.SeqNum         // next sequence number that will be assigned
	latest             *latestVersionState // L0 organizer, virtual backings and blob file state
	version            *manifest.Version   // accumulated edits applied to an initial empty version
}
     238              : 
     239              : // recoverVersion replays the named manifest file to recover the latest version
     240              : // of the LSM from persisted state.
     241              : func recoverVersion(
     242              :         opts *Options, dirname string, provider objstorage.Provider, manifestFileNum base.DiskFileNum,
     243            1 : ) (*recoveredVersion, error) {
     244            1 :         rv := &recoveredVersion{
     245            1 :                 manifestFileNum: manifestFileNum,
     246            1 :                 nextFileNum:     1,
     247            1 :                 logSeqNum:       base.SeqNumStart,
     248            1 :                 latest: &latestVersionState{
     249            1 :                         l0Organizer:     manifest.NewL0Organizer(opts.Comparer, opts.FlushSplitBytes),
     250            1 :                         virtualBackings: manifest.MakeVirtualBackings(),
     251            1 :                 },
     252            1 :         }
     253            1 :         manifestPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeManifest, rv.manifestFileNum)
     254            1 :         manifestFilename := opts.FS.PathBase(manifestPath)
     255            1 : 
     256            1 :         // Read the versionEdits in the manifest file.
     257            1 :         var bve manifest.BulkVersionEdit
     258            1 :         bve.AllAddedTables = make(map[base.TableNum]*manifest.TableMetadata)
     259            1 :         manifestFile, err := opts.FS.Open(manifestPath)
     260            1 :         if err != nil {
     261            0 :                 return nil, errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
     262            0 :                         errors.Safe(manifestFilename), dirname)
     263            0 :         }
     264            1 :         defer manifestFile.Close()
     265            1 :         rr := record.NewReader(manifestFile, 0 /* logNum */)
     266            1 :         for {
     267            1 :                 r, err := rr.Next()
     268            1 :                 if err == io.EOF || record.IsInvalidRecord(err) {
     269            1 :                         break
     270              :                 }
     271            1 :                 if err != nil {
     272            0 :                         return nil, errors.Wrapf(err, "pebble: error when loading manifest file %q",
     273            0 :                                 errors.Safe(manifestFilename))
     274            0 :                 }
     275            1 :                 var ve manifest.VersionEdit
     276            1 :                 err = ve.Decode(r)
     277            1 :                 if err != nil {
     278            0 :                         // Break instead of returning an error if the record is corrupted
     279            0 :                         // or invalid.
     280            0 :                         if err == io.EOF || record.IsInvalidRecord(err) {
     281            0 :                                 break
     282              :                         }
     283            0 :                         return nil, err
     284              :                 }
     285            1 :                 if ve.ComparerName != "" {
     286            1 :                         if ve.ComparerName != opts.Comparer.Name {
     287            0 :                                 return nil, errors.Errorf("pebble: manifest file %q for DB %q: "+
     288            0 :                                         "comparer name from file %q != comparer name from Options %q",
     289            0 :                                         errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(opts.Comparer.Name))
     290            0 :                         }
     291              :                 }
     292            1 :                 if err := bve.Accumulate(&ve); err != nil {
     293            0 :                         return nil, err
     294            0 :                 }
     295            1 :                 if ve.MinUnflushedLogNum != 0 {
     296            1 :                         rv.minUnflushedLogNum = ve.MinUnflushedLogNum
     297            1 :                 }
     298            1 :                 if ve.NextFileNum != 0 {
     299            1 :                         rv.nextFileNum = base.DiskFileNum(ve.NextFileNum)
     300            1 :                 }
     301            1 :                 if ve.LastSeqNum != 0 {
     302            1 :                         // logSeqNum is the _next_ sequence number that will be assigned,
     303            1 :                         // while LastSeqNum is the last assigned sequence number. Note that
     304            1 :                         // this behaviour mimics that in RocksDB; the first sequence number
     305            1 :                         // assigned is one greater than the one present in the manifest
     306            1 :                         // (assuming no WALs contain higher sequence numbers than the
     307            1 :                         // manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
     308            1 :                         // next sequence number that will be assigned.
     309            1 :                         //
     310            1 :                         // If LastSeqNum is less than SeqNumStart, increase it to at least
     311            1 :                         // SeqNumStart to leave ample room for reserved sequence numbers.
     312            1 :                         rv.logSeqNum = max(ve.LastSeqNum+1, base.SeqNumStart)
     313            1 :                 }
     314              :         }
     315              : 
     316              :         // We have already set vs.nextFileNum=1 at the beginning of the function and
     317              :         // could have only updated it to some other non-zero value, so it cannot be
     318              :         // 0 here.
     319            1 :         if rv.minUnflushedLogNum == 0 {
     320            1 :                 if rv.nextFileNum >= 2 {
     321            1 :                         // We either have a freshly created DB, or a DB created by RocksDB
     322            1 :                         // that has not had a single flushed SSTable yet. This is because
     323            1 :                         // RocksDB bumps up nextFileNum in this case without bumping up
     324            1 :                         // minUnflushedLogNum, even if WALs with non-zero file numbers are
     325            1 :                         // present in the directory.
     326            1 :                 } else {
     327            0 :                         return nil, base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
     328            0 :                                 errors.Safe(manifestFilename), dirname)
     329            0 :                 }
     330              :         }
     331            1 :         rv.nextFileNum = max(rv.nextFileNum, rv.minUnflushedLogNum+1)
     332            1 : 
     333            1 :         // Populate the virtual backings for virtual sstables since we have finished
     334            1 :         // version edit accumulation.
     335            1 :         for _, b := range bve.AddedFileBacking {
     336            1 :                 placement := objstorage.Placement(provider, base.FileTypeTable, b.DiskFileNum)
     337            1 :                 rv.latest.virtualBackings.AddAndRef(b, placement)
     338            1 :         }
     339            1 :         for l, addedLevel := range bve.AddedTables {
     340            1 :                 for _, m := range addedLevel {
     341            1 :                         if m.Virtual {
     342            1 :                                 rv.latest.virtualBackings.AddTable(m, l)
     343            1 :                         }
     344              :                 }
     345              :         }
     346              : 
     347            1 :         if invariants.Enabled {
     348            1 :                 // There should be no deleted tables or backings, since we're starting
     349            1 :                 // from an empty state.
     350            1 :                 for _, deletedLevel := range bve.DeletedTables {
     351            1 :                         if len(deletedLevel) != 0 {
     352            0 :                                 panic("deleted files after manifest replay")
     353              :                         }
     354              :                 }
     355            1 :                 if len(bve.RemovedFileBacking) > 0 {
     356            0 :                         panic("deleted backings after manifest replay")
     357              :                 }
     358              :         }
     359              : 
     360            1 :         emptyVersion := manifest.NewInitialVersion(opts.Comparer)
     361            1 :         newVersion, err := bve.Apply(emptyVersion, opts.Experimental.ReadCompactionRate)
     362            1 :         if err != nil {
     363            0 :                 return nil, err
     364            0 :         }
     365            1 :         rv.latest.l0Organizer.PerformUpdate(rv.latest.l0Organizer.PrepareUpdate(&bve, newVersion), newVersion)
     366            1 :         rv.latest.l0Organizer.InitCompactingFileInfo(nil /* in-progress compactions */)
     367            1 :         rv.latest.blobFiles.Init(&bve, manifest.BlobRewriteHeuristic{
     368            1 :                 CurrentTime: opts.private.timeNow,
     369            1 :                 MinimumAge:  opts.Experimental.ValueSeparationPolicy().RewriteMinimumAge,
     370            1 :         })
     371            1 :         rv.version = newVersion
     372            1 :         return rv, nil
     373              : }
     374              : 
     375              : // replayWAL replays the edits in the specified WAL. If the DB is in read
     376              : // only mode, then the WALs are replayed into memtables and not flushed. If
     377              : // the DB is not in read only mode, then the contents of the WAL are
     378              : // guaranteed to be flushed when a flush is scheduled after this method is run.
     379              : // Note that this flushing is very important for guaranteeing durability:
     380              : // the application may have had a number of pending
     381              : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
     382              : // happened but the corresponding data may now be readable from the WAL (while
     383              : // sitting in write-back caches in the kernel or the storage device). By
     384              : // reading the WAL (including the non-fsynced data) and then flushing all
     385              : // these changes (flush does fsyncs), we are able to guarantee that the
     386              : // initial state of the DB is durable.
     387              : //
     388              : // This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable and replays
     389              : // WALs into the flushable queue. Flushing of the queue is expected to be handled
     390              : // by callers. A list of flushable ingests (but not memtables) replayed is returned.
     391              : //
     392              : // d.mu must be held when calling this, but the mutex may be dropped and
     393              : // re-acquired during the course of this method.
     394              : func (d *DB) replayWAL(
     395              :         jobID JobID, ll wal.LogicalLog, strictWALTail bool,
     396            1 : ) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
     397            1 :         rr := ll.OpenForRead()
     398            1 :         defer func() { _ = rr.Close() }()
     399            1 :         var (
     400            1 :                 b               Batch
     401            1 :                 buf             bytes.Buffer
     402            1 :                 mem             *memTable
     403            1 :                 entry           *flushableEntry
     404            1 :                 offset          wal.Offset
     405            1 :                 lastFlushOffset int64
     406            1 :                 keysReplayed    int64 // number of keys replayed
     407            1 :                 batchesReplayed int64 // number of batches replayed
     408            1 :         )
     409            1 : 
     410            1 :         // TODO(jackson): This function is interspersed with panics, in addition to
     411            1 :         // corruption error propagation. Audit them to ensure we're truly only
     412            1 :         // panicking where the error points to Pebble bug and not user or
     413            1 :         // hardware-induced corruption.
     414            1 : 
     415            1 :         // "Flushes" (ie. closes off) the current memtable, if not nil.
     416            1 :         flushMem := func() {
     417            1 :                 if mem == nil {
     418            1 :                         return
     419            1 :                 }
     420            1 :                 mem.writerUnref()
     421            1 :                 if d.mu.mem.mutable == mem {
     422            1 :                         d.mu.mem.mutable = nil
     423            1 :                 }
     424            1 :                 entry.flushForced = !d.opts.ReadOnly
     425            1 :                 var logSize uint64
     426            1 :                 mergedOffset := offset.Physical + offset.PreviousFilesBytes
     427            1 :                 if mergedOffset >= lastFlushOffset {
     428            1 :                         logSize = uint64(mergedOffset - lastFlushOffset)
     429            1 :                 }
     430              :                 // Else, this was the initial memtable in the read-only case which must have
     431              :                 // been empty, but we need to flush it since we don't want to add to it later.
     432            1 :                 lastFlushOffset = mergedOffset
     433            1 :                 entry.logSize = logSize
     434            1 :                 mem, entry = nil, nil
     435              :         }
     436              : 
     437            1 :         mem = d.mu.mem.mutable
     438            1 :         if mem != nil {
     439            1 :                 entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
     440            1 :                 if !d.opts.ReadOnly {
     441            1 :                         flushMem()
     442            1 :                 }
     443              :         }
     444              : 
     445              :         // Creates a new memtable if there is no current memtable.
     446            1 :         ensureMem := func(seqNum base.SeqNum) {
     447            1 :                 if mem != nil {
     448            1 :                         return
     449            1 :                 }
     450            1 :                 mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
     451            1 :                 d.mu.mem.mutable = mem
     452            1 :                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     453              :         }
     454              : 
     455            1 :         defer func() {
     456            1 :                 if err != nil {
     457            0 :                         err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
     458            0 :                 }
     459              :         }()
     460              : 
     461            1 :         for {
     462            1 :                 var r io.Reader
     463            1 :                 var err error
     464            1 :                 r, offset, err = rr.NextRecord()
     465            1 :                 if err == nil {
     466            1 :                         _, err = io.Copy(&buf, r)
     467            1 :                 }
     468            1 :                 if err != nil {
     469            1 :                         // It is common to encounter a zeroed or invalid chunk due to WAL
     470            1 :                         // preallocation and WAL recycling. However zeroed or invalid chunks
     471            1 :                         // can also be a consequence of corruption / disk rot. When the log
     472            1 :                         // reader encounters one of these cases, it attempts to disambiguate
     473            1 :                         // by reading ahead looking for a future record. If a future chunk
     474            1 :                         // indicates the chunk at the original offset should've been valid, it
     475            1 :                         // surfaces record.ErrInvalidChunk or record.ErrZeroedChunk. These
     476            1 :                         // errors are always indicative of corruption and data loss.
     477            1 :                         //
     478            1 :                         // Otherwise, the reader surfaces record.ErrUnexpectedEOF indicating
     479            1 :                         // that the WAL terminated uncleanly and ambiguously. If the WAL is
     480            1 :                         // the most recent logical WAL, the caller passes in
     481            1 :                         // (strictWALTail=false), indicating we should tolerate the unclean
     482            1 :                         // ending. If the WAL is an older WAL, the caller passes in
     483            1 :                         // (strictWALTail=true), indicating that the WAL should have been
     484            1 :                         // closed cleanly, and we should interpret the
     485            1 :                         // `record.ErrUnexpectedEOF` as corruption and stop recovery.
     486            1 :                         if errors.Is(err, io.EOF) {
     487            1 :                                 break
     488            0 :                         } else if errors.Is(err, record.ErrUnexpectedEOF) && !strictWALTail {
     489            0 :                                 break
     490            0 :                         } else if (errors.Is(err, record.ErrUnexpectedEOF) && strictWALTail) ||
     491            0 :                                 errors.Is(err, record.ErrInvalidChunk) || errors.Is(err, record.ErrZeroedChunk) {
     492            0 :                                 // If a read-ahead returns record.ErrInvalidChunk or
     493            0 :                                 // record.ErrZeroedChunk, then there's definitively corruption.
     494            0 :                                 //
     495            0 :                                 // If strictWALTail=true, then record.ErrUnexpectedEOF should
     496            0 :                                 // also be considered corruption because the strictWALTail
     497            0 :                                 // indicates we expect a clean end to the WAL.
     498            0 :                                 //
     499            0 :                                 // Other I/O related errors should not be marked with corruption
     500            0 :                                 // and simply returned.
     501            0 :                                 err = errors.Mark(err, ErrCorruption)
     502            0 :                         }
     503              : 
     504            0 :                         return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
     505              :                 }
     506              : 
     507            1 :                 if buf.Len() < batchrepr.HeaderLen {
     508            0 :                         return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
     509            0 :                                 errors.Safe(base.DiskFileNum(ll.Num)), offset)
     510            0 :                 }
     511              : 
     512            1 :                 if d.opts.ErrorIfNotPristine {
     513            0 :                         return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
     514            0 :                 }
     515              : 
     516              :                 // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
     517              :                 // which is used below.
     518            1 :                 b = Batch{}
     519            1 :                 b.db = d
     520            1 :                 if err := b.SetRepr(buf.Bytes()); err != nil {
     521            0 :                         return nil, 0, err
     522            0 :                 }
     523            1 :                 seqNum := b.SeqNum()
     524            1 :                 maxSeqNum = seqNum + base.SeqNum(b.Count())
     525            1 :                 keysReplayed += int64(b.Count())
     526            1 :                 batchesReplayed++
     527            1 :                 {
     528            1 :                         br := b.Reader()
     529            1 :                         if kind, _, _, ok, err := br.Next(); err != nil {
     530            0 :                                 return nil, 0, err
     531            1 :                         } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
     532            1 :                                 // We're in the flushable ingests (+ possibly excises) case.
     533            1 :                                 //
     534            1 :                                 // Ingests require an up-to-date view of the LSM to determine the target
     535            1 :                                 // level of ingested sstables, and to accurately compute excises. Instead of
     536            1 :                                 // doing an ingest in this function, we just enqueue a flushable ingest
     537            1 :                                 // in the flushables queue and run a regular flush.
     538            1 :                                 flushMem()
     539            1 :                                 // mem is nil here.
     540            1 :                                 entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
     541            1 :                                 if err != nil {
     542            0 :                                         return nil, 0, err
     543            0 :                                 }
     544            1 :                                 fi := entry.flushable.(*ingestedFlushable)
     545            1 :                                 flushableIngests = append(flushableIngests, fi)
     546            1 :                                 d.mu.mem.queue = append(d.mu.mem.queue, entry)
     547            1 :                                 // A flushable ingest is always followed by a WAL rotation.
     548            1 :                                 break
     549              :                         }
     550              :                 }
     551              : 
     552            1 :                 if b.memTableSize >= uint64(d.largeBatchThreshold) {
     553            1 :                         flushMem()
     554            1 :                         // Make a copy of the data slice since it is currently owned by buf and will
     555            1 :                         // be reused in the next iteration.
     556            1 :                         b.data = slices.Clone(b.data)
     557            1 :                         b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
     558            1 :                         if err != nil {
     559            0 :                                 return nil, 0, err
     560            0 :                         }
     561            1 :                         entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
     562            1 :                         // Disable memory accounting by adding a reader ref that will never be
     563            1 :                         // removed.
     564            1 :                         entry.readerRefs.Add(1)
     565            1 :                         d.mu.mem.queue = append(d.mu.mem.queue, entry)
     566            1 :                 } else {
     567            1 :                         ensureMem(seqNum)
     568            1 :                         if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
     569            0 :                                 return nil, 0, err
     570            0 :                         }
     571              :                         // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
     572              :                         // batch may not initially fit, but will eventually fit (since it is smaller than
     573              :                         // largeBatchThreshold).
     574            1 :                         for err == arenaskl.ErrArenaFull {
     575            1 :                                 flushMem()
     576            1 :                                 ensureMem(seqNum)
     577            1 :                                 err = mem.prepare(&b)
     578            1 :                                 if err != nil && err != arenaskl.ErrArenaFull {
     579            0 :                                         return nil, 0, err
     580            0 :                                 }
     581              :                         }
     582            1 :                         if err = mem.apply(&b, seqNum); err != nil {
     583            0 :                                 return nil, 0, err
     584            0 :                         }
     585            1 :                         mem.writerUnref()
     586              :                 }
     587            1 :                 buf.Reset()
     588              :         }
     589              : 
     590            1 :         d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
     591            1 :                 jobID, ll.String(), offset, keysReplayed, batchesReplayed)
     592            1 :         if !d.opts.ReadOnly {
     593            1 :                 flushMem()
     594            1 :         }
     595              : 
     596              :         // mem is nil here, if !ReadOnly.
     597            1 :         return flushableIngests, maxSeqNum, err
     598              : }
     599              : 
     600              : func (d *DB) replayIngestedFlushable(
     601              :         b *Batch, logNum base.DiskFileNum,
     602            1 : ) (entry *flushableEntry, err error) {
     603            1 :         br := b.Reader()
     604            1 :         seqNum := b.SeqNum()
     605            1 : 
     606            1 :         fileNums := make([]base.DiskFileNum, 0, b.Count())
     607            1 :         var exciseSpan KeyRange
     608            1 :         addFileNum := func(encodedFileNum []byte) {
     609            1 :                 fileNum, n := binary.Uvarint(encodedFileNum)
     610            1 :                 if n <= 0 {
     611            0 :                         panic("pebble: ingest sstable file num is invalid")
     612              :                 }
     613            1 :                 fileNums = append(fileNums, base.DiskFileNum(fileNum))
     614              :         }
     615              : 
     616            1 :         for i := 0; i < int(b.Count()); i++ {
     617            1 :                 kind, key, val, ok, err := br.Next()
     618            1 :                 if err != nil {
     619            0 :                         return nil, err
     620            0 :                 }
     621            1 :                 if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise {
     622            0 :                         panic("pebble: invalid batch key kind")
     623              :                 }
     624            1 :                 if !ok {
     625            0 :                         panic("pebble: invalid batch count")
     626              :                 }
     627            1 :                 if kind == base.InternalKeyKindExcise {
     628            1 :                         if exciseSpan.Valid() {
     629            0 :                                 panic("pebble: multiple excise spans in a single batch")
     630              :                         }
     631            1 :                         exciseSpan.Start = slices.Clone(key)
     632            1 :                         exciseSpan.End = slices.Clone(val)
     633            1 :                         continue
     634              :                 }
     635            1 :                 addFileNum(key)
     636              :         }
     637              : 
     638            1 :         if _, _, _, ok, err := br.Next(); err != nil {
     639            0 :                 return nil, err
     640            1 :         } else if ok {
     641            0 :                 panic("pebble: invalid number of entries in batch")
     642              :         }
     643              : 
     644            1 :         meta := make([]*manifest.TableMetadata, len(fileNums))
     645            1 :         var lastRangeKey keyspan.Span
     646            1 :         for i, n := range fileNums {
     647            1 :                 readable, err := d.objProvider.OpenForReading(context.TODO(), base.FileTypeTable, n,
     648            1 :                         objstorage.OpenOptions{MustExist: true})
     649            1 :                 if err != nil {
     650            0 :                         return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files")
     651            0 :                 }
     652              :                 // NB: ingestLoad1 will close readable.
     653            1 :                 meta[i], lastRangeKey, _, err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(),
     654            1 :                         readable, d.cacheHandle, &d.compressionCounters, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
     655            1 :                 if err != nil {
     656            0 :                         return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
     657            0 :                 }
     658              :         }
     659            1 :         if lastRangeKey.Valid() && d.opts.Comparer.Split.HasSuffix(lastRangeKey.End) {
     660            0 :                 return nil, errors.AssertionFailedf("pebble: last ingest sstable has suffixed range key end %s",
     661            0 :                         d.opts.Comparer.FormatKey(lastRangeKey.End))
     662            0 :         }
     663              : 
     664            1 :         numFiles := len(meta)
     665            1 :         if exciseSpan.Valid() {
     666            1 :                 numFiles++
     667            1 :         }
     668            1 :         if uint32(numFiles) != b.Count() {
     669            0 :                 panic("pebble: couldn't load all files in WAL entry")
     670              :         }
     671              : 
     672            1 :         return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
     673              : }
        

Generated by: LCOV version 2.0-1