LCOV - code coverage report
Current view: top level - pebble - checkpoint.go (source / functions) Coverage Total Hit
Test: 2025-01-16 08:16Z 4d8664cd - meta test only.lcov Lines: 70.7 % 331 234
Test Date: 2025-01-16 08:17:31 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "io"
      10              :         "os"
      11              : 
      12              :         "github.com/cockroachdb/errors"
      13              :         "github.com/cockroachdb/errors/oserror"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/record"
      16              :         "github.com/cockroachdb/pebble/vfs"
      17              :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      18              : )
      19              : 
      20              : // checkpointOptions holds the optional parameters to construct checkpoint
      21              : // snapshots.
      22              : type checkpointOptions struct {
      23              :         // flushWAL set to true will force a flush and sync of the WAL prior to
      24              :         // checkpointing.
      25              :         flushWAL bool
      26              : 
      27              :         // If set, any SSTs that don't overlap with these spans are excluded from a checkpoint.
      28              :         restrictToSpans []CheckpointSpan
      29              : }
      30              : 
      31              : // CheckpointOption sets optional parameters used by `DB.Checkpoint`.
      32              : type CheckpointOption func(*checkpointOptions)
      33              : 
      34              : // WithFlushedWAL enables flushing and syncing the WAL prior to constructing a
      35              : // checkpoint. This guarantees that any writes committed before calling
      36              : // DB.Checkpoint will be part of that checkpoint.
      37              : //
      38              : // Note that this setting can only be useful in cases when some writes are
      39              : // performed with Sync = false. Otherwise, the guarantee will already be met.
      40              : //
      41              : // Passing this option is functionally equivalent to calling
      42              : // DB.LogData(nil, Sync) right before DB.Checkpoint.
      43            0 : func WithFlushedWAL() CheckpointOption {
      44            0 :         return func(opt *checkpointOptions) {
      45            0 :                 opt.flushWAL = true
      46            0 :         }
      47              : }
      48              : 
      49              : // WithRestrictToSpans specifies spans of interest for the checkpoint. Any SSTs
      50              : // that don't overlap with any of these spans are excluded from the checkpoint.
      51              : //
      52              : // Note that the checkpoint can still surface keys outside of these spans (from
      53              : // the WAL and from SSTs that partially overlap with these spans). Moreover,
      54              : // these surfaced keys aren't necessarily "valid" in that they could have been
      55              : // modified but the SST containing the modification is excluded.
      56            1 : func WithRestrictToSpans(spans []CheckpointSpan) CheckpointOption {
      57            1 :         return func(opt *checkpointOptions) {
      58            1 :                 opt.restrictToSpans = spans
      59            1 :         }
      60              : }
      61              : 
      62              : // CheckpointSpan is a key range [Start, End) (inclusive on Start, exclusive on
      63              : // End) of interest for a checkpoint.
      64              : type CheckpointSpan struct {
      65              :         Start []byte
      66              :         End   []byte
      67              : }
      68              : 
      69              : // excludeFromCheckpoint returns true if an SST file should be excluded from the
      70              : // checkpoint because it does not overlap with the spans of interest
      71              : // (opt.restrictToSpans).
      72            1 : func excludeFromCheckpoint(f *fileMetadata, opt *checkpointOptions, cmp Compare) bool {
      73            1 :         if len(opt.restrictToSpans) == 0 {
      74            1 :                 // Option not set; don't exclude anything.
      75            1 :                 return false
      76            1 :         }
      77            1 :         for _, s := range opt.restrictToSpans {
      78            1 :                 spanBounds := base.UserKeyBoundsEndExclusive(s.Start, s.End)
      79            1 :                 if f.Overlaps(cmp, &spanBounds) {
      80            1 :                         return false
      81            1 :                 }
      82              :         }
      83              :         // None of the restrictToSpans overlapped; we can exclude this file.
      84            1 :         return true
      85              : }
      86              : 
      87              : // mkdirAllAndSyncParents creates destDir and any of its missing parents.
      88              : // Those missing parents, as well as the closest existing ancestor, are synced.
      89              : // Returns a handle to the directory created at destDir.
      90            1 : func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
      91            1 :         // Collect paths for all directories between destDir (excluded) and its
      92            1 :         // closest existing ancestor (included).
      93            1 :         var parentPaths []string
      94            1 :         for parentPath := fs.PathDir(destDir); ; parentPath = fs.PathDir(parentPath) {
      95            1 :                 parentPaths = append(parentPaths, parentPath)
      96            1 :                 if fs.PathDir(parentPath) == parentPath {
      97            1 :                         break
      98              :                 }
      99            1 :                 _, err := fs.Stat(parentPath)
     100            1 :                 if err == nil {
     101            1 :                         // Exit loop at the closest existing ancestor.
     102            1 :                         break
     103              :                 }
     104            1 :                 if !oserror.IsNotExist(err) {
     105            0 :                         return nil, err
     106            0 :                 }
     107              :         }
     108              :         // Create destDir and any of its missing parents.
     109            1 :         if err := fs.MkdirAll(destDir, 0755); err != nil {
     110            0 :                 return nil, err
     111            0 :         }
     112              :         // Sync all the parent directories up to the closest existing ancestor,
     113              :         // included.
     114            1 :         for _, parentPath := range parentPaths {
     115            1 :                 parentDir, err := fs.OpenDir(parentPath)
     116            1 :                 if err != nil {
     117            0 :                         return nil, err
     118            0 :                 }
     119            1 :                 err = parentDir.Sync()
     120            1 :                 if err != nil {
     121            0 :                         _ = parentDir.Close()
     122            0 :                         return nil, err
     123            0 :                 }
     124            1 :                 err = parentDir.Close()
     125            1 :                 if err != nil {
     126            0 :                         return nil, err
     127            0 :                 }
     128              :         }
     129            1 :         return fs.OpenDir(destDir)
     130              : }
     131              : 
     132              : // Checkpoint constructs a snapshot of the DB instance in the specified
     133              : // directory. The WAL, MANIFEST, OPTIONS, and sstables will be copied into the
     134              : // snapshot. Hard links will be used when possible. Beware of the significant
     135              : // space overhead for a checkpoint if hard links are disabled. Also beware that
     136              : // even if hard links are used, the space overhead for the checkpoint will
     137              : // increase over time as the DB performs compactions.
     138              : //
     139              : // Note that shared files in a checkpoint could get deleted if the DB is
     140              : // restarted after a checkpoint operation, as the reference for the checkpoint
     141              : // is only maintained in memory. This is okay as long as users of Checkpoint
     142              : // crash shortly afterwards with a "poison file" preventing further restarts.
     143              : func (d *DB) Checkpoint(
     144              :         destDir string, opts ...CheckpointOption,
     145              : ) (
     146              :         ckErr error, /* used in deferred cleanup */
     147            1 : ) {
     148            1 :         opt := &checkpointOptions{}
     149            1 :         for _, fn := range opts {
     150            1 :                 fn(opt)
     151            1 :         }
     152              : 
     153            1 :         if _, err := d.opts.FS.Stat(destDir); !oserror.IsNotExist(err) {
     154            0 :                 if err == nil {
     155            0 :                         return &os.PathError{
     156            0 :                                 Op:   "checkpoint",
     157            0 :                                 Path: destDir,
     158            0 :                                 Err:  oserror.ErrExist,
     159            0 :                         }
     160            0 :                 }
     161            0 :                 return err
     162              :         }
     163              : 
     164            1 :         if opt.flushWAL && !d.opts.DisableWAL {
     165            0 :                 // Write an empty log-data record to flush and sync the WAL.
     166            0 :                 if err := d.LogData(nil /* data */, Sync); err != nil {
     167            0 :                         return err
     168            0 :                 }
     169              :         }
     170              : 
     171              :         // Disable file deletions.
     172            1 :         d.mu.Lock()
     173            1 :         d.disableFileDeletions()
     174            1 :         defer func() {
     175            1 :                 d.mu.Lock()
     176            1 :                 defer d.mu.Unlock()
     177            1 :                 d.enableFileDeletions()
     178            1 :         }()
     179              : 
     180              :         // TODO(peter): RocksDB provides the option to roll the manifest if the
     181              :         // MANIFEST size is too large. Should we do this too?
     182              : 
     183              :         // Lock the manifest before getting the current version. We need the
     184              :         // length of the manifest that we read to match the current version that
     185              :         // we read, otherwise we might copy a versionEdit not reflected in the
     186              :         // sstables we copy/link.
     187            1 :         d.mu.versions.logLock()
     188            1 :         // Get the current version and the current manifest file number.
     189            1 :         current := d.mu.versions.currentVersion()
     190            1 :         formatVers := d.FormatMajorVersion()
     191            1 :         manifestFileNum := d.mu.versions.manifestFileNum
     192            1 :         manifestSize := d.mu.versions.manifest.Size()
     193            1 :         optionsFileNum := d.optionsFileNum
     194            1 : 
     195            1 :         virtualBackingFiles := make(map[base.DiskFileNum]struct{})
     196            1 :         d.mu.versions.virtualBackings.ForEach(func(backing *fileBacking) {
     197            1 :                 virtualBackingFiles[backing.DiskFileNum] = struct{}{}
     198            1 :         })
     199              : 
     200              :         // Acquire the logs while holding mutexes to ensure we don't race with a
     201              :         // flush that might mark a log that's relevant to `current` as obsolete
     202              :         // before our call to List.
     203            1 :         allLogicalLogs := d.mu.log.manager.List()
     204            1 : 
     205            1 :         // Release the manifest and DB.mu so we don't block other operations on
     206            1 :         // the database.
     207            1 :         d.mu.versions.logUnlock()
     208            1 :         d.mu.Unlock()
     209            1 : 
     210            1 :         // Wrap the normal filesystem with one which wraps newly created files with
     211            1 :         // vfs.NewSyncingFile.
     212            1 :         fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
     213            1 :                 NoSyncOnClose: d.opts.NoSyncOnClose,
     214            1 :                 BytesPerSync:  d.opts.BytesPerSync,
     215            1 :         })
     216            1 : 
     217            1 :         // Create the dir and its parents (if necessary), and sync them.
     218            1 :         var dir vfs.File
     219            1 :         defer func() {
     220            1 :                 if dir != nil {
     221            0 :                         _ = dir.Close()
     222            0 :                 }
     223            1 :                 if ckErr != nil {
     224            0 :                         // Attempt to cleanup on error.
     225            0 :                         _ = fs.RemoveAll(destDir)
     226            0 :                 }
     227              :         }()
     228            1 :         dir, ckErr = mkdirAllAndSyncParents(fs, destDir)
     229            1 :         if ckErr != nil {
     230            0 :                 return ckErr
     231            0 :         }
     232              : 
     233            1 :         {
     234            1 :                 // Copy the OPTIONS.
     235            1 :                 srcPath := base.MakeFilepath(fs, d.dirname, fileTypeOptions, optionsFileNum)
     236            1 :                 destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
     237            1 :                 ckErr = copyCheckpointOptions(fs, srcPath, destPath)
     238            1 :                 if ckErr != nil {
     239            0 :                         return ckErr
     240            0 :                 }
     241              :         }
     242              : 
     243            1 :         {
     244            1 :                 // Set the format major version in the destination directory.
     245            1 :                 var versionMarker *atomicfs.Marker
     246            1 :                 versionMarker, _, ckErr = atomicfs.LocateMarker(fs, destDir, formatVersionMarkerName)
     247            1 :                 if ckErr != nil {
     248            0 :                         return ckErr
     249            0 :                 }
     250              : 
     251              :                 // We use the marker to encode the active format version in the
     252              :                 // marker filename. Unlike other uses of the atomic marker,
     253              :                 // there is no file with the filename `formatVers.String()` on
     254              :                 // the filesystem.
     255            1 :                 ckErr = versionMarker.Move(formatVers.String())
     256            1 :                 if ckErr != nil {
     257            0 :                         return ckErr
     258            0 :                 }
     259            1 :                 ckErr = versionMarker.Close()
     260            1 :                 if ckErr != nil {
     261            0 :                         return ckErr
     262            0 :                 }
     263              :         }
     264              : 
     265            1 :         var excludedFiles map[deletedFileEntry]*fileMetadata
     266            1 :         var remoteFiles []base.DiskFileNum
     267            1 :         // Set of FileBacking.DiskFileNum which will be required by virtual sstables
     268            1 :         // in the checkpoint.
     269            1 :         requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})
     270            1 :         // Link or copy the sstables.
     271            1 :         for l := range current.Levels {
     272            1 :                 iter := current.Levels[l].Iter()
     273            1 :                 for f := iter.First(); f != nil; f = iter.Next() {
     274            1 :                         if excludeFromCheckpoint(f, opt, d.cmp) {
     275            1 :                                 if excludedFiles == nil {
     276            1 :                                         excludedFiles = make(map[deletedFileEntry]*fileMetadata)
     277            1 :                                 }
     278            1 :                                 excludedFiles[deletedFileEntry{
     279            1 :                                         Level:   l,
     280            1 :                                         FileNum: f.FileNum,
     281            1 :                                 }] = f
     282            1 :                                 continue
     283              :                         }
     284              : 
     285            1 :                         fileBacking := f.FileBacking
     286            1 :                         if f.Virtual {
     287            1 :                                 if _, ok := requiredVirtualBackingFiles[fileBacking.DiskFileNum]; ok {
     288            1 :                                         continue
     289              :                                 }
     290            1 :                                 requiredVirtualBackingFiles[fileBacking.DiskFileNum] = struct{}{}
     291              :                         }
     292            1 :                         meta, err := d.objProvider.Lookup(fileTypeTable, fileBacking.DiskFileNum)
     293            1 :                         if err != nil {
     294            0 :                                 ckErr = err
     295            0 :                                 return ckErr
     296            0 :                         }
     297            1 :                         if meta.IsRemote() {
     298            0 :                                 // We don't copy remote files. This is desirable as checkpointing is
     299            0 :                                 // supposed to be a fast operation, and references to remote files can
     300            0 :                                 // always be resolved by any checkpoint readers by reading the object
     301            0 :                                 // catalog. We don't add this file to excludedFiles either, as that'd
     302            0 :                                 // cause it to be deleted in the second manifest entry which is also
     303            0 :                                 // inaccurate.
     304            0 :                                 remoteFiles = append(remoteFiles, meta.DiskFileNum)
     305            0 :                                 continue
     306              :                         }
     307              : 
     308            1 :                         srcPath := base.MakeFilepath(fs, d.dirname, fileTypeTable, fileBacking.DiskFileNum)
     309            1 :                         destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
     310            1 :                         ckErr = vfs.LinkOrCopy(fs, srcPath, destPath)
     311            1 :                         if ckErr != nil {
     312            0 :                                 return ckErr
     313            0 :                         }
     314              :                 }
     315              :         }
     316              : 
     317            1 :         var removeBackingTables []base.DiskFileNum
     318            1 :         for diskFileNum := range virtualBackingFiles {
     319            1 :                 if _, ok := requiredVirtualBackingFiles[diskFileNum]; !ok {
     320            1 :                         // The backing sstable associated with diskFileNum is no longer
     321            1 :                         // required.
     322            1 :                         removeBackingTables = append(removeBackingTables, diskFileNum)
     323            1 :                 }
     324              :         }
     325              : 
     326            1 :         ckErr = d.writeCheckpointManifest(
     327            1 :                 fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
     328            1 :                 excludedFiles, removeBackingTables,
     329            1 :         )
     330            1 :         if ckErr != nil {
     331            0 :                 return ckErr
     332            0 :         }
     333            1 :         if len(remoteFiles) > 0 {
     334            0 :                 ckErr = d.objProvider.CheckpointState(fs, destDir, fileTypeTable, remoteFiles)
     335            0 :                 if ckErr != nil {
     336            0 :                         return ckErr
     337            0 :                 }
     338              :         }
     339              : 
     340              :         // Copy the WAL files. We copy rather than link because WAL file recycling
     341              :         // will cause the WAL files to be reused which would invalidate the
     342              :         // checkpoint. It's possible allLogicalLogs includes logs that are not
     343              :         // relevant (beneath the version's MinUnflushedLogNum). These extra files
     344              :         // are harmless. The earlier (wal.Manager).List call will not include
     345              :         // obsolete logs that are sitting in the recycler or have already been
     346              :         // passed off to the cleanup manager for deletion.
     347              :         //
     348              :         // TODO(jackson): It would be desirable to copy all recycling and obsolete
     349              :         // WALs to aid corruption postmortem debugging should we need them.
     350            1 :         for _, log := range allLogicalLogs {
     351            1 :                 for i := 0; i < log.NumSegments(); i++ {
     352            1 :                         srcFS, srcPath := log.SegmentLocation(i)
     353            1 :                         destPath := fs.PathJoin(destDir, srcFS.PathBase(srcPath))
     354            1 :                         ckErr = vfs.CopyAcrossFS(srcFS, srcPath, fs, destPath)
     355            1 :                         if ckErr != nil {
     356            0 :                                 return ckErr
     357            0 :                         }
     358              :                 }
     359              :         }
     360              : 
     361              :         // Sync and close the checkpoint directory.
     362            1 :         ckErr = dir.Sync()
     363            1 :         if ckErr != nil {
     364            0 :                 return ckErr
     365            0 :         }
     366            1 :         ckErr = dir.Close()
     367            1 :         dir = nil
     368            1 :         return ckErr
     369              : }
     370              : 
     371              : // copyCheckpointOptions copies an OPTIONS file, commenting out some options
     372              : // that existed on the original database but no longer apply to the checkpointed
     373              : // database. For example, the entire [WAL Failover] stanza is commented out
     374              : // because Checkpoint will copy all WAL segment files from both the primary and
     375              : // secondary WAL directories into the checkpoint.
     376            1 : func copyCheckpointOptions(fs vfs.FS, srcPath, dstPath string) error {
     377            1 :         var buf bytes.Buffer
     378            1 :         f, err := fs.Open(srcPath)
     379            1 :         if err != nil {
     380            0 :                 return err
     381            0 :         }
     382            1 :         defer f.Close()
     383            1 :         b, err := io.ReadAll(f)
     384            1 :         if err != nil {
     385            0 :                 return err
     386            0 :         }
     387              :         // Copy the OPTIONS file verbatim, but commenting out the [WAL Failover]
     388              :         // section.
     389            1 :         err = parseOptions(string(b), parseOptionsFuncs{
     390            1 :                 visitNewSection: func(startOff, endOff int, section string) error {
     391            1 :                         if section == "WAL Failover" {
     392            1 :                                 buf.WriteString("# ")
     393            1 :                         }
     394            1 :                         buf.Write(b[startOff:endOff])
     395            1 :                         return nil
     396              :                 },
     397            1 :                 visitKeyValue: func(startOff, endOff int, section, key, value string) error {
     398            1 :                         if section == "WAL Failover" {
     399            1 :                                 buf.WriteString("# ")
     400            1 :                         }
     401            1 :                         buf.Write(b[startOff:endOff])
     402            1 :                         return nil
     403              :                 },
     404            1 :                 visitCommentOrWhitespace: func(startOff, endOff int, line string) error {
     405            1 :                         buf.Write(b[startOff:endOff])
     406            1 :                         return nil
     407            1 :                 },
     408              :         })
     409            1 :         if err != nil {
     410            0 :                 return err
     411            0 :         }
     412            1 :         nf, err := fs.Create(dstPath, vfs.WriteCategoryUnspecified)
     413            1 :         if err != nil {
     414            0 :                 return err
     415            0 :         }
     416            1 :         _, err = io.Copy(nf, &buf)
     417            1 :         if err != nil {
     418            0 :                 return err
     419            0 :         }
     420            1 :         return errors.CombineErrors(nf.Sync(), nf.Close())
     421              : }
     422              : 
     423              : func (d *DB) writeCheckpointManifest(
     424              :         fs vfs.FS,
     425              :         formatVers FormatMajorVersion,
     426              :         destDirPath string,
     427              :         destDir vfs.File,
     428              :         manifestFileNum base.DiskFileNum,
     429              :         manifestSize int64,
     430              :         excludedFiles map[deletedFileEntry]*fileMetadata,
     431              :         removeBackingTables []base.DiskFileNum,
     432            1 : ) error {
     433            1 :         // Copy the MANIFEST, and create a pointer to it. We copy rather
     434            1 :         // than link because additional version edits added to the
     435            1 :         // MANIFEST after we took our snapshot of the sstables will
     436            1 :         // reference sstables that aren't in our checkpoint. For a
     437            1 :         // similar reason, we need to limit how much of the MANIFEST we
     438            1 :         // copy.
     439            1 :         // If some files are excluded from the checkpoint, also append a block that
     440            1 :         // records those files as deleted.
     441            1 :         if err := func() error {
     442            1 :                 srcPath := base.MakeFilepath(fs, d.dirname, fileTypeManifest, manifestFileNum)
     443            1 :                 destPath := fs.PathJoin(destDirPath, fs.PathBase(srcPath))
     444            1 :                 src, err := fs.Open(srcPath, vfs.SequentialReadsOption)
     445            1 :                 if err != nil {
     446            0 :                         return err
     447            0 :                 }
     448            1 :                 defer src.Close()
     449            1 : 
     450            1 :                 dst, err := fs.Create(destPath, vfs.WriteCategoryUnspecified)
     451            1 :                 if err != nil {
     452            0 :                         return err
     453            0 :                 }
     454            1 :                 defer dst.Close()
     455            1 : 
     456            1 :                 // Copy all existing records. We need to copy at the record level in case we
     457            1 :                 // need to append another record with the excluded files (we cannot simply
     458            1 :                 // append a record after a raw data copy; see
     459            1 :                 // https://github.com/cockroachdb/cockroach/issues/100935).
     460            1 :                 r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
     461            1 :                 w := record.NewWriter(dst)
     462            1 :                 for {
     463            1 :                         rr, err := r.Next()
     464            1 :                         if err != nil {
     465            1 :                                 if err == io.EOF {
     466            1 :                                         break
     467              :                                 }
     468            0 :                                 return err
     469              :                         }
     470              : 
     471            1 :                         rw, err := w.Next()
     472            1 :                         if err != nil {
     473            0 :                                 return err
     474            0 :                         }
     475            1 :                         if _, err := io.Copy(rw, rr); err != nil {
     476            0 :                                 return err
     477            0 :                         }
     478              :                 }
     479              : 
     480            1 :                 if len(excludedFiles) > 0 {
     481            1 :                         // Write out an additional VersionEdit that deletes the excluded SST files.
     482            1 :                         ve := versionEdit{
     483            1 :                                 DeletedFiles:         excludedFiles,
     484            1 :                                 RemovedBackingTables: removeBackingTables,
     485            1 :                         }
     486            1 : 
     487            1 :                         rw, err := w.Next()
     488            1 :                         if err != nil {
     489            0 :                                 return err
     490            0 :                         }
     491            1 :                         if err := ve.Encode(rw); err != nil {
     492            0 :                                 return err
     493            0 :                         }
     494              :                 }
     495            1 :                 if err := w.Close(); err != nil {
     496            0 :                         return err
     497            0 :                 }
     498            1 :                 return dst.Sync()
     499            0 :         }(); err != nil {
     500            0 :                 return err
     501            0 :         }
     502              : 
     503            1 :         var manifestMarker *atomicfs.Marker
     504            1 :         manifestMarker, _, err := atomicfs.LocateMarker(fs, destDirPath, manifestMarkerName)
     505            1 :         if err != nil {
     506            0 :                 return err
     507            0 :         }
     508            1 :         if err := manifestMarker.Move(base.MakeFilename(fileTypeManifest, manifestFileNum)); err != nil {
     509            0 :                 return err
     510            0 :         }
     511            1 :         return manifestMarker.Close()
     512              : }
        

Generated by: LCOV version 2.0-1