LCOV - code coverage report
Current view: top level - pebble - checkpoint.go (source / functions)
Test: 2023-09-11 08:17Z 1efa535d - meta test only.lcov
Date: 2023-09-11 08:18:45

                 Hit    Total    Coverage
Lines:           188    272      69.1 %
Functions:       0      0        -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "io"
       9             :         "os"
      10             : 
      11             :         "github.com/cockroachdb/errors/oserror"
      12             :         "github.com/cockroachdb/pebble/internal/base"
      13             :         "github.com/cockroachdb/pebble/record"
      14             :         "github.com/cockroachdb/pebble/vfs"
      15             :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      16             : )
      17             : 
      18             : // checkpointOptions holds the optional parameters used to construct checkpoint
      19             : // snapshots.
      20             : type checkpointOptions struct {
      21             :         // flushWAL set to true will force a flush and sync of the WAL prior to
      22             :         // checkpointing.
      23             :         flushWAL bool
      24             : 
      25             :         // If set, any SSTs that don't overlap with these spans are excluded from a checkpoint.
      26             :         restrictToSpans []CheckpointSpan
      27             : }
      28             : 
      29             : // CheckpointOption sets optional parameters used by `DB.Checkpoint`.
      30             : type CheckpointOption func(*checkpointOptions)
      31             : 
      32             : // WithFlushedWAL enables flushing and syncing the WAL prior to constructing a
      33             : // checkpoint. This guarantees that any writes committed before calling
      34             : // DB.Checkpoint will be part of that checkpoint.
      35             : //
      36             : // Note that this setting is only useful when some writes are performed with
      37             : // Sync = false. Otherwise, the guarantee is already met.
      38             : //
      39             : // Passing this option is functionally equivalent to calling
      40             : // DB.LogData(nil, Sync) right before DB.Checkpoint.
      41           0 : func WithFlushedWAL() CheckpointOption {
      42           0 :         return func(opt *checkpointOptions) {
      43           0 :                 opt.flushWAL = true
      44           0 :         }
      45             : }
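
The comment above notes that WithFlushedWAL is functionally equivalent to a manual WAL sync followed by a checkpoint. A minimal sketch of that equivalent call sequence, assuming db is an open *pebble.DB and destDir is a destination path (both hypothetical):

    // Equivalent to db.Checkpoint(destDir, pebble.WithFlushedWAL()):
    // an empty log-data record flushes and syncs the WAL, then we checkpoint.
    if err := db.LogData(nil /* data */, pebble.Sync); err != nil {
            return err
    }
    if err := db.Checkpoint(destDir); err != nil {
            return err
    }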
      46             : 
      47             : // WithRestrictToSpans specifies spans of interest for the checkpoint. Any SSTs
      48             : // that don't overlap with any of these spans are excluded from the checkpoint.
      49             : //
      50             : // Note that the checkpoint can still surface keys outside of these spans (from
      51             : // the WAL and from SSTs that partially overlap with these spans). Moreover,
      52             : // such keys aren't necessarily "valid": they could have been modified, but
      53             : // the SST containing the modification is excluded from the checkpoint.
      54           1 : func WithRestrictToSpans(spans []CheckpointSpan) CheckpointOption {
      55           1 :         return func(opt *checkpointOptions) {
      56           1 :                 opt.restrictToSpans = spans
      57           1 :         }
      58             : }
      59             : 
      60             : // CheckpointSpan is a key range [Start, End) (inclusive on Start, exclusive on
      61             : // End) of interest for a checkpoint.
      62             : type CheckpointSpan struct {
      63             :         Start []byte
      64             :         End   []byte
      65             : }
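
A short end-to-end usage sketch tying these options together; the store path, checkpoint path, and key bounds are illustrative only:

    package main

    import (
            "log"

            "github.com/cockroachdb/pebble"
    )

    func main() {
            db, err := pebble.Open("demo-db", &pebble.Options{})
            if err != nil {
                    log.Fatal(err)
            }
            defer db.Close()

            // Take a checkpoint restricted to keys in ["a", "m"), flushing and
            // syncing the WAL first so unsynced writes are included.
            err = db.Checkpoint("demo-checkpoint",
                    pebble.WithFlushedWAL(),
                    pebble.WithRestrictToSpans([]pebble.CheckpointSpan{
                            {Start: []byte("a"), End: []byte("m")},
                    }),
            )
            if err != nil {
                    log.Fatal(err)
            }
    }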
      66             : 
      67             : // excludeFromCheckpoint returns true if an SST file should be excluded from the
      68             : // checkpoint because it does not overlap with the spans of interest
      69             : // (opt.restrictToSpans).
      70           1 : func excludeFromCheckpoint(f *fileMetadata, opt *checkpointOptions, cmp Compare) bool {
      71           1 :         if len(opt.restrictToSpans) == 0 {
      72           1 :                 // Option not set; don't exclude anything.
      73           1 :                 return false
      74           1 :         }
      75           1 :         for _, s := range opt.restrictToSpans {
      76           1 :                 if f.Overlaps(cmp, s.Start, s.End, true /* exclusiveEnd */) {
      77           1 :                         return false
      78           1 :                 }
      79             :         }
      80             :         // None of the restrictToSpans overlapped; we can exclude this file.
      81           1 :         return true
      82             : }
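
To make the exclusion rule concrete, here is a standalone sketch of the overlap test with plain byte slices; the real code uses fileMetadata.Overlaps, and the bounds below are hypothetical:

    package main

    import (
            "bytes"
            "fmt"
    )

    // overlaps reports whether an sstable with inclusive key bounds
    // [fileSmallest, fileLargest] intersects the half-open span [start, end).
    func overlaps(fileSmallest, fileLargest, start, end []byte) bool {
            return bytes.Compare(fileSmallest, end) < 0 &&
                    bytes.Compare(fileLargest, start) >= 0
    }

    func main() {
            // With a single restrictToSpans entry ["f", "p"):
            fmt.Println(overlaps([]byte("a"), []byte("c"), []byte("f"), []byte("p"))) // false: file excluded
            fmt.Println(overlaps([]byte("n"), []byte("z"), []byte("f"), []byte("p"))) // true: file kept whole,
            // so the checkpoint may still surface keys at or beyond "p".
    }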
      83             : 
      84             : // mkdirAllAndSyncParents creates destDir and any of its missing parents.
      85             : // Those missing parents, as well as the closest existing ancestor, are synced.
      86             : // Returns a handle to the directory created at destDir.
      87           1 : func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
      88           1 :         // Collect paths for all directories between destDir (excluded) and its
      89           1 :         // closest existing ancestor (included).
      90           1 :         var parentPaths []string
      91           1 :         foundExistingAncestor := false
      92           1 :         for parentPath := fs.PathDir(destDir); parentPath != "."; parentPath = fs.PathDir(parentPath) {
      93           1 :                 parentPaths = append(parentPaths, parentPath)
      94           1 :                 _, err := fs.Stat(parentPath)
      95           1 :                 if err == nil {
      96           1 :                         // Exit loop at the closest existing ancestor.
      97           1 :                         foundExistingAncestor = true
      98           1 :                         break
      99             :                 }
     100           1 :                 if !oserror.IsNotExist(err) {
     101           0 :                         return nil, err
     102           0 :                 }
     103             :         }
     104             :         // Handle empty filesystem edge case.
     105           1 :         if !foundExistingAncestor {
     106           0 :                 parentPaths = append(parentPaths, "")
     107           0 :         }
     108             :         // Create destDir and any of its missing parents.
     109           1 :         if err := fs.MkdirAll(destDir, 0755); err != nil {
     110           0 :                 return nil, err
     111           0 :         }
     112             :         // Sync all the parent directories up to the closest existing ancestor,
     113             :         // included.
     114           1 :         for _, parentPath := range parentPaths {
     115           1 :                 parentDir, err := fs.OpenDir(parentPath)
     116           1 :                 if err != nil {
     117           0 :                         return nil, err
     118           0 :                 }
     119           1 :                 err = parentDir.Sync()
     120           1 :                 if err != nil {
     121           0 :                         _ = parentDir.Close()
     122           0 :                         return nil, err
     123           0 :                 }
     124           1 :                 err = parentDir.Close()
     125           1 :                 if err != nil {
     126           0 :                         return nil, err
     127           0 :                 }
     128             :         }
     129           1 :         return fs.OpenDir(destDir)
     130             : }
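
The same create-then-sync pattern, sketched with the standard library for readers unfamiliar with the vfs layer. Unlike the function above it syncs every ancestor rather than only the missing ones, and it assumes a POSIX filesystem where fsyncing a directory persists its entries; the path is hypothetical:

    package main

    import (
            "os"
            "path/filepath"
    )

    // mkdirAllAndSync creates dir (and any missing parents), then fsyncs each
    // directory on the path so the new entries survive a crash.
    func mkdirAllAndSync(dir string) error {
            if err := os.MkdirAll(dir, 0755); err != nil {
                    return err
            }
            for p := dir; ; p = filepath.Dir(p) {
                    d, err := os.Open(p)
                    if err != nil {
                            return err
                    }
                    err = d.Sync()
                    if cerr := d.Close(); err == nil {
                            err = cerr
                    }
                    if err != nil {
                            return err
                    }
                    if filepath.Dir(p) == p {
                            return nil // reached the root (or ".")
                    }
            }
    }

    func main() {
            if err := mkdirAllAndSync("checkpoints/2023-09-11/db"); err != nil {
                    panic(err)
            }
    }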
     131             : 
     132             : // Checkpoint constructs a snapshot of the DB instance in the specified
     133             : // directory. The WAL, MANIFEST, OPTIONS, and sstables will be copied into the
     134             : // snapshot. Hard links will be used when possible. Beware of the significant
     135             : // space overhead for a checkpoint if hard links are disabled. Also beware that
     136             : // even if hard links are used, the space overhead for the checkpoint will
     137             : // increase over time as the DB performs compactions.
     138             : //
     139             : // TODO(bananabrick): Test checkpointing of virtual sstables once virtual
     140             : // sstables is running e2e.
     141             : func (d *DB) Checkpoint(
     142             :         destDir string, opts ...CheckpointOption,
     143             : ) (
     144             :         ckErr error, /* used in deferred cleanup */
     145           1 : ) {
     146           1 :         opt := &checkpointOptions{}
     147           1 :         for _, fn := range opts {
     148           1 :                 fn(opt)
     149           1 :         }
     150             : 
     151           1 :         if _, err := d.opts.FS.Stat(destDir); !oserror.IsNotExist(err) {
     152           0 :                 if err == nil {
     153           0 :                         return &os.PathError{
     154           0 :                                 Op:   "checkpoint",
     155           0 :                                 Path: destDir,
     156           0 :                                 Err:  oserror.ErrExist,
     157           0 :                         }
     158           0 :                 }
     159           0 :                 return err
     160             :         }
     161             : 
     162           1 :         if opt.flushWAL && !d.opts.DisableWAL {
     163           0 :                 // Write an empty log-data record to flush and sync the WAL.
     164           0 :                 if err := d.LogData(nil /* data */, Sync); err != nil {
     165           0 :                         return err
     166           0 :                 }
     167             :         }
     168             : 
     169             :         // Disable file deletions.
     170           1 :         d.mu.Lock()
     171           1 :         d.disableFileDeletions()
     172           1 :         defer func() {
     173           1 :                 d.mu.Lock()
     174           1 :                 defer d.mu.Unlock()
     175           1 :                 d.enableFileDeletions()
     176           1 :         }()
     177             : 
     178             :         // TODO(peter): RocksDB provides the option to roll the manifest if the
     179             :         // MANIFEST size is too large. Should we do this too?
     180             : 
     181             :         // Lock the manifest before getting the current version. We need the
     182             :         // length of the manifest that we read to match the current version that
     183             :         // we read, otherwise we might copy a versionEdit not reflected in the
     184             :         // sstables we copy/link.
     185           1 :         d.mu.versions.logLock()
     186           1 :         // Get the unflushed log files, the current version, and the current manifest
     187           1 :         // file number.
     188           1 :         memQueue := d.mu.mem.queue
     189           1 :         current := d.mu.versions.currentVersion()
     190           1 :         formatVers := d.FormatMajorVersion()
     191           1 :         manifestFileNum := d.mu.versions.manifestFileNum
     192           1 :         manifestSize := d.mu.versions.manifest.Size()
     193           1 :         optionsFileNum := d.optionsFileNum
     194           1 :         virtualBackingFiles := make(map[base.DiskFileNum]struct{})
     195           1 :         for diskFileNum := range d.mu.versions.fileBackingMap {
     196           0 :                 virtualBackingFiles[diskFileNum] = struct{}{}
     197           0 :         }
     198             :         // Release the manifest and DB.mu so we don't block other operations on
     199             :         // the database.
     200           1 :         d.mu.versions.logUnlock()
     201           1 :         d.mu.Unlock()
     202           1 : 
     203           1 :         // Wrap the normal filesystem with one which wraps newly created files with
     204           1 :         // vfs.NewSyncingFile.
     205           1 :         fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
     206           1 :                 NoSyncOnClose: d.opts.NoSyncOnClose,
     207           1 :                 BytesPerSync:  d.opts.BytesPerSync,
     208           1 :         })
     209           1 : 
     210           1 :         // Create the dir and its parents (if necessary), and sync them.
     211           1 :         var dir vfs.File
     212           1 :         defer func() {
     213           1 :                 if dir != nil {
     214           0 :                         _ = dir.Close()
     215           0 :                 }
     216           1 :                 if ckErr != nil {
     217           0 :                         // Attempt to cleanup on error.
     218           0 :                         _ = fs.RemoveAll(destDir)
     219           0 :                 }
     220             :         }()
     221           1 :         dir, ckErr = mkdirAllAndSyncParents(fs, destDir)
     222           1 :         if ckErr != nil {
     223           0 :                 return ckErr
     224           0 :         }
     225             : 
     226           1 :         {
     227           1 :                 // Link or copy the OPTIONS.
     228           1 :                 srcPath := base.MakeFilepath(fs, d.dirname, fileTypeOptions, optionsFileNum)
     229           1 :                 destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
     230           1 :                 ckErr = vfs.LinkOrCopy(fs, srcPath, destPath)
     231           1 :                 if ckErr != nil {
     232           0 :                         return ckErr
     233           0 :                 }
     234             :         }
     235             : 
     236           1 :         {
     237           1 :                 // Set the format major version in the destination directory.
     238           1 :                 var versionMarker *atomicfs.Marker
     239           1 :                 versionMarker, _, ckErr = atomicfs.LocateMarker(fs, destDir, formatVersionMarkerName)
     240           1 :                 if ckErr != nil {
     241           0 :                         return ckErr
     242           0 :                 }
     243             : 
     244             :                 // We use the marker to encode the active format version in the
     245             :                 // marker filename. Unlike other uses of the atomic marker,
     246             :                 // there is no file with the filename `formatVers.String()` on
     247             :                 // the filesystem.
     248           1 :                 ckErr = versionMarker.Move(formatVers.String())
     249           1 :                 if ckErr != nil {
     250           0 :                         return ckErr
     251           0 :                 }
     252           1 :                 ckErr = versionMarker.Close()
     253           1 :                 if ckErr != nil {
     254           0 :                         return ckErr
     255           0 :                 }
     256             :         }
     257             : 
     258           1 :         var excludedFiles map[deletedFileEntry]*fileMetadata
     259           1 :         // Set of FileBacking.DiskFileNum which will be required by virtual sstables
     260           1 :         // in the checkpoint.
     261           1 :         requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})
     262           1 :         // Link or copy the sstables.
     263           1 :         for l := range current.Levels {
     264           1 :                 iter := current.Levels[l].Iter()
     265           1 :                 for f := iter.First(); f != nil; f = iter.Next() {
     266           1 :                         if excludeFromCheckpoint(f, opt, d.cmp) {
     267           1 :                                 if excludedFiles == nil {
     268           1 :                                         excludedFiles = make(map[deletedFileEntry]*fileMetadata)
     269           1 :                                 }
     270           1 :                                 excludedFiles[deletedFileEntry{
     271           1 :                                         Level:   l,
     272           1 :                                         FileNum: f.FileNum,
     273           1 :                                 }] = f
     274           1 :                                 continue
     275             :                         }
     276             : 
     277           1 :                         fileBacking := f.FileBacking
     278           1 :                         if f.Virtual {
     279           0 :                                 if _, ok := requiredVirtualBackingFiles[fileBacking.DiskFileNum]; ok {
     280           0 :                                         continue
     281             :                                 }
     282           0 :                                 requiredVirtualBackingFiles[fileBacking.DiskFileNum] = struct{}{}
     283             :                         }
     284             : 
     285           1 :                         srcPath := base.MakeFilepath(fs, d.dirname, fileTypeTable, fileBacking.DiskFileNum)
     286           1 :                         destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
     287           1 :                         ckErr = vfs.LinkOrCopy(fs, srcPath, destPath)
     288           1 :                         if ckErr != nil {
     289           0 :                                 return ckErr
     290           0 :                         }
     291             :                 }
     292             :         }
     293             : 
     294           1 :         var removeBackingTables []base.DiskFileNum
     295           1 :         for diskFileNum := range virtualBackingFiles {
     296           0 :                 if _, ok := requiredVirtualBackingFiles[diskFileNum]; !ok {
     297           0 :                         // The backing sstable associated with fileNum is no longer
     298           0 :                         // required.
     299           0 :                         removeBackingTables = append(removeBackingTables, diskFileNum)
     300           0 :                 }
     301             :         }
     302             : 
     303           1 :         ckErr = d.writeCheckpointManifest(
     304           1 :                 fs, formatVers, destDir, dir, manifestFileNum.DiskFileNum(), manifestSize,
     305           1 :                 excludedFiles, removeBackingTables,
     306           1 :         )
     307           1 :         if ckErr != nil {
     308           0 :                 return ckErr
     309           0 :         }
     310             : 
     311             :         // Copy the WAL files. We copy rather than link because WAL file recycling
     312             :         // will cause the WAL files to be reused, which would invalidate the
     313             :         // checkpoint.
     314           1 :         for i := range memQueue {
     315           1 :                 logNum := memQueue[i].logNum
     316           1 :                 if logNum == 0 {
     317           1 :                         continue
     318             :                 }
     319           1 :                 srcPath := base.MakeFilepath(fs, d.walDirname, fileTypeLog, logNum.DiskFileNum())
     320           1 :                 destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
     321           1 :                 ckErr = vfs.Copy(fs, srcPath, destPath)
     322           1 :                 if ckErr != nil {
     323           0 :                         return ckErr
     324           0 :                 }
     325             :         }
     326             : 
     327             :         // Sync and close the checkpoint directory.
     328           1 :         ckErr = dir.Sync()
     329           1 :         if ckErr != nil {
     330           0 :                 return ckErr
     331           0 :         }
     332           1 :         ckErr = dir.Close()
     333           1 :         dir = nil
     334           1 :         return ckErr
     335             : }
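
Once Checkpoint returns, the destination directory is a self-contained Pebble store and can be opened like any other database. A usage sketch that opens it read-only so the snapshot is not mutated; the path and key are illustrative:

    package main

    import (
            "log"

            "github.com/cockroachdb/pebble"
    )

    func main() {
            ckpt, err := pebble.Open("demo-checkpoint", &pebble.Options{ReadOnly: true})
            if err != nil {
                    log.Fatal(err)
            }
            defer ckpt.Close()

            value, closer, err := ckpt.Get([]byte("hello"))
            switch {
            case err == pebble.ErrNotFound:
                    log.Println("key not present in checkpoint")
            case err != nil:
                    log.Fatal(err)
            default:
                    log.Printf("value: %s", value)
                    closer.Close()
            }
    }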
     336             : 
     337             : func (d *DB) writeCheckpointManifest(
     338             :         fs vfs.FS,
     339             :         formatVers FormatMajorVersion,
     340             :         destDirPath string,
     341             :         destDir vfs.File,
     342             :         manifestFileNum base.DiskFileNum,
     343             :         manifestSize int64,
     344             :         excludedFiles map[deletedFileEntry]*fileMetadata,
     345             :         removeBackingTables []base.DiskFileNum,
     346           1 : ) error {
     347           1 :         // Copy the MANIFEST, and create a pointer to it. We copy rather
     348           1 :         // than link because additional version edits added to the
     349           1 :         // MANIFEST after we took our snapshot of the sstables will
     350           1 :         // reference sstables that aren't in our checkpoint. For a
     351           1 :         // similar reason, we need to limit how much of the MANIFEST we
     352           1 :         // copy.
     353           1 :         // If some files are excluded from the checkpoint, also append a block that
     354           1 :         // records those files as deleted.
     355           1 :         if err := func() error {
     356           1 :                 srcPath := base.MakeFilepath(fs, d.dirname, fileTypeManifest, manifestFileNum)
     357           1 :                 destPath := fs.PathJoin(destDirPath, fs.PathBase(srcPath))
     358           1 :                 src, err := fs.Open(srcPath, vfs.SequentialReadsOption)
     359           1 :                 if err != nil {
     360           0 :                         return err
     361           0 :                 }
     362           1 :                 defer src.Close()
     363           1 : 
     364           1 :                 dst, err := fs.Create(destPath)
     365           1 :                 if err != nil {
     366           0 :                         return err
     367           0 :                 }
     368           1 :                 defer dst.Close()
     369           1 : 
     370           1 :                 // Copy all existing records. We need to copy at the record level in case we
     371           1 :                 // need to append another record with the excluded files (we cannot simply
     372           1 :                 // append a record after a raw data copy; see
     373           1 :                 // https://github.com/cockroachdb/cockroach/issues/100935).
     374           1 :                 r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum.FileNum())
     375           1 :                 w := record.NewWriter(dst)
     376           1 :                 for {
     377           1 :                         rr, err := r.Next()
     378           1 :                         if err != nil {
     379           1 :                                 if err == io.EOF {
     380           1 :                                         break
     381             :                                 }
     382           0 :                                 return err
     383             :                         }
     384             : 
     385           1 :                         rw, err := w.Next()
     386           1 :                         if err != nil {
     387           0 :                                 return err
     388           0 :                         }
     389           1 :                         if _, err := io.Copy(rw, rr); err != nil {
     390           0 :                                 return err
     391           0 :                         }
     392             :                 }
     393             : 
     394           1 :                 if len(excludedFiles) > 0 {
     395           1 :                         // Write out an additional VersionEdit that deletes the excluded SST files.
     396           1 :                         ve := versionEdit{
     397           1 :                                 DeletedFiles:         excludedFiles,
     398           1 :                                 RemovedBackingTables: removeBackingTables,
     399           1 :                         }
     400           1 : 
     401           1 :                         rw, err := w.Next()
     402           1 :                         if err != nil {
     403           0 :                                 return err
     404           0 :                         }
     405           1 :                         if err := ve.Encode(rw); err != nil {
     406           0 :                                 return err
     407           0 :                         }
     408             :                 }
     409           1 :                 if err := w.Close(); err != nil {
     410           0 :                         return err
     411           0 :                 }
     412           1 :                 return dst.Sync()
     413           0 :         }(); err != nil {
     414           0 :                 return err
     415           0 :         }
     416             : 
     417             :         // Recent format versions use an atomic marker for setting the
     418             :         // active manifest. Older versions use the CURRENT file. The
     419             :         // setCurrentFunc function will return a closure that will
     420             :         // take the appropriate action for the database's format
     421             :         // version.
     422           1 :         var manifestMarker *atomicfs.Marker
     423           1 :         manifestMarker, _, err := atomicfs.LocateMarker(fs, destDirPath, manifestMarkerName)
     424           1 :         if err != nil {
     425           0 :                 return err
     426           0 :         }
     427           1 :         if err := setCurrentFunc(formatVers, manifestMarker, fs, destDirPath, destDir)(manifestFileNum.FileNum()); err != nil {
     428           0 :                 return err
     429           0 :         }
     430           1 :         return manifestMarker.Close()
     431             : }

Generated by: LCOV version 1.14