LCOV - code coverage report
Current view: top level - pebble - checkpoint.go (source / functions) Coverage Total Hit
Test: 2025-11-24 08:19Z d7ce913e - tests only.lcov Lines: 81.4 % 377 307
Test Date: 2025-11-24 08:20:23 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "io"
      10              :         "os"
      11              : 
      12              :         "github.com/cockroachdb/errors"
      13              :         "github.com/cockroachdb/errors/oserror"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/manifest"
      16              :         "github.com/cockroachdb/pebble/record"
      17              :         "github.com/cockroachdb/pebble/vfs"
      18              :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      19              :         "github.com/cockroachdb/pebble/wal"
      20              : )
      21              : 
      22              : // checkpointOptions hold the optional parameters to construct checkpoint
      23              : // snapshots.
      24              : type checkpointOptions struct {
      25              :         // flushWAL set to true will force a flush and sync of the WAL prior to
      26              :         // checkpointing.
      27              :         flushWAL bool
      28              : 
      29              :         // If set, any SSTs that don't overlap with these spans are excluded from a checkpoint.
      30              :         restrictToSpans []CheckpointSpan
      31              : }
      32              : 
      33              : // CheckpointOption set optional parameters used by `DB.Checkpoint`.
      34              : type CheckpointOption func(*checkpointOptions)
      35              : 
      36              : // WithFlushedWAL enables flushing and syncing the WAL prior to constructing a
      37              : // checkpoint. This guarantees that any writes committed before calling
      38              : // DB.Checkpoint will be part of that checkpoint.
      39              : //
      40              : // Note that this setting can only be useful in cases when some writes are
      41              : // performed with Sync = false. Otherwise, the guarantee will already be met.
      42              : //
      43              : // Passing this option is functionally equivalent to calling
      44              : // DB.LogData(nil, Sync) right before DB.Checkpoint.
      45            1 : func WithFlushedWAL() CheckpointOption {
      46            1 :         return func(opt *checkpointOptions) {
      47            1 :                 opt.flushWAL = true
      48            1 :         }
      49              : }
      50              : 
      51              : // WithRestrictToSpans specifies spans of interest for the checkpoint. Any SSTs
      52              : // that don't overlap with any of these spans are excluded from the checkpoint.
      53              : //
      54              : // Note that the checkpoint can still surface keys outside of these spans (from
      55              : // the WAL and from SSTs that partially overlap with these spans). Moreover,
      56              : // these surface keys aren't necessarily "valid" in that they could have been
      57              : // modified but the SST containing the modification is excluded.
      58            1 : func WithRestrictToSpans(spans []CheckpointSpan) CheckpointOption {
      59            1 :         return func(opt *checkpointOptions) {
      60            1 :                 opt.restrictToSpans = spans
      61            1 :         }
      62              : }
      63              : 
      64              : // CheckpointSpan is a key range [Start, End) (inclusive on Start, exclusive on
      65              : // End) of interest for a checkpoint.
      66              : type CheckpointSpan struct {
      67              :         Start []byte
      68              :         End   []byte
      69              : }
      70              : 
      71              : // excludeFromCheckpoint returns true if an SST file should be excluded from the
      72              : // checkpoint because it does not overlap with the spans of interest
      73              : // (opt.restrictToSpans).
      74            1 : func excludeFromCheckpoint(f *manifest.TableMetadata, opt *checkpointOptions, cmp Compare) bool {
      75            1 :         if len(opt.restrictToSpans) == 0 {
      76            1 :                 // Option not set; don't exclude anything.
      77            1 :                 return false
      78            1 :         }
      79            1 :         for _, s := range opt.restrictToSpans {
      80            1 :                 spanBounds := base.UserKeyBoundsEndExclusive(s.Start, s.End)
      81            1 :                 if f.Overlaps(cmp, &spanBounds) {
      82            1 :                         return false
      83            1 :                 }
      84              :         }
      85              :         // None of the restrictToSpans overlapped; we can exclude this file.
      86            1 :         return true
      87              : }
      88              : 
      89              : // mkdirAllAndSyncParents creates destDir and any of its missing parents.
      90              : // Those missing parents, as well as the closest existing ancestor, are synced.
      91              : // Returns a handle to the directory created at destDir.
      92            1 : func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
      93            1 :         // Collect paths for all directories between destDir (excluded) and its
      94            1 :         // closest existing ancestor (included).
      95            1 :         var parentPaths []string
      96            1 :         for parentPath := fs.PathDir(destDir); ; parentPath = fs.PathDir(parentPath) {
      97            1 :                 parentPaths = append(parentPaths, parentPath)
      98            1 :                 if fs.PathDir(parentPath) == parentPath {
      99            1 :                         break
     100              :                 }
     101            1 :                 _, err := fs.Stat(parentPath)
     102            1 :                 if err == nil {
     103            1 :                         // Exit loop at the closest existing ancestor.
     104            1 :                         break
     105              :                 }
     106            1 :                 if !oserror.IsNotExist(err) {
     107            0 :                         return nil, err
     108            0 :                 }
     109              :         }
     110              :         // Create destDir and any of its missing parents.
     111            1 :         if err := fs.MkdirAll(destDir, 0755); err != nil {
     112            1 :                 return nil, err
     113            1 :         }
     114              :         // Sync all the parent directories up to the closest existing ancestor,
     115              :         // included.
     116            1 :         for _, parentPath := range parentPaths {
     117            1 :                 parentDir, err := fs.OpenDir(parentPath)
     118            1 :                 if err != nil {
     119            1 :                         return nil, err
     120            1 :                 }
     121            1 :                 err = parentDir.Sync()
     122            1 :                 if err != nil {
     123            1 :                         _ = parentDir.Close()
     124            1 :                         return nil, err
     125            1 :                 }
     126            1 :                 err = parentDir.Close()
     127            1 :                 if err != nil {
     128            0 :                         return nil, err
     129            0 :                 }
     130              :         }
     131            1 :         return fs.OpenDir(destDir)
     132              : }
     133              : 
     134              : // Checkpoint constructs a snapshot of the DB instance in the specified
     135              : // directory. The WAL, MANIFEST, OPTIONS, and sstables will be copied into the
     136              : // snapshot. Hard links will be used when possible. Beware of the significant
     137              : // space overhead for a checkpoint if hard links are disabled. Also beware that
     138              : // even if hard links are used, the space overhead for the checkpoint will
     139              : // increase over time as the DB performs compactions.
     140              : //
     141              : // Note that shared files in a checkpoint could get deleted if the DB is
     142              : // restarted after a checkpoint operation, as the reference for the checkpoint
     143              : // is only maintained in memory. This is okay as long as users of Checkpoint
     144              : // crash shortly afterwards with a "poison file" preventing further restarts.
     145              : func (d *DB) Checkpoint(
     146              :         destDir string, opts ...CheckpointOption,
     147              : ) (
     148              :         ckErr error, /* used in deferred cleanup */
     149            1 : ) {
     150            1 :         opt := &checkpointOptions{}
     151            1 :         for _, fn := range opts {
     152            1 :                 fn(opt)
     153            1 :         }
     154              : 
     155            1 :         if _, err := d.opts.FS.Stat(destDir); !oserror.IsNotExist(err) {
     156            1 :                 if err == nil {
     157            1 :                         return &os.PathError{
     158            1 :                                 Op:   "checkpoint",
     159            1 :                                 Path: destDir,
     160            1 :                                 Err:  oserror.ErrExist,
     161            1 :                         }
     162            1 :                 }
     163            0 :                 return err
     164              :         }
     165              : 
     166            1 :         if opt.flushWAL && !d.opts.DisableWAL {
     167            1 :                 // Write an empty log-data record to flush and sync the WAL.
     168            1 :                 if err := d.LogData(nil /* data */, Sync); err != nil {
     169            0 :                         return err
     170            0 :                 }
     171              :         }
     172              : 
     173              :         // Disable file deletions.
     174              :         // We acquire a reference on the version down below that will prevent any
     175              :         // sstables or blob files from becoming "obsolete" and potentially deleted,
     176              :         // but this doesn't protect the current WALs or manifests.
     177            1 :         d.mu.Lock()
     178            1 :         d.disableFileDeletions()
     179            1 :         defer func() {
     180            1 :                 d.mu.Lock()
     181            1 :                 defer d.mu.Unlock()
     182            1 :                 d.enableFileDeletions()
     183            1 :         }()
     184              : 
     185              :         // TODO(peter): RocksDB provides the option to roll the manifest if the
     186              :         // MANIFEST size is too large. Should we do this too?
     187              : 
     188              :         // Lock the manifest before getting the current version. We need the
     189              :         // length of the manifest that we read to match the current version that
     190              :         // we read, otherwise we might copy a versionEdit not reflected in the
     191              :         // sstables we copy/link.
     192            1 :         d.mu.versions.logLock()
     193            1 :         // Get the the current version and the current manifest file number.
     194            1 :         current := d.mu.versions.currentVersion()
     195            1 :         formatVers := d.FormatMajorVersion()
     196            1 :         manifestFileNum := d.mu.versions.manifestFileNum
     197            1 :         manifestSize := d.mu.versions.manifest.Size()
     198            1 :         optionsFileNum := d.optionsFileNum
     199            1 : 
     200            1 :         virtualBackingFiles := make(map[base.DiskFileNum]struct{})
     201            1 :         for backing := range d.mu.versions.latest.virtualBackings.All() {
     202            1 :                 virtualBackingFiles[backing.DiskFileNum] = struct{}{}
     203            1 :         }
     204            1 :         versionBlobFiles := d.mu.versions.latest.blobFiles.Metadatas()
     205            1 : 
     206            1 :         // Acquire the logs while holding mutexes to ensure we don't race with a
     207            1 :         // flush that might mark a log that's relevant to `current` as obsolete
     208            1 :         // before our call to List.
     209            1 :         allLogicalLogs := d.mu.log.manager.List()
     210            1 : 
     211            1 :         // Grab the visible sequence number. The checkpoint's view of the state of
     212            1 :         // the database will be equivalent to an iterator that acquired this same
     213            1 :         // visible sequence number.
     214            1 :         visibleSeqNum := d.mu.versions.visibleSeqNum.Load()
     215            1 : 
     216            1 :         // Release the manifest and DB.mu so we don't block other operations on the
     217            1 :         // database.
     218            1 :         //
     219            1 :         // But first reference the version to ensure that the version's in-memory
     220            1 :         // state and its physical files remain available for the checkpoint. In
     221            1 :         // particular, the Version.BlobFileSet is only valid while a version is
     222            1 :         // referenced.
     223            1 :         current.Ref()
     224            1 :         d.mu.versions.logUnlock()
     225            1 :         d.mu.Unlock()
     226            1 :         defer current.Unref()
     227            1 : 
     228            1 :         // Wrap the normal filesystem with one which wraps newly created files with
     229            1 :         // vfs.NewSyncingFile.
     230            1 :         fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
     231            1 :                 NoSyncOnClose: d.opts.NoSyncOnClose,
     232            1 :                 BytesPerSync:  d.opts.BytesPerSync,
     233            1 :         })
     234            1 : 
     235            1 :         // Create the dir and its parents (if necessary), and sync them.
     236            1 :         var dir vfs.File
     237            1 :         defer func() {
     238            1 :                 if dir != nil {
     239            0 :                         _ = dir.Close()
     240            0 :                 }
     241            1 :                 if ckErr != nil {
     242            0 :                         // Attempt to cleanup on error.
     243            0 :                         _ = fs.RemoveAll(destDir)
     244            0 :                 }
     245              :         }()
     246            1 :         dir, ckErr = mkdirAllAndSyncParents(fs, destDir)
     247            1 :         if ckErr != nil {
     248            0 :                 return ckErr
     249            0 :         }
     250              : 
     251            1 :         {
     252            1 :                 // Copy the OPTIONS.
     253            1 :                 srcPath := base.MakeFilepath(fs, d.dirname, base.FileTypeOptions, optionsFileNum)
     254            1 :                 destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
     255            1 :                 ckErr = copyCheckpointOptions(fs, srcPath, destPath)
     256            1 :                 if ckErr != nil {
     257            0 :                         return ckErr
     258            0 :                 }
     259              :         }
     260              : 
     261            1 :         {
     262            1 :                 // Set the format major version in the destination directory.
     263            1 :                 var versionMarker *atomicfs.Marker
     264            1 :                 versionMarker, _, ckErr = atomicfs.LocateMarker(fs, destDir, formatVersionMarkerName)
     265            1 :                 if ckErr != nil {
     266            0 :                         return ckErr
     267            0 :                 }
     268              : 
     269              :                 // We use the marker to encode the active format version in the
     270              :                 // marker filename. Unlike other uses of the atomic marker,
     271              :                 // there is no file with the filename `formatVers.String()` on
     272              :                 // the filesystem.
     273            1 :                 ckErr = versionMarker.Move(formatVers.String())
     274            1 :                 if ckErr != nil {
     275            0 :                         return ckErr
     276            0 :                 }
     277            1 :                 ckErr = versionMarker.Close()
     278            1 :                 if ckErr != nil {
     279            0 :                         return ckErr
     280            0 :                 }
     281              :         }
     282              : 
     283            1 :         var excludedTables map[manifest.DeletedTableEntry]*manifest.TableMetadata
     284            1 :         var includedBlobFiles map[base.BlobFileID]struct{}
     285            1 :         var remoteFiles []base.DiskFileNum
     286            1 :         // Set of TableBacking.DiskFileNum which will be required by virtual sstables
     287            1 :         // in the checkpoint.
     288            1 :         requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})
     289            1 : 
     290            1 :         copyFile := func(typ base.FileType, fileNum base.DiskFileNum) error {
     291            1 :                 meta, err := d.objProvider.Lookup(typ, fileNum)
     292            1 :                 if err != nil {
     293            0 :                         return err
     294            0 :                 }
     295            1 :                 if meta.IsRemote() {
     296            1 :                         // We don't copy remote files. This is desirable as checkpointing is
     297            1 :                         // supposed to be a fast operation, and references to remote files can
     298            1 :                         // always be resolved by any checkpoint readers by reading the object
     299            1 :                         // catalog. We don't add this file to excludedFiles either, as that'd
     300            1 :                         // cause it to be deleted in the second manifest entry which is also
     301            1 :                         // inaccurate.
     302            1 :                         remoteFiles = append(remoteFiles, meta.DiskFileNum)
     303            1 :                         return nil
     304            1 :                 }
     305            1 :                 srcPath := base.MakeFilepath(fs, d.dirname, typ, fileNum)
     306            1 :                 destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
     307            1 :                 return vfs.LinkOrCopy(fs, srcPath, destPath)
     308              :         }
     309              : 
     310              :         // Link or copy the sstables.
     311            1 :         for l := range current.Levels {
     312            1 :                 iter := current.Levels[l].Iter()
     313            1 :                 for f := iter.First(); f != nil; f = iter.Next() {
     314            1 :                         if excludeFromCheckpoint(f, opt, d.cmp) {
     315            1 :                                 if excludedTables == nil {
     316            1 :                                         excludedTables = make(map[manifest.DeletedTableEntry]*manifest.TableMetadata)
     317            1 :                                 }
     318            1 :                                 excludedTables[manifest.DeletedTableEntry{
     319            1 :                                         Level:   l,
     320            1 :                                         FileNum: f.TableNum,
     321            1 :                                 }] = f
     322            1 :                                 continue
     323              :                         }
     324              : 
     325              :                         // Copy any referenced blob files that have not already been copied.
     326            1 :                         if len(f.BlobReferences) > 0 {
     327            1 :                                 if includedBlobFiles == nil {
     328            1 :                                         includedBlobFiles = make(map[base.BlobFileID]struct{})
     329            1 :                                 }
     330            1 :                                 for _, ref := range f.BlobReferences {
     331            1 :                                         if _, ok := includedBlobFiles[ref.FileID]; !ok {
     332            1 :                                                 includedBlobFiles[ref.FileID] = struct{}{}
     333            1 : 
     334            1 :                                                 // Map the BlobFileID to a DiskFileNum in the current version.
     335            1 :                                                 obj, ok := current.BlobFiles.Lookup(ref.FileID)
     336            1 :                                                 if !ok {
     337            0 :                                                         return errors.Errorf("blob file %s not found", ref.FileID)
     338            0 :                                                 }
     339            1 :                                                 ckErr = copyFile(obj.FileInfo())
     340            1 :                                                 if ckErr != nil {
     341            0 :                                                         return ckErr
     342            0 :                                                 }
     343              :                                         }
     344              :                                 }
     345              :                         }
     346              : 
     347            1 :                         tableBacking := f.TableBacking
     348            1 :                         if f.Virtual {
     349            1 :                                 if _, ok := requiredVirtualBackingFiles[tableBacking.DiskFileNum]; ok {
     350            1 :                                         continue
     351              :                                 }
     352            1 :                                 requiredVirtualBackingFiles[tableBacking.DiskFileNum] = struct{}{}
     353              :                         }
     354            1 :                         ckErr = copyFile(base.FileTypeTable, tableBacking.DiskFileNum)
     355            1 :                         if ckErr != nil {
     356            0 :                                 return ckErr
     357            0 :                         }
     358              :                 }
     359              :         }
     360              : 
     361            1 :         var removeBackingTables []base.DiskFileNum
     362            1 :         for diskFileNum := range virtualBackingFiles {
     363            1 :                 if _, ok := requiredVirtualBackingFiles[diskFileNum]; !ok {
     364            1 :                         // The backing sstable associated with fileNum is no longer
     365            1 :                         // required.
     366            1 :                         removeBackingTables = append(removeBackingTables, diskFileNum)
     367            1 :                 }
     368              :         }
     369              :         // Record the blob files that are not referenced by any included sstables.
     370              :         // When we write the MANIFEST of the checkpoint, we'll include a final
     371              :         // VersionEdit that removes these blob files so that the checkpointed
     372              :         // manifest is consistent.
     373            1 :         var excludedBlobFiles map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile
     374            1 :         if len(includedBlobFiles) < len(versionBlobFiles) {
     375            1 :                 excludedBlobFiles = make(map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile, len(versionBlobFiles)-len(includedBlobFiles))
     376            1 :                 for _, meta := range versionBlobFiles {
     377            1 :                         if _, ok := includedBlobFiles[meta.FileID]; !ok {
     378            1 :                                 excludedBlobFiles[manifest.DeletedBlobFileEntry{
     379            1 :                                         FileID:  meta.FileID,
     380            1 :                                         FileNum: meta.Physical.FileNum,
     381            1 :                                 }] = meta.Physical
     382            1 :                         }
     383              :                 }
     384              :         }
     385              : 
     386            1 :         ckErr = d.writeCheckpointManifest(
     387            1 :                 fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
     388            1 :                 excludedTables, removeBackingTables, excludedBlobFiles,
     389            1 :         )
     390            1 :         if ckErr != nil {
     391            0 :                 return ckErr
     392            0 :         }
     393            1 :         if len(remoteFiles) > 0 {
     394            1 :                 ckErr = d.objProvider.CheckpointState(fs, destDir, remoteFiles)
     395            1 :                 if ckErr != nil {
     396            0 :                         return ckErr
     397            0 :                 }
     398              :         }
     399              : 
     400              :         // Copy the WAL files. We copy rather than link for a few reasons:
     401              :         // - WAL file recycling will cause the WAL files to be reused which
     402              :         //   would invalidate the checkpoint.
     403              :         // - While we're performing our checkpoint, the latest WAL may still be
     404              :         //   receiving writes. We must exclude these writes, otherwise we'll
     405              :         //   violate the guarantee that the checkpoint is a consistent snapshot
     406              :         //   (we could copy a write that was committed after an ingest that was
     407              :         //    committed after we acquired our version).
     408              :         //
     409              :         // It's possible allLogicalLogs includes logs that are not relevant (beneath
     410              :         // the version's MinUnflushedLogNum). These extra files are harmless. The
     411              :         // earlier (wal.Manager).List call will not include obsolete logs that are
     412              :         // sitting in the recycler or have already been passed off to the cleanup
     413              :         // manager for deletion.
     414              :         //
     415              :         // TODO(jackson): It would be desirable to copy all recycling and obsolete
     416              :         // WALs to aid corruption postmortem debugging should we need them.
     417            1 :         for _, log := range allLogicalLogs {
     418            1 :                 ckErr = wal.Copy(fs, destDir, log, visibleSeqNum, record.LogWriterConfig{
     419            1 :                         WriteWALSyncOffsets: func() bool { return formatVers > FormatWALSyncChunks },
     420              :                 })
     421            1 :                 if ckErr != nil {
     422            0 :                         return ckErr
     423            0 :                 }
     424              :         }
     425              : 
     426              :         // Sync and close the checkpoint directory.
     427            1 :         ckErr = dir.Sync()
     428            1 :         if ckErr != nil {
     429            0 :                 return ckErr
     430            0 :         }
     431            1 :         ckErr = dir.Close()
     432            1 :         dir = nil
     433            1 :         return ckErr
     434              : }
     435              : 
     436              : // copyCheckpointOptions copies an OPTIONS file, commenting out some options
     437              : // that existed on the original database but no longer apply to the checkpointed
     438              : // database. For example, the entire [WAL Failover] stanza is commented out
     439              : // because Checkpoint will copy all WAL segment files from both the primary and
     440              : // secondary WAL directories into the checkpoint.
     441            1 : func copyCheckpointOptions(fs vfs.FS, srcPath, dstPath string) error {
     442            1 :         var buf bytes.Buffer
     443            1 :         f, err := fs.Open(srcPath)
     444            1 :         if err != nil {
     445            0 :                 return err
     446            0 :         }
     447            1 :         defer f.Close()
     448            1 :         b, err := io.ReadAll(f)
     449            1 :         if err != nil {
     450            0 :                 return err
     451            0 :         }
     452              :         // Copy the OPTIONS file verbatim, but commenting out the [WAL Failover]
     453              :         // section.
     454            1 :         err = parseOptions(string(b), parseOptionsFuncs{
     455            1 :                 visitNewSection: func(startOff, endOff int, section string) error {
     456            1 :                         if section == "WAL Failover" {
     457            1 :                                 buf.WriteString("# ")
     458            1 :                         }
     459            1 :                         buf.Write(b[startOff:endOff])
     460            1 :                         return nil
     461              :                 },
     462            1 :                 visitKeyValue: func(startOff, endOff int, section, key, value string) error {
     463            1 :                         if section == "WAL Failover" {
     464            1 :                                 buf.WriteString("# ")
     465            1 :                         }
     466            1 :                         buf.Write(b[startOff:endOff])
     467            1 :                         return nil
     468              :                 },
     469            1 :                 visitCommentOrWhitespace: func(startOff, endOff int, line string) error {
     470            1 :                         buf.Write(b[startOff:endOff])
     471            1 :                         return nil
     472            1 :                 },
     473              :         })
     474            1 :         if err != nil {
     475            0 :                 return err
     476            0 :         }
     477            1 :         nf, err := fs.Create(dstPath, vfs.WriteCategoryUnspecified)
     478            1 :         if err != nil {
     479            0 :                 return err
     480            0 :         }
     481            1 :         _, err = io.Copy(nf, &buf)
     482            1 :         if err != nil {
     483            0 :                 return err
     484            0 :         }
     485            1 :         return errors.CombineErrors(nf.Sync(), nf.Close())
     486              : }
     487              : 
     488              : func (d *DB) writeCheckpointManifest(
     489              :         fs vfs.FS,
     490              :         formatVers FormatMajorVersion,
     491              :         destDirPath string,
     492              :         destDir vfs.File,
     493              :         manifestFileNum base.DiskFileNum,
     494              :         manifestSize int64,
     495              :         excludedTables map[manifest.DeletedTableEntry]*manifest.TableMetadata,
     496              :         removeBackingTables []base.DiskFileNum,
     497              :         excludedBlobFiles map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile,
     498            1 : ) error {
     499            1 :         // Copy the MANIFEST, and create a pointer to it. We copy rather
     500            1 :         // than link because additional version edits added to the
     501            1 :         // MANIFEST after we took our snapshot of the sstables will
     502            1 :         // reference sstables that aren't in our checkpoint. For a
     503            1 :         // similar reason, we need to limit how much of the MANIFEST we
     504            1 :         // copy.
     505            1 :         // If some files are excluded from the checkpoint, also append a block that
     506            1 :         // records those files as deleted.
     507            1 :         if err := func() error {
     508            1 :                 srcPath := base.MakeFilepath(fs, d.dirname, base.FileTypeManifest, manifestFileNum)
     509            1 :                 destPath := fs.PathJoin(destDirPath, fs.PathBase(srcPath))
     510            1 :                 src, err := fs.Open(srcPath, vfs.SequentialReadsOption)
     511            1 :                 if err != nil {
     512            0 :                         return err
     513            0 :                 }
     514            1 :                 defer src.Close()
     515            1 : 
     516            1 :                 dst, err := fs.Create(destPath, vfs.WriteCategoryUnspecified)
     517            1 :                 if err != nil {
     518            0 :                         return err
     519            0 :                 }
     520            1 :                 defer dst.Close()
     521            1 : 
     522            1 :                 // Copy all existing records. We need to copy at the record level in case we
     523            1 :                 // need to append another record with the excluded files (we cannot simply
     524            1 :                 // append a record after a raw data copy; see
     525            1 :                 // https://github.com/cockroachdb/cockroach/issues/100935).
     526            1 :                 r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
     527            1 :                 w := record.NewWriter(dst)
     528            1 :                 for {
     529            1 :                         rr, err := r.Next()
     530            1 :                         if err != nil {
     531            1 :                                 if err == io.EOF {
     532            1 :                                         break
     533              :                                 }
     534            0 :                                 return err
     535              :                         }
     536              : 
     537            1 :                         rw, err := w.Next()
     538            1 :                         if err != nil {
     539            0 :                                 return err
     540            0 :                         }
     541            1 :                         if _, err := io.Copy(rw, rr); err != nil {
     542            0 :                                 return err
     543            0 :                         }
     544              :                 }
     545              : 
     546            1 :                 if len(excludedTables) > 0 || len(excludedBlobFiles) > 0 {
     547            1 :                         // Write out an additional VersionEdit that deletes the excluded SST files.
     548            1 :                         ve := manifest.VersionEdit{
     549            1 :                                 DeletedTables:        excludedTables,
     550            1 :                                 RemovedBackingTables: removeBackingTables,
     551            1 :                                 DeletedBlobFiles:     excludedBlobFiles,
     552            1 :                         }
     553            1 : 
     554            1 :                         rw, err := w.Next()
     555            1 :                         if err != nil {
     556            0 :                                 return err
     557            0 :                         }
     558            1 :                         if err := ve.Encode(rw); err != nil {
     559            0 :                                 return err
     560            0 :                         }
     561              :                 }
     562            1 :                 if err := w.Close(); err != nil {
     563            0 :                         return err
     564            0 :                 }
     565            1 :                 return dst.Sync()
     566            0 :         }(); err != nil {
     567            0 :                 return err
     568            0 :         }
     569              : 
     570            1 :         var manifestMarker *atomicfs.Marker
     571            1 :         manifestMarker, _, err := atomicfs.LocateMarker(fs, destDirPath, manifestMarkerName)
     572            1 :         if err != nil {
     573            0 :                 return err
     574            0 :         }
     575            1 :         if err := manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, manifestFileNum)); err != nil {
     576            0 :                 return err
     577            0 :         }
     578            1 :         return manifestMarker.Close()
     579              : }
        

Generated by: LCOV version 2.0-1