LCOV - code coverage report
Current view: top level - pebble - checkpoint.go (source / functions)
Test: 2024-12-11 08:18Z ab9741a3 - tests + meta.lcov
Date: 2024-12-11 08:19:09
Coverage: Lines: 264 / 331 (79.8 %)    Functions: 0 / 0 (-)

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "io"
      10             :         "os"
      11             : 
      12             :         "github.com/cockroachdb/errors"
      13             :         "github.com/cockroachdb/errors/oserror"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/record"
      16             :         "github.com/cockroachdb/pebble/vfs"
      17             :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      18             : )
      19             : 
      20             : // checkpointOptions holds the optional parameters to construct checkpoint
      21             : // snapshots.
      22             : type checkpointOptions struct {
      23             :         // flushWAL set to true will force a flush and sync of the WAL prior to
      24             :         // checkpointing.
      25             :         flushWAL bool
      26             : 
      27             :         // If set, any SSTs that don't overlap with these spans are excluded from a checkpoint.
      28             :         restrictToSpans []CheckpointSpan
      29             : }
      30             : 
      31             : // CheckpointOption sets optional parameters used by `DB.Checkpoint`.
      32             : type CheckpointOption func(*checkpointOptions)
      33             : 
      34             : // WithFlushedWAL enables flushing and syncing the WAL prior to constructing a
      35             : // checkpoint. This guarantees that any writes committed before calling
      36             : // DB.Checkpoint will be part of that checkpoint.
      37             : //
      38             : // Note that this setting can only be useful in cases when some writes are
      39             : // performed with Sync = false. Otherwise, the guarantee will already be met.
      40             : //
      41             : // Passing this option is functionally equivalent to calling
      42             : // DB.LogData(nil, Sync) right before DB.Checkpoint.
      43           1 : func WithFlushedWAL() CheckpointOption {
      44           1 :         return func(opt *checkpointOptions) {
      45           1 :                 opt.flushWAL = true
      46           1 :         }
      47             : }
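
A minimal client-side sketch of how WithFlushedWAL is typically used (hypothetical
helper, keys, and paths; assumes the import path github.com/cockroachdb/pebble).
The option only adds a guarantee when some writes were committed with pebble.NoSync;
writes committed with pebble.Sync are already durable in the WAL.

        // checkpointWithFlushedWAL is a hypothetical helper, not part of Pebble.
        func checkpointWithFlushedWAL(db *pebble.DB, destDir string) error {
                // A write committed without syncing the WAL.
                if err := db.Set([]byte("k"), []byte("v"), pebble.NoSync); err != nil {
                        return err
                }
                // WithFlushedWAL flushes and syncs the WAL first, so the write
                // above is guaranteed to be part of the checkpoint.
                return db.Checkpoint(destDir, pebble.WithFlushedWAL())
        }
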
      48             : 
      49             : // WithRestrictToSpans specifies spans of interest for the checkpoint. Any SSTs
      50             : // that don't overlap with any of these spans are excluded from the checkpoint.
      51             : //
      52             : // Note that the checkpoint can still surface keys outside of these spans (from
      53             : // the WAL and from SSTs that partially overlap with these spans). Moreover,
      54             : // these keys aren't necessarily "valid" in that they could have been
      55             : // modified but the SST containing the modification is excluded.
      56           2 : func WithRestrictToSpans(spans []CheckpointSpan) CheckpointOption {
      57           2 :         return func(opt *checkpointOptions) {
      58           2 :                 opt.restrictToSpans = spans
      59           2 :         }
      60             : }
      61             : 
      62             : // CheckpointSpan is a key range [Start, End) (inclusive on Start, exclusive on
      63             : // End) of interest for a checkpoint.
      64             : type CheckpointSpan struct {
      65             :         Start []byte
      66             :         End   []byte
      67             : }
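
A hedged sketch of restricting a checkpoint to a few key ranges (hypothetical
helper, keys, and paths; assumes the import path github.com/cockroachdb/pebble).
SSTs that overlap none of the spans are dropped from the checkpoint by appending
a deletion record to the copied MANIFEST, as excludeFromCheckpoint and
writeCheckpointManifest below show.

        // checkpointUserSpans is a hypothetical helper, not part of Pebble.
        func checkpointUserSpans(db *pebble.DB, destDir string) error {
                spans := []pebble.CheckpointSpan{
                        {Start: []byte("a"), End: []byte("c")},
                        {Start: []byte("m"), End: []byte("p")},
                }
                // Keys outside these spans can still appear in the checkpoint
                // (from the WAL or from partially overlapping SSTs).
                return db.Checkpoint(destDir, pebble.WithRestrictToSpans(spans))
        }
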
      68             : 
      69             : // excludeFromCheckpoint returns true if an SST file should be excluded from the
      70             : // checkpoint because it does not overlap with the spans of interest
      71             : // (opt.restrictToSpans).
      72           2 : func excludeFromCheckpoint(f *fileMetadata, opt *checkpointOptions, cmp Compare) bool {
      73           2 :         if len(opt.restrictToSpans) == 0 {
      74           2 :                 // Option not set; don't exclude anything.
      75           2 :                 return false
      76           2 :         }
      77           2 :         for _, s := range opt.restrictToSpans {
      78           2 :                 spanBounds := base.UserKeyBoundsEndExclusive(s.Start, s.End)
      79           2 :                 if f.Overlaps(cmp, &spanBounds) {
      80           2 :                         return false
      81           2 :                 }
      82             :         }
      83             :         // None of the restrictToSpans overlapped; we can exclude this file.
      84           2 :         return true
      85             : }
      86             : 
      87             : // mkdirAllAndSyncParents creates destDir and any of its missing parents.
      88             : // Those missing parents, as well as the closest existing ancestor, are synced.
      89             : // Returns a handle to the directory created at destDir.
      90           2 : func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
      91           2 :         // Collect paths for all directories between destDir (excluded) and its
      92           2 :         // closest existing ancestor (included).
      93           2 :         var parentPaths []string
      94           2 :         for parentPath := fs.PathDir(destDir); ; parentPath = fs.PathDir(parentPath) {
      95           2 :                 parentPaths = append(parentPaths, parentPath)
      96           2 :                 if fs.PathDir(parentPath) == parentPath {
      97           2 :                         break
      98             :                 }
      99           2 :                 _, err := fs.Stat(parentPath)
     100           2 :                 if err == nil {
     101           2 :                         // Exit loop at the closest existing ancestor.
     102           2 :                         break
     103             :                 }
     104           2 :                 if !oserror.IsNotExist(err) {
     105           0 :                         return nil, err
     106           0 :                 }
     107             :         }
     108             :         // Create destDir and any of its missing parents.
     109           2 :         if err := fs.MkdirAll(destDir, 0755); err != nil {
     110           1 :                 return nil, err
     111           1 :         }
     112             :         // Sync all the parent directories up to the closest existing ancestor,
     113             :         // included.
     114           2 :         for _, parentPath := range parentPaths {
     115           2 :                 parentDir, err := fs.OpenDir(parentPath)
     116           2 :                 if err != nil {
     117           1 :                         return nil, err
     118           1 :                 }
     119           2 :                 err = parentDir.Sync()
     120           2 :                 if err != nil {
     121           1 :                         _ = parentDir.Close()
     122           1 :                         return nil, err
     123           1 :                 }
     124           2 :                 err = parentDir.Close()
     125           2 :                 if err != nil {
     126           0 :                         return nil, err
     127           0 :                 }
     128             :         }
     129           2 :         return fs.OpenDir(destDir)
     130             : }
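
A small in-package sketch of mkdirAllAndSyncParents (hypothetical helper and paths,
using the in-memory vfs.NewMem filesystem and this file's existing imports). It
creates the whole missing chain, syncs the newly created parents and their closest
existing ancestor, and hands back an open handle to the leaf directory, which the
caller is expected to sync and close.

        // exampleMkdirAllAndSyncParents is a hypothetical in-package helper.
        func exampleMkdirAllAndSyncParents() error {
                mem := vfs.NewMem()
                dir, err := mkdirAllAndSyncParents(mem, "/a/b/c")
                if err != nil {
                        return err
                }
                // The leaf directory itself is not synced by the call; Checkpoint
                // syncs it only after the checkpoint contents are in place.
                return errors.CombineErrors(dir.Sync(), dir.Close())
        }
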
     131             : 
     132             : // Checkpoint constructs a snapshot of the DB instance in the specified
     133             : // directory. The WAL, MANIFEST, OPTIONS, and sstables will be copied into the
     134             : // snapshot. Hard links will be used when possible. Beware of the significant
     135             : // space overhead for a checkpoint if hard links are disabled. Also beware that
     136             : // even if hard links are used, the space overhead for the checkpoint will
     137             : // increase over time as the DB performs compactions.
     138             : //
     139             : // Note that shared files in a checkpoint could get deleted if the DB is
     140             : // restarted after a checkpoint operation, as the reference for the checkpoint
     141             : // is only maintained in memory. This is okay as long as users of Checkpoint
     142             : // crash shortly afterwards with a "poison file" preventing further restarts.
     143             : func (d *DB) Checkpoint(
     144             :         destDir string, opts ...CheckpointOption,
     145             : ) (
     146             :         ckErr error, /* used in deferred cleanup */
     147           2 : ) {
     148           2 :         opt := &checkpointOptions{}
     149           2 :         for _, fn := range opts {
     150           2 :                 fn(opt)
     151           2 :         }
     152             : 
     153           2 :         if _, err := d.opts.FS.Stat(destDir); !oserror.IsNotExist(err) {
     154           1 :                 if err == nil {
     155           1 :                         return &os.PathError{
     156           1 :                                 Op:   "checkpoint",
     157           1 :                                 Path: destDir,
     158           1 :                                 Err:  oserror.ErrExist,
     159           1 :                         }
     160           1 :                 }
     161           0 :                 return err
     162             :         }
     163             : 
     164           2 :         if opt.flushWAL && !d.opts.DisableWAL {
     165           1 :                 // Write an empty log-data record to flush and sync the WAL.
     166           1 :                 if err := d.LogData(nil /* data */, Sync); err != nil {
     167           0 :                         return err
     168           0 :                 }
     169             :         }
     170             : 
     171             :         // Disable file deletions.
     172           2 :         d.mu.Lock()
     173           2 :         d.disableFileDeletions()
     174           2 :         defer func() {
     175           2 :                 d.mu.Lock()
     176           2 :                 defer d.mu.Unlock()
     177           2 :                 d.enableFileDeletions()
     178           2 :         }()
     179             : 
     180             :         // TODO(peter): RocksDB provides the option to roll the manifest if the
     181             :         // MANIFEST size is too large. Should we do this too?
     182             : 
     183             :         // Lock the manifest before getting the current version. We need the
     184             :         // length of the manifest that we read to match the current version that
     185             :         // we read, otherwise we might copy a versionEdit not reflected in the
     186             :         // sstables we copy/link.
     187           2 :         d.mu.versions.logLock()
     188           2 :         // Get the current version and the current manifest file number.
     189           2 :         current := d.mu.versions.currentVersion()
     190           2 :         formatVers := d.FormatMajorVersion()
     191           2 :         manifestFileNum := d.mu.versions.manifestFileNum
     192           2 :         manifestSize := d.mu.versions.manifest.Size()
     193           2 :         optionsFileNum := d.optionsFileNum
     194           2 : 
     195           2 :         virtualBackingFiles := make(map[base.DiskFileNum]struct{})
     196           2 :         d.mu.versions.virtualBackings.ForEach(func(backing *fileBacking) {
     197           2 :                 virtualBackingFiles[backing.DiskFileNum] = struct{}{}
     198           2 :         })
     199             : 
     200             :         // Acquire the logs while holding mutexes to ensure we don't race with a
     201             :         // flush that might mark a log that's relevant to `current` as obsolete
     202             :         // before our call to List.
     203           2 :         allLogicalLogs := d.mu.log.manager.List()
     204           2 : 
     205           2 :         // Release the manifest and DB.mu so we don't block other operations on
     206           2 :         // the database.
     207           2 :         d.mu.versions.logUnlock()
     208           2 :         d.mu.Unlock()
     209           2 : 
     210           2 :         // Wrap the normal filesystem with one which wraps newly created files with
     211           2 :         // vfs.NewSyncingFile.
     212           2 :         fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
     213           2 :                 NoSyncOnClose: d.opts.NoSyncOnClose,
     214           2 :                 BytesPerSync:  d.opts.BytesPerSync,
     215           2 :         })
     216           2 : 
     217           2 :         // Create the dir and its parents (if necessary), and sync them.
     218           2 :         var dir vfs.File
     219           2 :         defer func() {
     220           2 :                 if dir != nil {
     221           0 :                         _ = dir.Close()
     222           0 :                 }
     223           2 :                 if ckErr != nil {
     224           0 :                         // Attempt to cleanup on error.
     225           0 :                         _ = fs.RemoveAll(destDir)
     226           0 :                 }
     227             :         }()
     228           2 :         dir, ckErr = mkdirAllAndSyncParents(fs, destDir)
     229           2 :         if ckErr != nil {
     230           0 :                 return ckErr
     231           0 :         }
     232             : 
     233           2 :         {
     234           2 :                 // Copy the OPTIONS.
     235           2 :                 srcPath := base.MakeFilepath(fs, d.dirname, fileTypeOptions, optionsFileNum)
     236           2 :                 destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
     237           2 :                 ckErr = copyCheckpointOptions(fs, srcPath, destPath)
     238           2 :                 if ckErr != nil {
     239           0 :                         return ckErr
     240           0 :                 }
     241             :         }
     242             : 
     243           2 :         {
     244           2 :                 // Set the format major version in the destination directory.
     245           2 :                 var versionMarker *atomicfs.Marker
     246           2 :                 versionMarker, _, ckErr = atomicfs.LocateMarker(fs, destDir, formatVersionMarkerName)
     247           2 :                 if ckErr != nil {
     248           0 :                         return ckErr
     249           0 :                 }
     250             : 
     251             :                 // We use the marker to encode the active format version in the
     252             :                 // marker filename. Unlike other uses of the atomic marker,
     253             :                 // there is no file with the filename `formatVers.String()` on
     254             :                 // the filesystem.
     255           2 :                 ckErr = versionMarker.Move(formatVers.String())
     256           2 :                 if ckErr != nil {
     257           0 :                         return ckErr
     258           0 :                 }
     259           2 :                 ckErr = versionMarker.Close()
     260           2 :                 if ckErr != nil {
     261           0 :                         return ckErr
     262           0 :                 }
     263             :         }
     264             : 
     265           2 :         var excludedFiles map[deletedFileEntry]*fileMetadata
     266           2 :         var remoteFiles []base.DiskFileNum
     267           2 :         // Set of FileBacking.DiskFileNum which will be required by virtual sstables
     268           2 :         // in the checkpoint.
     269           2 :         requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})
     270           2 :         // Link or copy the sstables.
     271           2 :         for l := range current.Levels {
     272           2 :                 iter := current.Levels[l].Iter()
     273           2 :                 for f := iter.First(); f != nil; f = iter.Next() {
     274           2 :                         if excludeFromCheckpoint(f, opt, d.cmp) {
     275           2 :                                 if excludedFiles == nil {
     276           2 :                                         excludedFiles = make(map[deletedFileEntry]*fileMetadata)
     277           2 :                                 }
     278           2 :                                 excludedFiles[deletedFileEntry{
     279           2 :                                         Level:   l,
     280           2 :                                         FileNum: f.FileNum,
     281           2 :                                 }] = f
     282           2 :                                 continue
     283             :                         }
     284             : 
     285           2 :                         fileBacking := f.FileBacking
     286           2 :                         if f.Virtual {
     287           2 :                                 if _, ok := requiredVirtualBackingFiles[fileBacking.DiskFileNum]; ok {
     288           2 :                                         continue
     289             :                                 }
     290           2 :                                 requiredVirtualBackingFiles[fileBacking.DiskFileNum] = struct{}{}
     291             :                         }
     292           2 :                         meta, err := d.objProvider.Lookup(fileTypeTable, fileBacking.DiskFileNum)
     293           2 :                         if err != nil {
     294           0 :                                 ckErr = err
     295           0 :                                 return ckErr
     296           0 :                         }
     297           2 :                         if meta.IsRemote() {
     298           1 :                                 // We don't copy remote files. This is desirable as checkpointing is
     299           1 :                                 // supposed to be a fast operation, and references to remote files can
     300           1 :                                 // always be resolved by any checkpoint readers by reading the object
     301           1 :                                 // catalog. We don't add this file to excludedFiles either, as that'd
     302           1 :                                 // cause it to be deleted in the second manifest entry which is also
     303           1 :                                 // inaccurate.
     304           1 :                                 remoteFiles = append(remoteFiles, meta.DiskFileNum)
     305           1 :                                 continue
     306             :                         }
     307             : 
     308           2 :                         srcPath := base.MakeFilepath(fs, d.dirname, fileTypeTable, fileBacking.DiskFileNum)
     309           2 :                         destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
     310           2 :                         ckErr = vfs.LinkOrCopy(fs, srcPath, destPath)
     311           2 :                         if ckErr != nil {
     312           0 :                                 return ckErr
     313           0 :                         }
     314             :                 }
     315             :         }
     316             : 
     317           2 :         var removeBackingTables []base.DiskFileNum
     318           2 :         for diskFileNum := range virtualBackingFiles {
     319           2 :                 if _, ok := requiredVirtualBackingFiles[diskFileNum]; !ok {
     320           2 :                         // The backing sstable associated with fileNum is no longer
     321           2 :                         // required.
     322           2 :                         removeBackingTables = append(removeBackingTables, diskFileNum)
     323           2 :                 }
     324             :         }
     325             : 
     326           2 :         ckErr = d.writeCheckpointManifest(
     327           2 :                 fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
     328           2 :                 excludedFiles, removeBackingTables,
     329           2 :         )
     330           2 :         if ckErr != nil {
     331           0 :                 return ckErr
     332           0 :         }
     333           2 :         if len(remoteFiles) > 0 {
     334           1 :                 ckErr = d.objProvider.CheckpointState(fs, destDir, fileTypeTable, remoteFiles)
     335           1 :                 if ckErr != nil {
     336           0 :                         return ckErr
     337           0 :                 }
     338             :         }
     339             : 
     340             :         // Copy the WAL files. We copy rather than link because WAL file recycling
     341             :         // will cause the WAL files to be reused which would invalidate the
     342             :         // checkpoint. It's possible allLogicalLogs includes logs that are not
     343             :         // relevant (beneath the version's MinUnflushedLogNum). These extra files
     344             :         // are harmless. The earlier (wal.Manager).List call will not include
     345             :         // obsolete logs that are sitting in the recycler or have already been
     346             :         // passed off to the cleanup manager for deletion.
     347             :         //
     348             :         // TODO(jackson): It would be desirable to copy all recycling and obsolete
     349             :         // WALs to aid corruption postmortem debugging should we need them.
     350           2 :         for _, log := range allLogicalLogs {
     351           2 :                 for i := 0; i < log.NumSegments(); i++ {
     352           2 :                         srcFS, srcPath := log.SegmentLocation(i)
     353           2 :                         destPath := fs.PathJoin(destDir, srcFS.PathBase(srcPath))
     354           2 :                         ckErr = vfs.CopyAcrossFS(srcFS, srcPath, fs, destPath)
     355           2 :                         if ckErr != nil {
     356           0 :                                 return ckErr
     357           0 :                         }
     358             :                 }
     359             :         }
     360             : 
     361             :         // Sync and close the checkpoint directory.
     362           2 :         ckErr = dir.Sync()
     363           2 :         if ckErr != nil {
     364           0 :                 return ckErr
     365           0 :         }
     366           2 :         ckErr = dir.Close()
     367           2 :         dir = nil
     368           2 :         return ckErr
     369             : }
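
A client-side sketch of taking a checkpoint and then opening it as an independent,
read-only database (hypothetical directory names and key; assumes both directories
live on the same filesystem so sstables can be hard-linked rather than copied).

        package main

        import (
                "log"

                "github.com/cockroachdb/pebble"
        )

        func main() {
                db, err := pebble.Open("demo-db", &pebble.Options{})
                if err != nil {
                        log.Fatal(err)
                }
                defer db.Close()

                if err := db.Set([]byte("k"), []byte("v"), pebble.Sync); err != nil {
                        log.Fatal(err)
                }
                // The destination directory must not exist yet.
                if err := db.Checkpoint("demo-ckpt"); err != nil {
                        log.Fatal(err)
                }

                // The checkpoint is a self-contained DB and serves reads on its own.
                ckpt, err := pebble.Open("demo-ckpt", &pebble.Options{ReadOnly: true})
                if err != nil {
                        log.Fatal(err)
                }
                defer ckpt.Close()
                v, closer, err := ckpt.Get([]byte("k"))
                if err != nil {
                        log.Fatal(err)
                }
                log.Printf("k=%s", v)
                closer.Close()
        }
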
     370             : 
     371             : // copyCheckpointOptions copies an OPTIONS file, commenting out some options
     372             : // that existed on the original database but no longer apply to the checkpointed
     373             : // database. For example, the entire [WAL Failover] stanza is commented out
     374             : // because Checkpoint will copy all WAL segment files from both the primary and
     375             : // secondary WAL directories into the checkpoint.
     376           2 : func copyCheckpointOptions(fs vfs.FS, srcPath, dstPath string) error {
     377           2 :         var buf bytes.Buffer
     378           2 :         f, err := fs.Open(srcPath)
     379           2 :         if err != nil {
     380           0 :                 return err
     381           0 :         }
     382           2 :         defer f.Close()
     383           2 :         b, err := io.ReadAll(f)
     384           2 :         if err != nil {
     385           0 :                 return err
     386           0 :         }
     387             :         // Copy the OPTIONS file verbatim, but commenting out the [WAL Failover]
     388             :         // section.
     389           2 :         err = parseOptions(string(b), parseOptionsFuncs{
     390           2 :                 visitNewSection: func(startOff, endOff int, section string) error {
     391           2 :                         if section == "WAL Failover" {
     392           2 :                                 buf.WriteString("# ")
     393           2 :                         }
     394           2 :                         buf.Write(b[startOff:endOff])
     395           2 :                         return nil
     396             :                 },
     397           2 :                 visitKeyValue: func(startOff, endOff int, section, key, value string) error {
     398           2 :                         if section == "WAL Failover" {
     399           2 :                                 buf.WriteString("# ")
     400           2 :                         }
     401           2 :                         buf.Write(b[startOff:endOff])
     402           2 :                         return nil
     403             :                 },
     404           2 :                 visitCommentOrWhitespace: func(startOff, endOff int, line string) error {
     405           2 :                         buf.Write(b[startOff:endOff])
     406           2 :                         return nil
     407           2 :                 },
     408             :         })
     409           2 :         if err != nil {
     410           0 :                 return err
     411           0 :         }
     412           2 :         nf, err := fs.Create(dstPath, vfs.WriteCategoryUnspecified)
     413           2 :         if err != nil {
     414           0 :                 return err
     415           0 :         }
     416           2 :         _, err = io.Copy(nf, &buf)
     417           2 :         if err != nil {
     418           0 :                 return err
     419           0 :         }
     420           2 :         return errors.CombineErrors(nf.Sync(), nf.Close())
     421             : }
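
An in-package sketch of the rewrite copyCheckpointOptions performs (hypothetical
helper and OPTIONS content, using the in-memory vfs.NewMem filesystem and this
file's existing imports). Every line is copied through, but lines belonging to the
[WAL Failover] section come out prefixed with "# ", since the checkpoint already
contains copies of all WAL segments from both WAL directories.

        // exampleCopyCheckpointOptions is a hypothetical in-package helper.
        func exampleCopyCheckpointOptions() error {
                mem := vfs.NewMem()
                f, err := mem.Create("OPTIONS-000003", vfs.WriteCategoryUnspecified)
                if err != nil {
                        return err
                }
                src := "[Options]\n  wal_dir=wal\n[WAL Failover]\n  secondary_dir=wal-secondary\n"
                if _, err := f.Write([]byte(src)); err != nil {
                        return err
                }
                if err := f.Close(); err != nil {
                        return err
                }
                // The destination then reads:
                //   [Options]
                //     wal_dir=wal
                //   # [WAL Failover]
                //   #   secondary_dir=wal-secondary
                return copyCheckpointOptions(mem, "OPTIONS-000003", "OPTIONS-000003.copy")
        }
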
     422             : 
     423             : func (d *DB) writeCheckpointManifest(
     424             :         fs vfs.FS,
     425             :         formatVers FormatMajorVersion,
     426             :         destDirPath string,
     427             :         destDir vfs.File,
     428             :         manifestFileNum base.DiskFileNum,
     429             :         manifestSize int64,
     430             :         excludedFiles map[deletedFileEntry]*fileMetadata,
     431             :         removeBackingTables []base.DiskFileNum,
     432           2 : ) error {
     433           2 :         // Copy the MANIFEST, and create a pointer to it. We copy rather
     434           2 :         // than link because additional version edits added to the
     435           2 :         // MANIFEST after we took our snapshot of the sstables will
     436           2 :         // reference sstables that aren't in our checkpoint. For a
     437           2 :         // similar reason, we need to limit how much of the MANIFEST we
     438           2 :         // copy.
     439           2 :         // If some files are excluded from the checkpoint, also append a block that
     440           2 :         // records those files as deleted.
     441           2 :         if err := func() error {
     442           2 :                 srcPath := base.MakeFilepath(fs, d.dirname, fileTypeManifest, manifestFileNum)
     443           2 :                 destPath := fs.PathJoin(destDirPath, fs.PathBase(srcPath))
     444           2 :                 src, err := fs.Open(srcPath, vfs.SequentialReadsOption)
     445           2 :                 if err != nil {
     446           0 :                         return err
     447           0 :                 }
     448           2 :                 defer src.Close()
     449           2 : 
     450           2 :                 dst, err := fs.Create(destPath, vfs.WriteCategoryUnspecified)
     451           2 :                 if err != nil {
     452           0 :                         return err
     453           0 :                 }
     454           2 :                 defer dst.Close()
     455           2 : 
     456           2 :                 // Copy all existing records. We need to copy at the record level in case we
     457           2 :                 // need to append another record with the excluded files (we cannot simply
     458           2 :                 // append a record after a raw data copy; see
     459           2 :                 // https://github.com/cockroachdb/cockroach/issues/100935).
     460           2 :                 r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
     461           2 :                 w := record.NewWriter(dst)
     462           2 :                 for {
     463           2 :                         rr, err := r.Next()
     464           2 :                         if err != nil {
     465           2 :                                 if err == io.EOF {
     466           2 :                                         break
     467             :                                 }
     468           0 :                                 return err
     469             :                         }
     470             : 
     471           2 :                         rw, err := w.Next()
     472           2 :                         if err != nil {
     473           0 :                                 return err
     474           0 :                         }
     475           2 :                         if _, err := io.Copy(rw, rr); err != nil {
     476           0 :                                 return err
     477           0 :                         }
     478             :                 }
     479             : 
     480           2 :                 if len(excludedFiles) > 0 {
     481           2 :                         // Write out an additional VersionEdit that deletes the excluded SST files.
     482           2 :                         ve := versionEdit{
     483           2 :                                 DeletedFiles:         excludedFiles,
     484           2 :                                 RemovedBackingTables: removeBackingTables,
     485           2 :                         }
     486           2 : 
     487           2 :                         rw, err := w.Next()
     488           2 :                         if err != nil {
     489           0 :                                 return err
     490           0 :                         }
     491           2 :                         if err := ve.Encode(rw); err != nil {
     492           0 :                                 return err
     493           0 :                         }
     494             :                 }
     495           2 :                 if err := w.Close(); err != nil {
     496           0 :                         return err
     497           0 :                 }
     498           2 :                 return dst.Sync()
     499           0 :         }(); err != nil {
     500           0 :                 return err
     501           0 :         }
     502             : 
     503           2 :         var manifestMarker *atomicfs.Marker
     504           2 :         manifestMarker, _, err := atomicfs.LocateMarker(fs, destDirPath, manifestMarkerName)
     505           2 :         if err != nil {
     506           0 :                 return err
     507           0 :         }
     508           2 :         if err := manifestMarker.Move(base.MakeFilename(fileTypeManifest, manifestFileNum)); err != nil {
     509           0 :                 return err
     510           0 :         }
     511           2 :         return manifestMarker.Close()
     512             : }
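
A short in-package sketch of how the MANIFEST written above can be read back
(hypothetical helper; relies on this file's existing imports). Each record in the
copied MANIFEST is a versionEdit, and when files were excluded from the checkpoint
the final record carries only the corresponding deletions and removed backing
tables.

        // exampleReadCheckpointManifest is a hypothetical in-package helper.
        func exampleReadCheckpointManifest(fs vfs.FS, path string) error {
                f, err := fs.Open(path)
                if err != nil {
                        return err
                }
                defer f.Close()
                r := record.NewReader(f, 0 /* logNum */)
                for {
                        rr, err := r.Next()
                        if err == io.EOF {
                                return nil
                        }
                        if err != nil {
                                return err
                        }
                        // Each record decodes into one batch of file additions,
                        // deletions, and backing-table changes.
                        var ve versionEdit
                        if err := ve.Decode(rr); err != nil {
                                return err
                        }
                }
        }
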

Generated by: LCOV version 1.14