LCOV - code coverage report
Current view: top level - pebble - checkpoint.go (source / functions) Coverage Total Hit
Test: 2025-10-06 08:19Z 3214797c - tests + meta.lcov Lines: 81.4 % 377 307
Test Date: 2025-10-06 08:22:01 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "io"
      10              :         "os"
      11              : 
      12              :         "github.com/cockroachdb/errors"
      13              :         "github.com/cockroachdb/errors/oserror"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/manifest"
      16              :         "github.com/cockroachdb/pebble/record"
      17              :         "github.com/cockroachdb/pebble/vfs"
      18              :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      19              :         "github.com/cockroachdb/pebble/wal"
      20              : )
      21              : 
      22              : // checkpointOptions holds the optional parameters to construct checkpoint
      23              : // snapshots.
      24              : type checkpointOptions struct {
      25              :         // flushWAL set to true will force a flush and sync of the WAL prior to
      26              :         // checkpointing.
      27              :         flushWAL bool
      28              : 
      29              :         // If set, any SSTs that don't overlap with these spans are excluded from a checkpoint.
      30              :         restrictToSpans []CheckpointSpan
      31              : }
      32              : 
      33              : // CheckpointOption sets optional parameters used by `DB.Checkpoint`.
      34              : type CheckpointOption func(*checkpointOptions)
      35              : 
      36              : // WithFlushedWAL enables flushing and syncing the WAL prior to constructing a
      37              : // checkpoint. This guarantees that any writes committed before calling
      38              : // DB.Checkpoint will be part of that checkpoint.
      39              : //
      40              : // Note that this setting can only be useful in cases when some writes are
      41              : // performed with Sync = false. Otherwise, the guarantee will already be met.
      42              : //
      43              : // Passing this option is functionally equivalent to calling
      44              : // DB.LogData(nil, Sync) right before DB.Checkpoint.
      45            1 : func WithFlushedWAL() CheckpointOption {
      46            1 :         return func(opt *checkpointOptions) {
      47            1 :                 opt.flushWAL = true
      48            1 :         }
      49              : }
      50              : 
      51              : // WithRestrictToSpans specifies spans of interest for the checkpoint. Any SSTs
      52              : // that don't overlap with any of these spans are excluded from the checkpoint.
      53              : //
      54              : // Note that the checkpoint can still surface keys outside of these spans (from
      55              : // the WAL and from SSTs that partially overlap with these spans). Moreover,
      56              : // these surface keys aren't necessarily "valid" in that they could have been
      57              : // modified but the SST containing the modification is excluded.
      58            2 : func WithRestrictToSpans(spans []CheckpointSpan) CheckpointOption {
      59            2 :         return func(opt *checkpointOptions) {
      60            2 :                 opt.restrictToSpans = spans
      61            2 :         }
      62              : }
      63              : 
      64              : // CheckpointSpan is a key range [Start, End) (inclusive on Start, exclusive on
      65              : // End) of interest for a checkpoint.
      66              : type CheckpointSpan struct {
      67              :         Start []byte
      68              :         End   []byte
      69              : }
      70              : 
      71              : // excludeFromCheckpoint returns true if an SST file should be excluded from the
      72              : // checkpoint because it does not overlap with the spans of interest
      73              : // (opt.restrictToSpans).
      74            2 : func excludeFromCheckpoint(f *manifest.TableMetadata, opt *checkpointOptions, cmp Compare) bool {
      75            2 :         if len(opt.restrictToSpans) == 0 {
      76            2 :                 // Option not set; don't exclude anything.
      77            2 :                 return false
      78            2 :         }
      79            2 :         for _, s := range opt.restrictToSpans {
      80            2 :                 spanBounds := base.UserKeyBoundsEndExclusive(s.Start, s.End)
      81            2 :                 if f.Overlaps(cmp, &spanBounds) {
      82            2 :                         return false
      83            2 :                 }
      84              :         }
      85              :         // None of the restrictToSpans overlapped; we can exclude this file.
      86            2 :         return true
      87              : }
      88              : 
      89              : // mkdirAllAndSyncParents creates destDir and any of its missing parents.
      90              : // Those missing parents, as well as the closest existing ancestor, are synced.
      91              : // Returns a handle to the directory created at destDir.
      92            2 : func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
      93            2 :         // Collect paths for all directories between destDir (excluded) and its
      94            2 :         // closest existing ancestor (included).
      95            2 :         var parentPaths []string
      96            2 :         for parentPath := fs.PathDir(destDir); ; parentPath = fs.PathDir(parentPath) {
      97            2 :                 parentPaths = append(parentPaths, parentPath)
      98            2 :                 if fs.PathDir(parentPath) == parentPath {
      99            1 :                         break
     100              :                 }
     101            2 :                 _, err := fs.Stat(parentPath)
     102            2 :                 if err == nil {
     103            2 :                         // Exit loop at the closest existing ancestor.
     104            2 :                         break
     105              :                 }
     106            2 :                 if !oserror.IsNotExist(err) {
     107            0 :                         return nil, err
     108            0 :                 }
     109              :         }
     110              :         // Create destDir and any of its missing parents.
     111            2 :         if err := fs.MkdirAll(destDir, 0755); err != nil {
     112            1 :                 return nil, err
     113            1 :         }
     114              :         // Sync all the parent directories up to the closest existing ancestor,
     115              :         // included.
     116            2 :         for _, parentPath := range parentPaths {
     117            2 :                 parentDir, err := fs.OpenDir(parentPath)
     118            2 :                 if err != nil {
     119            1 :                         return nil, err
     120            1 :                 }
     121            2 :                 err = parentDir.Sync()
     122            2 :                 if err != nil {
     123            1 :                         _ = parentDir.Close()
     124            1 :                         return nil, err
     125            1 :                 }
     126            2 :                 err = parentDir.Close()
     127            2 :                 if err != nil {
     128            0 :                         return nil, err
     129            0 :                 }
     130              :         }
     131            2 :         return fs.OpenDir(destDir)
     132              : }
     133              : 
     134              : // Checkpoint constructs a snapshot of the DB instance in the specified
     135              : // directory. The WAL, MANIFEST, OPTIONS, and sstables will be copied into the
     136              : // snapshot. Hard links will be used when possible. Beware of the significant
     137              : // space overhead for a checkpoint if hard links are disabled. Also beware that
     138              : // even if hard links are used, the space overhead for the checkpoint will
     139              : // increase over time as the DB performs compactions.
     140              : //
     141              : // Note that shared files in a checkpoint could get deleted if the DB is
     142              : // restarted after a checkpoint operation, as the reference for the checkpoint
     143              : // is only maintained in memory. This is okay as long as users of Checkpoint
     144              : // crash shortly afterwards with a "poison file" preventing further restarts.
     145              : func (d *DB) Checkpoint(
     146              :         destDir string, opts ...CheckpointOption,
     147              : ) (
     148              :         ckErr error, /* used in deferred cleanup */
     149            2 : ) {
     150            2 :         opt := &checkpointOptions{}
     151            2 :         for _, fn := range opts {
     152            2 :                 fn(opt)
     153            2 :         }
     154              : 
     155            2 :         if _, err := d.opts.FS.Stat(destDir); !oserror.IsNotExist(err) {
     156            1 :                 if err == nil {
     157            1 :                         return &os.PathError{
     158            1 :                                 Op:   "checkpoint",
     159            1 :                                 Path: destDir,
     160            1 :                                 Err:  oserror.ErrExist,
     161            1 :                         }
     162            1 :                 }
     163            0 :                 return err
     164              :         }
     165              : 
     166            2 :         if opt.flushWAL && !d.opts.DisableWAL {
     167            1 :                 // Write an empty log-data record to flush and sync the WAL.
     168            1 :                 if err := d.LogData(nil /* data */, Sync); err != nil {
     169            0 :                         return err
     170            0 :                 }
     171              :         }
     172              : 
     173              :         // Disable file deletions.
     174              :         // We acquire a reference on the version down below that will prevent any
     175              :         // sstables or blob files from becoming "obsolete" and potentially deleted,
     176              :         // but this doesn't protect the current WALs or manifests.
     177            2 :         d.mu.Lock()
     178            2 :         d.disableFileDeletions()
     179            2 :         defer func() {
     180            2 :                 d.mu.Lock()
     181            2 :                 defer d.mu.Unlock()
     182            2 :                 d.enableFileDeletions()
     183            2 :         }()
     184              : 
     185              :         // TODO(peter): RocksDB provides the option to roll the manifest if the
     186              :         // MANIFEST size is too large. Should we do this too?
     187              : 
     188              :         // Lock the manifest before getting the current version. We need the
     189              :         // length of the manifest that we read to match the current version that
     190              :         // we read, otherwise we might copy a versionEdit not reflected in the
     191              :         // sstables we copy/link.
     192            2 :         d.mu.versions.logLock()
     193            2 :         // Get the current version and the current manifest file number.
     194            2 :         current := d.mu.versions.currentVersion()
     195            2 :         formatVers := d.FormatMajorVersion()
     196            2 :         manifestFileNum := d.mu.versions.manifestFileNum
     197            2 :         manifestSize := d.mu.versions.manifest.Size()
     198            2 :         optionsFileNum := d.optionsFileNum
     199            2 : 
     200            2 :         virtualBackingFiles := make(map[base.DiskFileNum]struct{})
     201            2 :         for backing := range d.mu.versions.latest.virtualBackings.All() {
     202            2 :                 virtualBackingFiles[backing.DiskFileNum] = struct{}{}
     203            2 :         }
     204            2 :         versionBlobFiles := d.mu.versions.latest.blobFiles.Metadatas()
     205            2 : 
     206            2 :         // Acquire the logs while holding mutexes to ensure we don't race with a
     207            2 :         // flush that might mark a log that's relevant to `current` as obsolete
     208            2 :         // before our call to List.
     209            2 :         allLogicalLogs := d.mu.log.manager.List()
     210            2 : 
     211            2 :         // Grab the visible sequence number. The checkpoint's view of the state of
     212            2 :         // the database will be equivalent to an iterator that acquired this same
     213            2 :         // visible sequence number.
     214            2 :         visibleSeqNum := d.mu.versions.visibleSeqNum.Load()
     215            2 : 
     216            2 :         // Release the manifest and DB.mu so we don't block other operations on the
     217            2 :         // database.
     218            2 :         //
     219            2 :         // But first reference the version to ensure that the version's in-memory
     220            2 :         // state and its physical files remain available for the checkpoint. In
     221            2 :         // particular, the Version.BlobFileSet is only valid while a version is
     222            2 :         // referenced.
     223            2 :         current.Ref()
     224            2 :         d.mu.versions.logUnlock()
     225            2 :         d.mu.Unlock()
     226            2 :         defer current.Unref()
     227            2 : 
     228            2 :         // Wrap the normal filesystem with one which wraps newly created files with
     229            2 :         // vfs.NewSyncingFile.
     230            2 :         fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
     231            2 :                 NoSyncOnClose: d.opts.NoSyncOnClose,
     232            2 :                 BytesPerSync:  d.opts.BytesPerSync,
     233            2 :         })
     234            2 : 
     235            2 :         // Create the dir and its parents (if necessary), and sync them.
     236            2 :         var dir vfs.File
     237            2 :         defer func() {
     238            2 :                 if dir != nil {
     239            0 :                         _ = dir.Close()
     240            0 :                 }
     241            2 :                 if ckErr != nil {
     242            0 :                         // Attempt to cleanup on error.
     243            0 :                         _ = fs.RemoveAll(destDir)
     244            0 :                 }
     245              :         }()
     246            2 :         dir, ckErr = mkdirAllAndSyncParents(fs, destDir)
     247            2 :         if ckErr != nil {
     248            0 :                 return ckErr
     249            0 :         }
     250              : 
     251            2 :         {
     252            2 :                 // Copy the OPTIONS.
     253            2 :                 srcPath := base.MakeFilepath(fs, d.dirname, base.FileTypeOptions, optionsFileNum)
     254            2 :                 destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
     255            2 :                 ckErr = copyCheckpointOptions(fs, srcPath, destPath)
     256            2 :                 if ckErr != nil {
     257            0 :                         return ckErr
     258            0 :                 }
     259              :         }
     260              : 
     261            2 :         {
     262            2 :                 // Set the format major version in the destination directory.
     263            2 :                 var versionMarker *atomicfs.Marker
     264            2 :                 versionMarker, _, ckErr = atomicfs.LocateMarker(fs, destDir, formatVersionMarkerName)
     265            2 :                 if ckErr != nil {
     266            0 :                         return ckErr
     267            0 :                 }
     268              : 
     269              :                 // We use the marker to encode the active format version in the
     270              :                 // marker filename. Unlike other uses of the atomic marker,
     271              :                 // there is no file with the filename `formatVers.String()` on
     272              :                 // the filesystem.
     273            2 :                 ckErr = versionMarker.Move(formatVers.String())
     274            2 :                 if ckErr != nil {
     275            0 :                         return ckErr
     276            0 :                 }
     277            2 :                 ckErr = versionMarker.Close()
     278            2 :                 if ckErr != nil {
     279            0 :                         return ckErr
     280            0 :                 }
     281              :         }
     282              : 
     283            2 :         var excludedTables map[manifest.DeletedTableEntry]*manifest.TableMetadata
     284            2 :         var includedBlobFiles map[base.BlobFileID]struct{}
     285            2 :         var remoteFiles []base.DiskFileNum
     286            2 :         // Set of TableBacking.DiskFileNum which will be required by virtual sstables
     287            2 :         // in the checkpoint.
     288            2 :         requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})
     289            2 : 
     290            2 :         copyFile := func(typ base.FileType, fileNum base.DiskFileNum) error {
     291            2 :                 meta, err := d.objProvider.Lookup(typ, fileNum)
     292            2 :                 if err != nil {
     293            0 :                         return err
     294            0 :                 }
     295            2 :                 if meta.IsRemote() {
     296            1 :                         // We don't copy remote files. This is desirable as checkpointing is
     297            1 :                         // supposed to be a fast operation, and references to remote files can
     298            1 :                         // always be resolved by any checkpoint readers by reading the object
     299            1 :                         // catalog. We don't add this file to excludedFiles either, as that'd
     300            1 :                         // cause it to be deleted in the second manifest entry which is also
     301            1 :                         // inaccurate.
     302            1 :                         remoteFiles = append(remoteFiles, meta.DiskFileNum)
     303            1 :                         return nil
     304            1 :                 }
     305            2 :                 srcPath := base.MakeFilepath(fs, d.dirname, typ, fileNum)
     306            2 :                 destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
     307            2 :                 return vfs.LinkOrCopy(fs, srcPath, destPath)
     308              :         }
     309              : 
     310              :         // Link or copy the sstables.
     311            2 :         for l := range current.Levels {
     312            2 :                 iter := current.Levels[l].Iter()
     313            2 :                 for f := iter.First(); f != nil; f = iter.Next() {
     314            2 :                         if excludeFromCheckpoint(f, opt, d.cmp) {
     315            2 :                                 if excludedTables == nil {
     316            2 :                                         excludedTables = make(map[manifest.DeletedTableEntry]*manifest.TableMetadata)
     317            2 :                                 }
     318            2 :                                 excludedTables[manifest.DeletedTableEntry{
     319            2 :                                         Level:   l,
     320            2 :                                         FileNum: f.TableNum,
     321            2 :                                 }] = f
     322            2 :                                 continue
     323              :                         }
     324              : 
     325              :                         // Copy any referenced blob files that have not already been copied.
     326            2 :                         if len(f.BlobReferences) > 0 {
     327            2 :                                 if includedBlobFiles == nil {
     328            2 :                                         includedBlobFiles = make(map[base.BlobFileID]struct{})
     329            2 :                                 }
     330            2 :                                 for _, ref := range f.BlobReferences {
     331            2 :                                         if _, ok := includedBlobFiles[ref.FileID]; !ok {
     332            2 :                                                 includedBlobFiles[ref.FileID] = struct{}{}
     333            2 : 
     334            2 :                                                 // Map the BlobFileID to a DiskFileNum in the current version.
     335            2 :                                                 obj, ok := current.BlobFiles.Lookup(ref.FileID)
     336            2 :                                                 if !ok {
     337            0 :                                                         return errors.Errorf("blob file %s not found", ref.FileID)
     338            0 :                                                 }
     339            2 :                                                 ckErr = copyFile(obj.FileInfo())
     340            2 :                                                 if ckErr != nil {
     341            0 :                                                         return ckErr
     342            0 :                                                 }
     343              :                                         }
     344              :                                 }
     345              :                         }
     346              : 
     347            2 :                         tableBacking := f.TableBacking
     348            2 :                         if f.Virtual {
     349            2 :                                 if _, ok := requiredVirtualBackingFiles[tableBacking.DiskFileNum]; ok {
     350            2 :                                         continue
     351              :                                 }
     352            2 :                                 requiredVirtualBackingFiles[tableBacking.DiskFileNum] = struct{}{}
     353              :                         }
     354            2 :                         ckErr = copyFile(base.FileTypeTable, tableBacking.DiskFileNum)
     355            2 :                         if ckErr != nil {
     356            0 :                                 return ckErr
     357            0 :                         }
     358              :                 }
     359              :         }
     360              : 
     361            2 :         var removeBackingTables []base.DiskFileNum
     362            2 :         for diskFileNum := range virtualBackingFiles {
     363            2 :                 if _, ok := requiredVirtualBackingFiles[diskFileNum]; !ok {
     364            2 :                         // The backing sstable associated with diskFileNum is no longer
     365            2 :                         // required.
     366            2 :                         removeBackingTables = append(removeBackingTables, diskFileNum)
     367            2 :                 }
     368              :         }
     369              :         // Record the blob files that are not referenced by any included sstables.
     370              :         // When we write the MANIFEST of the checkpoint, we'll include a final
     371              :         // VersionEdit that removes these blob files so that the checkpointed
     372              :         // manifest is consistent.
     373            2 :         var excludedBlobFiles map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile
     374            2 :         if len(includedBlobFiles) < len(versionBlobFiles) {
     375            2 :                 excludedBlobFiles = make(map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile, len(versionBlobFiles)-len(includedBlobFiles))
     376            2 :                 for _, meta := range versionBlobFiles {
     377            2 :                         if _, ok := includedBlobFiles[meta.FileID]; !ok {
     378            2 :                                 excludedBlobFiles[manifest.DeletedBlobFileEntry{
     379            2 :                                         FileID:  meta.FileID,
     380            2 :                                         FileNum: meta.Physical.FileNum,
     381            2 :                                 }] = meta.Physical
     382            2 :                         }
     383              :                 }
     384              :         }
     385              : 
     386            2 :         ckErr = d.writeCheckpointManifest(
     387            2 :                 fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
     388            2 :                 excludedTables, removeBackingTables, excludedBlobFiles,
     389            2 :         )
     390            2 :         if ckErr != nil {
     391            0 :                 return ckErr
     392            0 :         }
     393            2 :         if len(remoteFiles) > 0 {
     394            1 :                 ckErr = d.objProvider.CheckpointState(fs, destDir, remoteFiles)
     395            1 :                 if ckErr != nil {
     396            0 :                         return ckErr
     397            0 :                 }
     398              :         }
     399              : 
     400              :         // Copy the WAL files. We copy rather than link for a few reasons:
     401              :         // - WAL file recycling will cause the WAL files to be reused which
     402              :         //   would invalidate the checkpoint.
     403              :         // - While we're performing our checkpoint, the latest WAL may still be
     404              :         //   receiving writes. We must exclude these writes, otherwise we'll
     405              :         //   violate the guarantee that the checkpoint is a consistent snapshot
     406              :         //   (we could copy a write that was committed after an ingest that was
     407              :         //    committed after we acquired our version).
     408              :         //
     409              :         // It's possible allLogicalLogs includes logs that are not relevant (beneath
     410              :         // the version's MinUnflushedLogNum). These extra files are harmless. The
     411              :         // earlier (wal.Manager).List call will not include obsolete logs that are
     412              :         // sitting in the recycler or have already been passed off to the cleanup
     413              :         // manager for deletion.
     414              :         //
     415              :         // TODO(jackson): It would be desirable to copy all recycling and obsolete
     416              :         // WALs to aid corruption postmortem debugging should we need them.
     417            2 :         for _, log := range allLogicalLogs {
     418            2 :                 ckErr = wal.Copy(fs, destDir, log, visibleSeqNum, record.LogWriterConfig{
     419            2 :                         WriteWALSyncOffsets: func() bool { return formatVers > FormatWALSyncChunks },
     420              :                 })
     421            2 :                 if ckErr != nil {
     422            0 :                         return ckErr
     423            0 :                 }
     424              :         }
     425              : 
     426              :         // Sync and close the checkpoint directory.
     427            2 :         ckErr = dir.Sync()
     428            2 :         if ckErr != nil {
     429            0 :                 return ckErr
     430            0 :         }
     431            2 :         ckErr = dir.Close()
     432            2 :         dir = nil
     433            2 :         return ckErr
     434              : }
     435              : 
     436              : // copyCheckpointOptions copies an OPTIONS file, commenting out some options
     437              : // that existed on the original database but no longer apply to the checkpointed
     438              : // database. For example, the entire [WAL Failover] stanza is commented out
     439              : // because Checkpoint will copy all WAL segment files from both the primary and
     440              : // secondary WAL directories into the checkpoint.
     441            2 : func copyCheckpointOptions(fs vfs.FS, srcPath, dstPath string) error {
     442            2 :         var buf bytes.Buffer
     443            2 :         f, err := fs.Open(srcPath)
     444            2 :         if err != nil {
     445            0 :                 return err
     446            0 :         }
     447            2 :         defer f.Close()
     448            2 :         b, err := io.ReadAll(f)
     449            2 :         if err != nil {
     450            0 :                 return err
     451            0 :         }
     452              :         // Copy the OPTIONS file verbatim, but commenting out the [WAL Failover]
     453              :         // section.
     454            2 :         err = parseOptions(string(b), parseOptionsFuncs{
     455            2 :                 visitNewSection: func(startOff, endOff int, section string) error {
     456            2 :                         if section == "WAL Failover" {
     457            2 :                                 buf.WriteString("# ")
     458            2 :                         }
     459            2 :                         buf.Write(b[startOff:endOff])
     460            2 :                         return nil
     461              :                 },
     462            2 :                 visitKeyValue: func(startOff, endOff int, section, key, value string) error {
     463            2 :                         if section == "WAL Failover" {
     464            2 :                                 buf.WriteString("# ")
     465            2 :                         }
     466            2 :                         buf.Write(b[startOff:endOff])
     467            2 :                         return nil
     468              :                 },
     469            2 :                 visitCommentOrWhitespace: func(startOff, endOff int, line string) error {
     470            2 :                         buf.Write(b[startOff:endOff])
     471            2 :                         return nil
     472            2 :                 },
     473              :         })
     474            2 :         if err != nil {
     475            0 :                 return err
     476            0 :         }
     477            2 :         nf, err := fs.Create(dstPath, vfs.WriteCategoryUnspecified)
     478            2 :         if err != nil {
     479            0 :                 return err
     480            0 :         }
     481            2 :         _, err = io.Copy(nf, &buf)
     482            2 :         if err != nil {
     483            0 :                 return err
     484            0 :         }
     485            2 :         return errors.CombineErrors(nf.Sync(), nf.Close())
     486              : }
     487              : 
     488              : func (d *DB) writeCheckpointManifest(
     489              :         fs vfs.FS,
     490              :         formatVers FormatMajorVersion,
     491              :         destDirPath string,
     492              :         destDir vfs.File,
     493              :         manifestFileNum base.DiskFileNum,
     494              :         manifestSize int64,
     495              :         excludedTables map[manifest.DeletedTableEntry]*manifest.TableMetadata,
     496              :         removeBackingTables []base.DiskFileNum,
     497              :         excludedBlobFiles map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile,
     498            2 : ) error {
     499            2 :         // Copy the MANIFEST, and create a pointer to it. We copy rather
     500            2 :         // than link because additional version edits added to the
     501            2 :         // MANIFEST after we took our snapshot of the sstables will
     502            2 :         // reference sstables that aren't in our checkpoint. For a
     503            2 :         // similar reason, we need to limit how much of the MANIFEST we
     504            2 :         // copy.
     505            2 :         // If some files are excluded from the checkpoint, also append a block that
     506            2 :         // records those files as deleted.
     507            2 :         if err := func() error {
     508            2 :                 srcPath := base.MakeFilepath(fs, d.dirname, base.FileTypeManifest, manifestFileNum)
     509            2 :                 destPath := fs.PathJoin(destDirPath, fs.PathBase(srcPath))
     510            2 :                 src, err := fs.Open(srcPath, vfs.SequentialReadsOption)
     511            2 :                 if err != nil {
     512            0 :                         return err
     513            0 :                 }
     514            2 :                 defer src.Close()
     515            2 : 
     516            2 :                 dst, err := fs.Create(destPath, vfs.WriteCategoryUnspecified)
     517            2 :                 if err != nil {
     518            0 :                         return err
     519            0 :                 }
     520            2 :                 defer dst.Close()
     521            2 : 
     522            2 :                 // Copy all existing records. We need to copy at the record level in case we
     523            2 :                 // need to append another record with the excluded files (we cannot simply
     524            2 :                 // append a record after a raw data copy; see
     525            2 :                 // https://github.com/cockroachdb/cockroach/issues/100935).
     526            2 :                 r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
     527            2 :                 w := record.NewWriter(dst)
     528            2 :                 for {
     529            2 :                         rr, err := r.Next()
     530            2 :                         if err != nil {
     531            2 :                                 if err == io.EOF {
     532            2 :                                         break
     533              :                                 }
     534            0 :                                 return err
     535              :                         }
     536              : 
     537            2 :                         rw, err := w.Next()
     538            2 :                         if err != nil {
     539            0 :                                 return err
     540            0 :                         }
     541            2 :                         if _, err := io.Copy(rw, rr); err != nil {
     542            0 :                                 return err
     543            0 :                         }
     544              :                 }
     545              : 
     546            2 :                 if len(excludedTables) > 0 || len(excludedBlobFiles) > 0 {
     547            2 :                         // Write out an additional VersionEdit that deletes the excluded SST files.
     548            2 :                         ve := manifest.VersionEdit{
     549            2 :                                 DeletedTables:        excludedTables,
     550            2 :                                 RemovedBackingTables: removeBackingTables,
     551            2 :                                 DeletedBlobFiles:     excludedBlobFiles,
     552            2 :                         }
     553            2 : 
     554            2 :                         rw, err := w.Next()
     555            2 :                         if err != nil {
     556            0 :                                 return err
     557            0 :                         }
     558            2 :                         if err := ve.Encode(rw); err != nil {
     559            0 :                                 return err
     560            0 :                         }
     561              :                 }
     562            2 :                 if err := w.Close(); err != nil {
     563            0 :                         return err
     564            0 :                 }
     565            2 :                 return dst.Sync()
     566            0 :         }(); err != nil {
     567            0 :                 return err
     568            0 :         }
     569              : 
     570            2 :         var manifestMarker *atomicfs.Marker
     571            2 :         manifestMarker, _, err := atomicfs.LocateMarker(fs, destDirPath, manifestMarkerName)
     572            2 :         if err != nil {
     573            0 :                 return err
     574            0 :         }
     575            2 :         if err := manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, manifestFileNum)); err != nil {
     576            0 :                 return err
     577            0 :         }
     578            2 :         return manifestMarker.Close()
     579              : }
        

Generated by: LCOV version 2.0-1