LCOV - code coverage report
Current view: top level - pebble - checkpoint.go (source / functions) Coverage Total Hit
Test: 2025-05-04 08:18Z 52551f8a - meta test only.lcov Lines: 68.7 % 358 246
Test Date: 2025-05-04 08:19:20 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "io"
      10              :         "os"
      11              : 
      12              :         "github.com/cockroachdb/errors"
      13              :         "github.com/cockroachdb/errors/oserror"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/manifest"
      16              :         "github.com/cockroachdb/pebble/record"
      17              :         "github.com/cockroachdb/pebble/vfs"
      18              :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      19              : )
      20              : 
      21              : // checkpointOptions hold the optional parameters to construct checkpoint
      22              : // snapshots.
      23              : type checkpointOptions struct {
      24              :         // flushWAL set to true will force a flush and sync of the WAL prior to
      25              :         // checkpointing.
      26              :         flushWAL bool
      27              : 
      28              :         // If set, any SSTs that don't overlap with these spans are excluded from a checkpoint.
      29              :         restrictToSpans []CheckpointSpan
      30              : }
      31              : 
      32              : // CheckpointOption set optional parameters used by `DB.Checkpoint`.
      33              : type CheckpointOption func(*checkpointOptions)
      34              : 
      35              : // WithFlushedWAL enables flushing and syncing the WAL prior to constructing a
      36              : // checkpoint. This guarantees that any writes committed before calling
      37              : // DB.Checkpoint will be part of that checkpoint.
      38              : //
      39              : // Note that this setting can only be useful in cases when some writes are
      40              : // performed with Sync = false. Otherwise, the guarantee will already be met.
      41              : //
      42              : // Passing this option is functionally equivalent to calling
      43              : // DB.LogData(nil, Sync) right before DB.Checkpoint.
      44            0 : func WithFlushedWAL() CheckpointOption {
      45            0 :         return func(opt *checkpointOptions) {
      46            0 :                 opt.flushWAL = true
      47            0 :         }
      48              : }
      49              : 
      50              : // WithRestrictToSpans specifies spans of interest for the checkpoint. Any SSTs
      51              : // that don't overlap with any of these spans are excluded from the checkpoint.
      52              : //
      53              : // Note that the checkpoint can still surface keys outside of these spans (from
      54              : // the WAL and from SSTs that partially overlap with these spans). Moreover,
      55              : // these surface keys aren't necessarily "valid" in that they could have been
      56              : // modified but the SST containing the modification is excluded.
      57            1 : func WithRestrictToSpans(spans []CheckpointSpan) CheckpointOption {
      58            1 :         return func(opt *checkpointOptions) {
      59            1 :                 opt.restrictToSpans = spans
      60            1 :         }
      61              : }
      62              : 
      63              : // CheckpointSpan is a key range [Start, End) (inclusive on Start, exclusive on
      64              : // End) of interest for a checkpoint.
      65              : type CheckpointSpan struct {
      66              :         Start []byte
      67              :         End   []byte
      68              : }
      69              : 
      70              : // excludeFromCheckpoint returns true if an SST file should be excluded from the
      71              : // checkpoint because it does not overlap with the spans of interest
      72              : // (opt.restrictToSpans).
      73            1 : func excludeFromCheckpoint(f *tableMetadata, opt *checkpointOptions, cmp Compare) bool {
      74            1 :         if len(opt.restrictToSpans) == 0 {
      75            1 :                 // Option not set; don't exclude anything.
      76            1 :                 return false
      77            1 :         }
      78            1 :         for _, s := range opt.restrictToSpans {
      79            1 :                 spanBounds := base.UserKeyBoundsEndExclusive(s.Start, s.End)
      80            1 :                 if f.Overlaps(cmp, &spanBounds) {
      81            1 :                         return false
      82            1 :                 }
      83              :         }
      84              :         // None of the restrictToSpans overlapped; we can exclude this file.
      85            1 :         return true
      86              : }
      87              : 
      88              : // mkdirAllAndSyncParents creates destDir and any of its missing parents.
      89              : // Those missing parents, as well as the closest existing ancestor, are synced.
      90              : // Returns a handle to the directory created at destDir.
      91            1 : func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
      92            1 :         // Collect paths for all directories between destDir (excluded) and its
      93            1 :         // closest existing ancestor (included).
      94            1 :         var parentPaths []string
      95            1 :         for parentPath := fs.PathDir(destDir); ; parentPath = fs.PathDir(parentPath) {
      96            1 :                 parentPaths = append(parentPaths, parentPath)
      97            1 :                 if fs.PathDir(parentPath) == parentPath {
      98            1 :                         break
      99              :                 }
     100            1 :                 _, err := fs.Stat(parentPath)
     101            1 :                 if err == nil {
     102            1 :                         // Exit loop at the closest existing ancestor.
     103            1 :                         break
     104              :                 }
     105            1 :                 if !oserror.IsNotExist(err) {
     106            0 :                         return nil, err
     107            0 :                 }
     108              :         }
     109              :         // Create destDir and any of its missing parents.
     110            1 :         if err := fs.MkdirAll(destDir, 0755); err != nil {
     111            0 :                 return nil, err
     112            0 :         }
     113              :         // Sync all the parent directories up to the closest existing ancestor,
     114              :         // included.
     115            1 :         for _, parentPath := range parentPaths {
     116            1 :                 parentDir, err := fs.OpenDir(parentPath)
     117            1 :                 if err != nil {
     118            0 :                         return nil, err
     119            0 :                 }
     120            1 :                 err = parentDir.Sync()
     121            1 :                 if err != nil {
     122            0 :                         _ = parentDir.Close()
     123            0 :                         return nil, err
     124            0 :                 }
     125            1 :                 err = parentDir.Close()
     126            1 :                 if err != nil {
     127            0 :                         return nil, err
     128            0 :                 }
     129              :         }
     130            1 :         return fs.OpenDir(destDir)
     131              : }
     132              : 
     133              : // Checkpoint constructs a snapshot of the DB instance in the specified
     134              : // directory. The WAL, MANIFEST, OPTIONS, and sstables will be copied into the
     135              : // snapshot. Hard links will be used when possible. Beware of the significant
     136              : // space overhead for a checkpoint if hard links are disabled. Also beware that
     137              : // even if hard links are used, the space overhead for the checkpoint will
     138              : // increase over time as the DB performs compactions.
     139              : //
     140              : // Note that shared files in a checkpoint could get deleted if the DB is
     141              : // restarted after a checkpoint operation, as the reference for the checkpoint
     142              : // is only maintained in memory. This is okay as long as users of Checkpoint
     143              : // crash shortly afterwards with a "poison file" preventing further restarts.
     144              : func (d *DB) Checkpoint(
     145              :         destDir string, opts ...CheckpointOption,
     146              : ) (
     147              :         ckErr error, /* used in deferred cleanup */
     148            1 : ) {
     149            1 :         opt := &checkpointOptions{}
     150            1 :         for _, fn := range opts {
     151            1 :                 fn(opt)
     152            1 :         }
     153              : 
     154            1 :         if _, err := d.opts.FS.Stat(destDir); !oserror.IsNotExist(err) {
     155            0 :                 if err == nil {
     156            0 :                         return &os.PathError{
     157            0 :                                 Op:   "checkpoint",
     158            0 :                                 Path: destDir,
     159            0 :                                 Err:  oserror.ErrExist,
     160            0 :                         }
     161            0 :                 }
     162            0 :                 return err
     163              :         }
     164              : 
     165            1 :         if opt.flushWAL && !d.opts.DisableWAL {
     166            0 :                 // Write an empty log-data record to flush and sync the WAL.
     167            0 :                 if err := d.LogData(nil /* data */, Sync); err != nil {
     168            0 :                         return err
     169            0 :                 }
     170              :         }
     171              : 
     172              :         // Disable file deletions.
     173            1 :         d.mu.Lock()
     174            1 :         d.disableFileDeletions()
     175            1 :         defer func() {
     176            1 :                 d.mu.Lock()
     177            1 :                 defer d.mu.Unlock()
     178            1 :                 d.enableFileDeletions()
     179            1 :         }()
     180              : 
     181              :         // TODO(peter): RocksDB provides the option to roll the manifest if the
     182              :         // MANIFEST size is too large. Should we do this too?
     183              : 
     184              :         // Lock the manifest before getting the current version. We need the
     185              :         // length of the manifest that we read to match the current version that
     186              :         // we read, otherwise we might copy a versionEdit not reflected in the
     187              :         // sstables we copy/link.
     188            1 :         d.mu.versions.logLock()
     189            1 :         // Get the the current version and the current manifest file number.
     190            1 :         current := d.mu.versions.currentVersion()
     191            1 :         formatVers := d.FormatMajorVersion()
     192            1 :         manifestFileNum := d.mu.versions.manifestFileNum
     193            1 :         manifestSize := d.mu.versions.manifest.Size()
     194            1 :         optionsFileNum := d.optionsFileNum
     195            1 : 
     196            1 :         virtualBackingFiles := make(map[base.DiskFileNum]struct{})
     197            1 :         d.mu.versions.virtualBackings.ForEach(func(backing *fileBacking) {
     198            1 :                 virtualBackingFiles[backing.DiskFileNum] = struct{}{}
     199            1 :         })
     200            1 :         versionBlobFiles := d.mu.versions.blobFiles.Metadatas()
     201            1 : 
     202            1 :         // Acquire the logs while holding mutexes to ensure we don't race with a
     203            1 :         // flush that might mark a log that's relevant to `current` as obsolete
     204            1 :         // before our call to List.
     205            1 :         allLogicalLogs := d.mu.log.manager.List()
     206            1 : 
     207            1 :         // Release the manifest and DB.mu so we don't block other operations on
     208            1 :         // the database.
     209            1 :         d.mu.versions.logUnlock()
     210            1 :         d.mu.Unlock()
     211            1 : 
     212            1 :         // Wrap the normal filesystem with one which wraps newly created files with
     213            1 :         // vfs.NewSyncingFile.
     214            1 :         fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
     215            1 :                 NoSyncOnClose: d.opts.NoSyncOnClose,
     216            1 :                 BytesPerSync:  d.opts.BytesPerSync,
     217            1 :         })
     218            1 : 
     219            1 :         // Create the dir and its parents (if necessary), and sync them.
     220            1 :         var dir vfs.File
     221            1 :         defer func() {
     222            1 :                 if dir != nil {
     223            0 :                         _ = dir.Close()
     224            0 :                 }
     225            1 :                 if ckErr != nil {
     226            0 :                         // Attempt to cleanup on error.
     227            0 :                         _ = fs.RemoveAll(destDir)
     228            0 :                 }
     229              :         }()
     230            1 :         dir, ckErr = mkdirAllAndSyncParents(fs, destDir)
     231            1 :         if ckErr != nil {
     232            0 :                 return ckErr
     233            0 :         }
     234              : 
     235            1 :         {
     236            1 :                 // Copy the OPTIONS.
     237            1 :                 srcPath := base.MakeFilepath(fs, d.dirname, base.FileTypeOptions, optionsFileNum)
     238            1 :                 destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
     239            1 :                 ckErr = copyCheckpointOptions(fs, srcPath, destPath)
     240            1 :                 if ckErr != nil {
     241            0 :                         return ckErr
     242            0 :                 }
     243              :         }
     244              : 
     245            1 :         {
     246            1 :                 // Set the format major version in the destination directory.
     247            1 :                 var versionMarker *atomicfs.Marker
     248            1 :                 versionMarker, _, ckErr = atomicfs.LocateMarker(fs, destDir, formatVersionMarkerName)
     249            1 :                 if ckErr != nil {
     250            0 :                         return ckErr
     251            0 :                 }
     252              : 
     253              :                 // We use the marker to encode the active format version in the
     254              :                 // marker filename. Unlike other uses of the atomic marker,
     255              :                 // there is no file with the filename `formatVers.String()` on
     256              :                 // the filesystem.
     257            1 :                 ckErr = versionMarker.Move(formatVers.String())
     258            1 :                 if ckErr != nil {
     259            0 :                         return ckErr
     260            0 :                 }
     261            1 :                 ckErr = versionMarker.Close()
     262            1 :                 if ckErr != nil {
     263            0 :                         return ckErr
     264            0 :                 }
     265              :         }
     266              : 
     267            1 :         var excludedTables map[deletedFileEntry]*tableMetadata
     268            1 :         var includedBlobFiles map[base.DiskFileNum]struct{}
     269            1 :         var remoteFiles []base.DiskFileNum
     270            1 :         // Set of FileBacking.DiskFileNum which will be required by virtual sstables
     271            1 :         // in the checkpoint.
     272            1 :         requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})
     273            1 : 
     274            1 :         copyFile := func(typ base.FileType, fileNum base.DiskFileNum) error {
     275            1 :                 meta, err := d.objProvider.Lookup(typ, fileNum)
     276            1 :                 if err != nil {
     277            0 :                         return err
     278            0 :                 }
     279            1 :                 if meta.IsRemote() {
     280            0 :                         // We don't copy remote files. This is desirable as checkpointing is
     281            0 :                         // supposed to be a fast operation, and references to remote files can
     282            0 :                         // always be resolved by any checkpoint readers by reading the object
     283            0 :                         // catalog. We don't add this file to excludedFiles either, as that'd
     284            0 :                         // cause it to be deleted in the second manifest entry which is also
     285            0 :                         // inaccurate.
     286            0 :                         remoteFiles = append(remoteFiles, meta.DiskFileNum)
     287            0 :                         return nil
     288            0 :                 }
     289            1 :                 srcPath := base.MakeFilepath(fs, d.dirname, typ, fileNum)
     290            1 :                 destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
     291            1 :                 return vfs.LinkOrCopy(fs, srcPath, destPath)
     292              :         }
     293              : 
     294              :         // Link or copy the sstables.
     295            1 :         for l := range current.Levels {
     296            1 :                 iter := current.Levels[l].Iter()
     297            1 :                 for f := iter.First(); f != nil; f = iter.Next() {
     298            1 :                         if excludeFromCheckpoint(f, opt, d.cmp) {
     299            1 :                                 if excludedTables == nil {
     300            1 :                                         excludedTables = make(map[deletedFileEntry]*tableMetadata)
     301            1 :                                 }
     302            1 :                                 excludedTables[deletedFileEntry{
     303            1 :                                         Level:   l,
     304            1 :                                         FileNum: f.FileNum,
     305            1 :                                 }] = f
     306            1 :                                 continue
     307              :                         }
     308              : 
     309              :                         // Copy any referenced blob files that have not already been copied.
     310            1 :                         if len(f.BlobReferences) > 0 {
     311            0 :                                 if includedBlobFiles == nil {
     312            0 :                                         includedBlobFiles = make(map[base.DiskFileNum]struct{})
     313            0 :                                 }
     314            0 :                                 for _, ref := range f.BlobReferences {
     315            0 :                                         if _, ok := includedBlobFiles[ref.FileNum]; !ok {
     316            0 :                                                 includedBlobFiles[ref.FileNum] = struct{}{}
     317            0 :                                                 ckErr = copyFile(base.FileTypeBlob, ref.FileNum)
     318            0 :                                                 if ckErr != nil {
     319            0 :                                                         return ckErr
     320            0 :                                                 }
     321              :                                         }
     322              :                                 }
     323              :                         }
     324              : 
     325            1 :                         fileBacking := f.FileBacking
     326            1 :                         if f.Virtual {
     327            1 :                                 if _, ok := requiredVirtualBackingFiles[fileBacking.DiskFileNum]; ok {
     328            1 :                                         continue
     329              :                                 }
     330            1 :                                 requiredVirtualBackingFiles[fileBacking.DiskFileNum] = struct{}{}
     331              :                         }
     332            1 :                         ckErr = copyFile(base.FileTypeTable, fileBacking.DiskFileNum)
     333            1 :                         if ckErr != nil {
     334            0 :                                 return ckErr
     335            0 :                         }
     336              :                 }
     337              :         }
     338              : 
     339            1 :         var removeBackingTables []base.DiskFileNum
     340            1 :         for diskFileNum := range virtualBackingFiles {
     341            1 :                 if _, ok := requiredVirtualBackingFiles[diskFileNum]; !ok {
     342            1 :                         // The backing sstable associated with fileNum is no longer
     343            1 :                         // required.
     344            1 :                         removeBackingTables = append(removeBackingTables, diskFileNum)
     345            1 :                 }
     346              :         }
     347              :         // Record the blob files that are not referenced by any included sstables.
     348              :         // When we write the MANIFEST of the checkpoint, we'll include a final
     349              :         // VersionEdit that removes these blob files so that the checkpointed
     350              :         // manifest is consistent.
     351            1 :         var excludedBlobFiles map[base.DiskFileNum]*manifest.BlobFileMetadata
     352            1 :         if len(includedBlobFiles) < len(versionBlobFiles) {
     353            0 :                 excludedBlobFiles = make(map[base.DiskFileNum]*manifest.BlobFileMetadata, len(versionBlobFiles)-len(includedBlobFiles))
     354            0 :                 for _, blobFile := range versionBlobFiles {
     355            0 :                         if _, ok := includedBlobFiles[blobFile.FileNum]; !ok {
     356            0 :                                 excludedBlobFiles[blobFile.FileNum] = blobFile
     357            0 :                         }
     358              :                 }
     359              :         }
     360              : 
     361            1 :         ckErr = d.writeCheckpointManifest(
     362            1 :                 fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
     363            1 :                 excludedTables, removeBackingTables, excludedBlobFiles,
     364            1 :         )
     365            1 :         if ckErr != nil {
     366            0 :                 return ckErr
     367            0 :         }
     368            1 :         if len(remoteFiles) > 0 {
     369            0 :                 ckErr = d.objProvider.CheckpointState(fs, destDir, remoteFiles)
     370            0 :                 if ckErr != nil {
     371            0 :                         return ckErr
     372            0 :                 }
     373              :         }
     374              : 
     375              :         // Copy the WAL files. We copy rather than link because WAL file recycling
     376              :         // will cause the WAL files to be reused which would invalidate the
     377              :         // checkpoint. It's possible allLogicalLogs includes logs that are not
     378              :         // relevant (beneath the version's MinUnflushedLogNum). These extra files
     379              :         // are harmless. The earlier (wal.Manager).List call will not include
     380              :         // obsolete logs that are sitting in the recycler or have already been
     381              :         // passed off to the cleanup manager for deletion.
     382              :         //
     383              :         // TODO(jackson): It would be desirable to copy all recycling and obsolete
     384              :         // WALs to aid corruption postmortem debugging should we need them.
     385            1 :         for _, log := range allLogicalLogs {
     386            1 :                 for i := 0; i < log.NumSegments(); i++ {
     387            1 :                         srcFS, srcPath := log.SegmentLocation(i)
     388            1 :                         destPath := fs.PathJoin(destDir, srcFS.PathBase(srcPath))
     389            1 :                         ckErr = vfs.CopyAcrossFS(srcFS, srcPath, fs, destPath)
     390            1 :                         if ckErr != nil {
     391            0 :                                 return ckErr
     392            0 :                         }
     393              :                 }
     394              :         }
     395              : 
     396              :         // Sync and close the checkpoint directory.
     397            1 :         ckErr = dir.Sync()
     398            1 :         if ckErr != nil {
     399            0 :                 return ckErr
     400            0 :         }
     401            1 :         ckErr = dir.Close()
     402            1 :         dir = nil
     403            1 :         return ckErr
     404              : }
     405              : 
     406              : // copyCheckpointOptions copies an OPTIONS file, commenting out some options
     407              : // that existed on the original database but no longer apply to the checkpointed
     408              : // database. For example, the entire [WAL Failover] stanza is commented out
     409              : // because Checkpoint will copy all WAL segment files from both the primary and
     410              : // secondary WAL directories into the checkpoint.
     411            1 : func copyCheckpointOptions(fs vfs.FS, srcPath, dstPath string) error {
     412            1 :         var buf bytes.Buffer
     413            1 :         f, err := fs.Open(srcPath)
     414            1 :         if err != nil {
     415            0 :                 return err
     416            0 :         }
     417            1 :         defer f.Close()
     418            1 :         b, err := io.ReadAll(f)
     419            1 :         if err != nil {
     420            0 :                 return err
     421            0 :         }
     422              :         // Copy the OPTIONS file verbatim, but commenting out the [WAL Failover]
     423              :         // section.
     424            1 :         err = parseOptions(string(b), parseOptionsFuncs{
     425            1 :                 visitNewSection: func(startOff, endOff int, section string) error {
     426            1 :                         if section == "WAL Failover" {
     427            1 :                                 buf.WriteString("# ")
     428            1 :                         }
     429            1 :                         buf.Write(b[startOff:endOff])
     430            1 :                         return nil
     431              :                 },
     432            1 :                 visitKeyValue: func(startOff, endOff int, section, key, value string) error {
     433            1 :                         if section == "WAL Failover" {
     434            1 :                                 buf.WriteString("# ")
     435            1 :                         }
     436            1 :                         buf.Write(b[startOff:endOff])
     437            1 :                         return nil
     438              :                 },
     439            1 :                 visitCommentOrWhitespace: func(startOff, endOff int, line string) error {
     440            1 :                         buf.Write(b[startOff:endOff])
     441            1 :                         return nil
     442            1 :                 },
     443              :         })
     444            1 :         if err != nil {
     445            0 :                 return err
     446            0 :         }
     447            1 :         nf, err := fs.Create(dstPath, vfs.WriteCategoryUnspecified)
     448            1 :         if err != nil {
     449            0 :                 return err
     450            0 :         }
     451            1 :         _, err = io.Copy(nf, &buf)
     452            1 :         if err != nil {
     453            0 :                 return err
     454            0 :         }
     455            1 :         return errors.CombineErrors(nf.Sync(), nf.Close())
     456              : }
     457              : 
     458              : func (d *DB) writeCheckpointManifest(
     459              :         fs vfs.FS,
     460              :         formatVers FormatMajorVersion,
     461              :         destDirPath string,
     462              :         destDir vfs.File,
     463              :         manifestFileNum base.DiskFileNum,
     464              :         manifestSize int64,
     465              :         excludedTables map[deletedFileEntry]*tableMetadata,
     466              :         removeBackingTables []base.DiskFileNum,
     467              :         excludedBlobFiles map[base.DiskFileNum]*manifest.BlobFileMetadata,
     468            1 : ) error {
     469            1 :         // Copy the MANIFEST, and create a pointer to it. We copy rather
     470            1 :         // than link because additional version edits added to the
     471            1 :         // MANIFEST after we took our snapshot of the sstables will
     472            1 :         // reference sstables that aren't in our checkpoint. For a
     473            1 :         // similar reason, we need to limit how much of the MANIFEST we
     474            1 :         // copy.
     475            1 :         // If some files are excluded from the checkpoint, also append a block that
     476            1 :         // records those files as deleted.
     477            1 :         if err := func() error {
     478            1 :                 srcPath := base.MakeFilepath(fs, d.dirname, base.FileTypeManifest, manifestFileNum)
     479            1 :                 destPath := fs.PathJoin(destDirPath, fs.PathBase(srcPath))
     480            1 :                 src, err := fs.Open(srcPath, vfs.SequentialReadsOption)
     481            1 :                 if err != nil {
     482            0 :                         return err
     483            0 :                 }
     484            1 :                 defer src.Close()
     485            1 : 
     486            1 :                 dst, err := fs.Create(destPath, vfs.WriteCategoryUnspecified)
     487            1 :                 if err != nil {
     488            0 :                         return err
     489            0 :                 }
     490            1 :                 defer dst.Close()
     491            1 : 
     492            1 :                 // Copy all existing records. We need to copy at the record level in case we
     493            1 :                 // need to append another record with the excluded files (we cannot simply
     494            1 :                 // append a record after a raw data copy; see
     495            1 :                 // https://github.com/cockroachdb/cockroach/issues/100935).
     496            1 :                 r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
     497            1 :                 w := record.NewWriter(dst)
     498            1 :                 for {
     499            1 :                         rr, err := r.Next()
     500            1 :                         if err != nil {
     501            1 :                                 if err == io.EOF {
     502            1 :                                         break
     503              :                                 }
     504            0 :                                 return err
     505              :                         }
     506              : 
     507            1 :                         rw, err := w.Next()
     508            1 :                         if err != nil {
     509            0 :                                 return err
     510            0 :                         }
     511            1 :                         if _, err := io.Copy(rw, rr); err != nil {
     512            0 :                                 return err
     513            0 :                         }
     514              :                 }
     515              : 
     516            1 :                 if len(excludedTables) > 0 || len(excludedBlobFiles) > 0 {
     517            1 :                         // Write out an additional VersionEdit that deletes the excluded SST files.
     518            1 :                         ve := versionEdit{
     519            1 :                                 DeletedTables:        excludedTables,
     520            1 :                                 RemovedBackingTables: removeBackingTables,
     521            1 :                                 DeletedBlobFiles:     excludedBlobFiles,
     522            1 :                         }
     523            1 : 
     524            1 :                         rw, err := w.Next()
     525            1 :                         if err != nil {
     526            0 :                                 return err
     527            0 :                         }
     528            1 :                         if err := ve.Encode(rw); err != nil {
     529            0 :                                 return err
     530            0 :                         }
     531              :                 }
     532            1 :                 if err := w.Close(); err != nil {
     533            0 :                         return err
     534            0 :                 }
     535            1 :                 return dst.Sync()
     536            0 :         }(); err != nil {
     537            0 :                 return err
     538            0 :         }
     539              : 
     540            1 :         var manifestMarker *atomicfs.Marker
     541            1 :         manifestMarker, _, err := atomicfs.LocateMarker(fs, destDirPath, manifestMarkerName)
     542            1 :         if err != nil {
     543            0 :                 return err
     544            0 :         }
     545            1 :         if err := manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, manifestFileNum)); err != nil {
     546            0 :                 return err
     547            0 :         }
     548            1 :         return manifestMarker.Close()
     549              : }
        

Generated by: LCOV version 2.0-1