LCOV - code coverage report
Current view: top level - pebble - checkpoint.go (source / functions) Coverage Total Hit
Test: 2025-07-07 08:18Z 6f57a213 - tests only.lcov Lines: 81.3 % 374 304
Test Date: 2025-07-07 08:19:55 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "io"
      10              :         "os"
      11              : 
      12              :         "github.com/cockroachdb/errors"
      13              :         "github.com/cockroachdb/errors/oserror"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/manifest"
      16              :         "github.com/cockroachdb/pebble/record"
      17              :         "github.com/cockroachdb/pebble/vfs"
      18              :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      19              : )
      20              : 
// checkpointOptions holds the optional parameters used to construct
// checkpoint snapshots. A zero value means "no options set"; fields are
// filled in by applying CheckpointOption functions to it.
type checkpointOptions struct {
	// flushWAL set to true will force a flush and sync of the WAL prior to
	// checkpointing. DB.Checkpoint ignores it when the WAL is disabled.
	flushWAL bool

	// If set, any SSTs that don't overlap with these spans are excluded from a
	// checkpoint. An empty (or nil) slice excludes nothing.
	restrictToSpans []CheckpointSpan
}
      31              : 
// CheckpointOption sets an optional parameter used by `DB.Checkpoint`. Each
// option is a function that mutates the checkpointOptions it is given.
type CheckpointOption func(*checkpointOptions)
      34              : 
      35              : // WithFlushedWAL enables flushing and syncing the WAL prior to constructing a
      36              : // checkpoint. This guarantees that any writes committed before calling
      37              : // DB.Checkpoint will be part of that checkpoint.
      38              : //
      39              : // Note that this setting can only be useful in cases when some writes are
      40              : // performed with Sync = false. Otherwise, the guarantee will already be met.
      41              : //
      42              : // Passing this option is functionally equivalent to calling
      43              : // DB.LogData(nil, Sync) right before DB.Checkpoint.
      44            1 : func WithFlushedWAL() CheckpointOption {
      45            1 :         return func(opt *checkpointOptions) {
      46            1 :                 opt.flushWAL = true
      47            1 :         }
      48              : }
      49              : 
      50              : // WithRestrictToSpans specifies spans of interest for the checkpoint. Any SSTs
      51              : // that don't overlap with any of these spans are excluded from the checkpoint.
      52              : //
      53              : // Note that the checkpoint can still surface keys outside of these spans (from
      54              : // the WAL and from SSTs that partially overlap with these spans). Moreover,
      55              : // these surface keys aren't necessarily "valid" in that they could have been
      56              : // modified but the SST containing the modification is excluded.
      57            1 : func WithRestrictToSpans(spans []CheckpointSpan) CheckpointOption {
      58            1 :         return func(opt *checkpointOptions) {
      59            1 :                 opt.restrictToSpans = spans
      60            1 :         }
      61              : }
      62              : 
// CheckpointSpan is a key range [Start, End) (inclusive on Start, exclusive on
// End) of interest for a checkpoint.
type CheckpointSpan struct {
	// Start is the inclusive lower bound of the span.
	Start []byte
	// End is the exclusive upper bound of the span.
	End []byte
}
      69              : 
      70              : // excludeFromCheckpoint returns true if an SST file should be excluded from the
      71              : // checkpoint because it does not overlap with the spans of interest
      72              : // (opt.restrictToSpans).
      73            1 : func excludeFromCheckpoint(f *manifest.TableMetadata, opt *checkpointOptions, cmp Compare) bool {
      74            1 :         if len(opt.restrictToSpans) == 0 {
      75            1 :                 // Option not set; don't exclude anything.
      76            1 :                 return false
      77            1 :         }
      78            1 :         for _, s := range opt.restrictToSpans {
      79            1 :                 spanBounds := base.UserKeyBoundsEndExclusive(s.Start, s.End)
      80            1 :                 if f.Overlaps(cmp, &spanBounds) {
      81            1 :                         return false
      82            1 :                 }
      83              :         }
      84              :         // None of the restrictToSpans overlapped; we can exclude this file.
      85            1 :         return true
      86              : }
      87              : 
      88              : // mkdirAllAndSyncParents creates destDir and any of its missing parents.
      89              : // Those missing parents, as well as the closest existing ancestor, are synced.
      90              : // Returns a handle to the directory created at destDir.
      91            1 : func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
      92            1 :         // Collect paths for all directories between destDir (excluded) and its
      93            1 :         // closest existing ancestor (included).
      94            1 :         var parentPaths []string
      95            1 :         for parentPath := fs.PathDir(destDir); ; parentPath = fs.PathDir(parentPath) {
      96            1 :                 parentPaths = append(parentPaths, parentPath)
      97            1 :                 if fs.PathDir(parentPath) == parentPath {
      98            1 :                         break
      99              :                 }
     100            1 :                 _, err := fs.Stat(parentPath)
     101            1 :                 if err == nil {
     102            1 :                         // Exit loop at the closest existing ancestor.
     103            1 :                         break
     104              :                 }
     105            1 :                 if !oserror.IsNotExist(err) {
     106            0 :                         return nil, err
     107            0 :                 }
     108              :         }
     109              :         // Create destDir and any of its missing parents.
     110            1 :         if err := fs.MkdirAll(destDir, 0755); err != nil {
     111            1 :                 return nil, err
     112            1 :         }
     113              :         // Sync all the parent directories up to the closest existing ancestor,
     114              :         // included.
     115            1 :         for _, parentPath := range parentPaths {
     116            1 :                 parentDir, err := fs.OpenDir(parentPath)
     117            1 :                 if err != nil {
     118            1 :                         return nil, err
     119            1 :                 }
     120            1 :                 err = parentDir.Sync()
     121            1 :                 if err != nil {
     122            1 :                         _ = parentDir.Close()
     123            1 :                         return nil, err
     124            1 :                 }
     125            1 :                 err = parentDir.Close()
     126            1 :                 if err != nil {
     127            0 :                         return nil, err
     128            0 :                 }
     129              :         }
     130            1 :         return fs.OpenDir(destDir)
     131              : }
     132              : 
// Checkpoint constructs a snapshot of the DB instance in the specified
// directory. The WAL, MANIFEST, OPTIONS, and sstables will be copied into the
// snapshot. Hard links will be used when possible. Beware of the significant
// space overhead for a checkpoint if hard links are disabled. Also beware that
// even if hard links are used, the space overhead for the checkpoint will
// increase over time as the DB performs compactions.
//
// Note that shared files in a checkpoint could get deleted if the DB is
// restarted after a checkpoint operation, as the reference for the checkpoint
// is only maintained in memory. This is okay as long as users of Checkpoint
// crash shortly afterwards with a "poison file" preventing further restarts.
//
// destDir must not exist yet: if it does, an *os.PathError wrapping
// oserror.ErrExist is returned. opts are applied in order to a fresh
// checkpointOptions. Once the destination directory is being populated, any
// failure triggers a best-effort removal of destDir before the error is
// returned.
func (d *DB) Checkpoint(
	destDir string, opts ...CheckpointOption,
) (
	ckErr error, /* used in deferred cleanup */
) {
	opt := &checkpointOptions{}
	for _, fn := range opts {
		fn(opt)
	}

	// Refuse to checkpoint into a path that already exists. A Stat error
	// other than "not exist" is returned unchanged.
	if _, err := d.opts.FS.Stat(destDir); !oserror.IsNotExist(err) {
		if err == nil {
			return &os.PathError{
				Op:   "checkpoint",
				Path: destDir,
				Err:  oserror.ErrExist,
			}
		}
		return err
	}

	if opt.flushWAL && !d.opts.DisableWAL {
		// Write an empty log-data record to flush and sync the WAL.
		if err := d.LogData(nil /* data */, Sync); err != nil {
			return err
		}
	}

	// Disable file deletions.
	// We acquire a reference on the version down below that will prevent any
	// sstables or blob files from becoming "obsolete" and potentially deleted,
	// but this doesn't protect the current WALs or manifests.
	d.mu.Lock()
	d.disableFileDeletions()
	// Re-enable file deletions on every return path. d.mu is released further
	// down, so the deferred function has to re-acquire it.
	defer func() {
		d.mu.Lock()
		defer d.mu.Unlock()
		d.enableFileDeletions()
	}()

	// TODO(peter): RocksDB provides the option to roll the manifest if the
	// MANIFEST size is too large. Should we do this too?

	// Lock the manifest before getting the current version. We need the
	// length of the manifest that we read to match the current version that
	// we read, otherwise we might copy a versionEdit not reflected in the
	// sstables we copy/link.
	d.mu.versions.logLock()
	// Get the current version and the current manifest file number.
	current := d.mu.versions.currentVersion()
	formatVers := d.FormatMajorVersion()
	manifestFileNum := d.mu.versions.manifestFileNum
	manifestSize := d.mu.versions.manifest.Size()
	optionsFileNum := d.optionsFileNum

	// Snapshot the set of backing files for virtual sstables and the blob
	// file metadata while the locks are still held.
	virtualBackingFiles := make(map[base.DiskFileNum]struct{})
	d.mu.versions.latest.virtualBackings.ForEach(func(backing *manifest.TableBacking) {
		virtualBackingFiles[backing.DiskFileNum] = struct{}{}
	})
	versionBlobFiles := d.mu.versions.latest.blobFiles.Metadatas()

	// Acquire the logs while holding mutexes to ensure we don't race with a
	// flush that might mark a log that's relevant to `current` as obsolete
	// before our call to List.
	allLogicalLogs := d.mu.log.manager.List()

	// Release the manifest and DB.mu so we don't block other operations on the
	// database.
	//
	// But first reference the version to ensure that the version's in-memory
	// state and its physical files remain available for the checkpoint. In
	// particular, the Version.BlobFileSet is only valid while a version is
	// referenced.
	current.Ref()
	d.mu.versions.logUnlock()
	d.mu.Unlock()
	defer current.Unref()

	// Wrap the normal filesystem with one which wraps newly created files with
	// vfs.NewSyncingFile.
	fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
		NoSyncOnClose: d.opts.NoSyncOnClose,
		BytesPerSync:  d.opts.BytesPerSync,
	})

	// Create the dir and its parents (if necessary), and sync them.
	var dir vfs.File
	// Deferred cleanup: close the directory handle if it is still open (the
	// success path sets dir to nil after closing it) and, if the checkpoint
	// failed, make a best-effort attempt to remove the partial destDir.
	defer func() {
		if dir != nil {
			_ = dir.Close()
		}
		if ckErr != nil {
			// Attempt to cleanup on error.
			_ = fs.RemoveAll(destDir)
		}
	}()
	dir, ckErr = mkdirAllAndSyncParents(fs, destDir)
	if ckErr != nil {
		return ckErr
	}

	{
		// Copy the OPTIONS.
		srcPath := base.MakeFilepath(fs, d.dirname, base.FileTypeOptions, optionsFileNum)
		destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
		ckErr = copyCheckpointOptions(fs, srcPath, destPath)
		if ckErr != nil {
			return ckErr
		}
	}

	{
		// Set the format major version in the destination directory.
		var versionMarker *atomicfs.Marker
		versionMarker, _, ckErr = atomicfs.LocateMarker(fs, destDir, formatVersionMarkerName)
		if ckErr != nil {
			return ckErr
		}

		// We use the marker to encode the active format version in the
		// marker filename. Unlike other uses of the atomic marker,
		// there is no file with the filename `formatVers.String()` on
		// the filesystem.
		ckErr = versionMarker.Move(formatVers.String())
		if ckErr != nil {
			return ckErr
		}
		ckErr = versionMarker.Close()
		if ckErr != nil {
			return ckErr
		}
	}

	// excludedTables and includedBlobFiles are allocated lazily, only once
	// there is something to record in them.
	var excludedTables map[manifest.DeletedTableEntry]*manifest.TableMetadata
	var includedBlobFiles map[base.BlobFileID]struct{}
	var remoteFiles []base.DiskFileNum
	// Set of TableBacking.DiskFileNum which will be required by virtual sstables
	// in the checkpoint.
	requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})

	// copyFile links or copies the object (typ, fileNum) from the DB
	// directory into destDir. Remote objects are not copied; they are only
	// recorded in remoteFiles.
	copyFile := func(typ base.FileType, fileNum base.DiskFileNum) error {
		meta, err := d.objProvider.Lookup(typ, fileNum)
		if err != nil {
			return err
		}
		if meta.IsRemote() {
			// We don't copy remote files. This is desirable as checkpointing is
			// supposed to be a fast operation, and references to remote files can
			// always be resolved by any checkpoint readers by reading the object
			// catalog. We don't add this file to excludedFiles either, as that'd
			// cause it to be deleted in the second manifest entry which is also
			// inaccurate.
			remoteFiles = append(remoteFiles, meta.DiskFileNum)
			return nil
		}
		srcPath := base.MakeFilepath(fs, d.dirname, typ, fileNum)
		destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
		return vfs.LinkOrCopy(fs, srcPath, destPath)
	}

	// Link or copy the sstables.
	for l := range current.Levels {
		iter := current.Levels[l].Iter()
		for f := iter.First(); f != nil; f = iter.Next() {
			if excludeFromCheckpoint(f, opt, d.cmp) {
				// Remember the excluded table, keyed by level and table
				// number; the map is handed to writeCheckpointManifest below.
				if excludedTables == nil {
					excludedTables = make(map[manifest.DeletedTableEntry]*manifest.TableMetadata)
				}
				excludedTables[manifest.DeletedTableEntry{
					Level:   l,
					FileNum: f.TableNum,
				}] = f
				continue
			}

			// Copy any referenced blob files that have not already been copied.
			if len(f.BlobReferences) > 0 {
				if includedBlobFiles == nil {
					includedBlobFiles = make(map[base.BlobFileID]struct{})
				}
				for _, ref := range f.BlobReferences {
					if _, ok := includedBlobFiles[ref.FileID]; !ok {
						includedBlobFiles[ref.FileID] = struct{}{}

						// Map the BlobFileID to a DiskFileNum in the current version.
						diskFileNum, ok := current.BlobFiles.Lookup(ref.FileID)
						if !ok {
							return errors.Errorf("blob file %s not found", ref.FileID)
						}
						ckErr = copyFile(base.FileTypeBlob, diskFileNum)
						if ckErr != nil {
							return ckErr
						}
					}
				}
			}

			tableBacking := f.TableBacking
			if f.Virtual {
				// Several virtual sstables may share one backing file; copy
				// it only the first time it is encountered.
				if _, ok := requiredVirtualBackingFiles[tableBacking.DiskFileNum]; ok {
					continue
				}
				requiredVirtualBackingFiles[tableBacking.DiskFileNum] = struct{}{}
			}
			ckErr = copyFile(base.FileTypeTable, tableBacking.DiskFileNum)
			if ckErr != nil {
				return ckErr
			}
		}
	}

	// Collect the virtual backing files that no included virtual sstable
	// needs.
	var removeBackingTables []base.DiskFileNum
	for diskFileNum := range virtualBackingFiles {
		if _, ok := requiredVirtualBackingFiles[diskFileNum]; !ok {
			// The backing sstable associated with fileNum is no longer
			// required.
			removeBackingTables = append(removeBackingTables, diskFileNum)
		}
	}
	// Record the blob files that are not referenced by any included sstables.
	// When we write the MANIFEST of the checkpoint, we'll include a final
	// VersionEdit that removes these blob files so that the checkpointed
	// manifest is consistent.
	var excludedBlobFiles map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile
	if len(includedBlobFiles) < len(versionBlobFiles) {
		excludedBlobFiles = make(map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile, len(versionBlobFiles)-len(includedBlobFiles))
		for _, meta := range versionBlobFiles {
			if _, ok := includedBlobFiles[meta.FileID]; !ok {
				excludedBlobFiles[manifest.DeletedBlobFileEntry{
					FileID:  meta.FileID,
					FileNum: meta.Physical.FileNum,
				}] = meta.Physical
			}
		}
	}

	ckErr = d.writeCheckpointManifest(
		fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
		excludedTables, removeBackingTables, excludedBlobFiles,
	)
	if ckErr != nil {
		return ckErr
	}
	// Remote files were not copied; hand them to the object provider so it can
	// record its checkpoint state for them in destDir.
	if len(remoteFiles) > 0 {
		ckErr = d.objProvider.CheckpointState(fs, destDir, remoteFiles)
		if ckErr != nil {
			return ckErr
		}
	}

	// Copy the WAL files. We copy rather than link because WAL file recycling
	// will cause the WAL files to be reused which would invalidate the
	// checkpoint. It's possible allLogicalLogs includes logs that are not
	// relevant (beneath the version's MinUnflushedLogNum). These extra files
	// are harmless. The earlier (wal.Manager).List call will not include
	// obsolete logs that are sitting in the recycler or have already been
	// passed off to the cleanup manager for deletion.
	//
	// TODO(jackson): It would be desirable to copy all recycling and obsolete
	// WALs to aid corruption postmortem debugging should we need them.
	for _, log := range allLogicalLogs {
		for i := 0; i < log.NumSegments(); i++ {
			// A segment may live on a different FS than the checkpoint
			// destination, hence the cross-FS copy.
			srcFS, srcPath := log.SegmentLocation(i)
			destPath := fs.PathJoin(destDir, srcFS.PathBase(srcPath))
			ckErr = vfs.CopyAcrossFS(srcFS, srcPath, fs, destPath)
			if ckErr != nil {
				return ckErr
			}
		}
	}

	// Sync and close the checkpoint directory.
	ckErr = dir.Sync()
	if ckErr != nil {
		return ckErr
	}
	ckErr = dir.Close()
	// Clear dir so the deferred cleanup does not close it a second time.
	dir = nil
	return ckErr
}
     424              : 
     425              : // copyCheckpointOptions copies an OPTIONS file, commenting out some options
     426              : // that existed on the original database but no longer apply to the checkpointed
     427              : // database. For example, the entire [WAL Failover] stanza is commented out
     428              : // because Checkpoint will copy all WAL segment files from both the primary and
     429              : // secondary WAL directories into the checkpoint.
     430            1 : func copyCheckpointOptions(fs vfs.FS, srcPath, dstPath string) error {
     431            1 :         var buf bytes.Buffer
     432            1 :         f, err := fs.Open(srcPath)
     433            1 :         if err != nil {
     434            0 :                 return err
     435            0 :         }
     436            1 :         defer f.Close()
     437            1 :         b, err := io.ReadAll(f)
     438            1 :         if err != nil {
     439            0 :                 return err
     440            0 :         }
     441              :         // Copy the OPTIONS file verbatim, but commenting out the [WAL Failover]
     442              :         // section.
     443            1 :         err = parseOptions(string(b), parseOptionsFuncs{
     444            1 :                 visitNewSection: func(startOff, endOff int, section string) error {
     445            1 :                         if section == "WAL Failover" {
     446            1 :                                 buf.WriteString("# ")
     447            1 :                         }
     448            1 :                         buf.Write(b[startOff:endOff])
     449            1 :                         return nil
     450              :                 },
     451            1 :                 visitKeyValue: func(startOff, endOff int, section, key, value string) error {
     452            1 :                         if section == "WAL Failover" {
     453            1 :                                 buf.WriteString("# ")
     454            1 :                         }
     455            1 :                         buf.Write(b[startOff:endOff])
     456            1 :                         return nil
     457              :                 },
     458            1 :                 visitCommentOrWhitespace: func(startOff, endOff int, line string) error {
     459            1 :                         buf.Write(b[startOff:endOff])
     460            1 :                         return nil
     461            1 :                 },
     462              :         })
     463            1 :         if err != nil {
     464            0 :                 return err
     465            0 :         }
     466            1 :         nf, err := fs.Create(dstPath, vfs.WriteCategoryUnspecified)
     467            1 :         if err != nil {
     468            0 :                 return err
     469            0 :         }
     470            1 :         _, err = io.Copy(nf, &buf)
     471            1 :         if err != nil {
     472            0 :                 return err
     473            0 :         }
     474            1 :         return errors.CombineErrors(nf.Sync(), nf.Close())
     475              : }
     476              : 
     477              : func (d *DB) writeCheckpointManifest(
     478              :         fs vfs.FS,
     479              :         formatVers FormatMajorVersion,
     480              :         destDirPath string,
     481              :         destDir vfs.File,
     482              :         manifestFileNum base.DiskFileNum,
     483              :         manifestSize int64,
     484              :         excludedTables map[manifest.DeletedTableEntry]*manifest.TableMetadata,
     485              :         removeBackingTables []base.DiskFileNum,
     486              :         excludedBlobFiles map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile,
     487            1 : ) error {
     488            1 :         // Copy the MANIFEST, and create a pointer to it. We copy rather
     489            1 :         // than link because additional version edits added to the
     490            1 :         // MANIFEST after we took our snapshot of the sstables will
     491            1 :         // reference sstables that aren't in our checkpoint. For a
     492            1 :         // similar reason, we need to limit how much of the MANIFEST we
     493            1 :         // copy.
     494            1 :         // If some files are excluded from the checkpoint, also append a block that
     495            1 :         // records those files as deleted.
     496            1 :         if err := func() error {
     497            1 :                 srcPath := base.MakeFilepath(fs, d.dirname, base.FileTypeManifest, manifestFileNum)
     498            1 :                 destPath := fs.PathJoin(destDirPath, fs.PathBase(srcPath))
     499            1 :                 src, err := fs.Open(srcPath, vfs.SequentialReadsOption)
     500            1 :                 if err != nil {
     501            0 :                         return err
     502            0 :                 }
     503            1 :                 defer src.Close()
     504            1 : 
     505            1 :                 dst, err := fs.Create(destPath, vfs.WriteCategoryUnspecified)
     506            1 :                 if err != nil {
     507            0 :                         return err
     508            0 :                 }
     509            1 :                 defer dst.Close()
     510            1 : 
     511            1 :                 // Copy all existing records. We need to copy at the record level in case we
     512            1 :                 // need to append another record with the excluded files (we cannot simply
     513            1 :                 // append a record after a raw data copy; see
     514            1 :                 // https://github.com/cockroachdb/cockroach/issues/100935).
     515            1 :                 r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
     516            1 :                 w := record.NewWriter(dst)
     517            1 :                 for {
     518            1 :                         rr, err := r.Next()
     519            1 :                         if err != nil {
     520            1 :                                 if err == io.EOF {
     521            1 :                                         break
     522              :                                 }
     523            0 :                                 return err
     524              :                         }
     525              : 
     526            1 :                         rw, err := w.Next()
     527            1 :                         if err != nil {
     528            0 :                                 return err
     529            0 :                         }
     530            1 :                         if _, err := io.Copy(rw, rr); err != nil {
     531            0 :                                 return err
     532            0 :                         }
     533              :                 }
     534              : 
     535            1 :                 if len(excludedTables) > 0 || len(excludedBlobFiles) > 0 {
     536            1 :                         // Write out an additional VersionEdit that deletes the excluded SST files.
     537            1 :                         ve := manifest.VersionEdit{
     538            1 :                                 DeletedTables:        excludedTables,
     539            1 :                                 RemovedBackingTables: removeBackingTables,
     540            1 :                                 DeletedBlobFiles:     excludedBlobFiles,
     541            1 :                         }
     542            1 : 
     543            1 :                         rw, err := w.Next()
     544            1 :                         if err != nil {
     545            0 :                                 return err
     546            0 :                         }
     547            1 :                         if err := ve.Encode(rw); err != nil {
     548            0 :                                 return err
     549            0 :                         }
     550              :                 }
     551            1 :                 if err := w.Close(); err != nil {
     552            0 :                         return err
     553            0 :                 }
     554            1 :                 return dst.Sync()
     555            0 :         }(); err != nil {
     556            0 :                 return err
     557            0 :         }
     558              : 
     559            1 :         var manifestMarker *atomicfs.Marker
     560            1 :         manifestMarker, _, err := atomicfs.LocateMarker(fs, destDirPath, manifestMarkerName)
     561            1 :         if err != nil {
     562            0 :                 return err
     563            0 :         }
     564            1 :         if err := manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, manifestFileNum)); err != nil {
     565            0 :                 return err
     566            0 :         }
     567            1 :         return manifestMarker.Close()
     568              : }
        

Generated by: LCOV version 2.0-1