Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "io"
10 : "os"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/errors/oserror"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/manifest"
16 : "github.com/cockroachdb/pebble/record"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/cockroachdb/pebble/vfs/atomicfs"
19 : "github.com/cockroachdb/pebble/wal"
20 : )
21 :
22 : // checkpointOptions holds the optional parameters to construct checkpoint
23 : // snapshots.
24 : type checkpointOptions struct {
25 : // flushWAL, if true, forces a flush and sync of the WAL prior to
26 : // checkpointing.
27 : flushWAL bool
28 :
29 : // If set, any SSTs that don't overlap with these spans are excluded from the checkpoint.
30 : restrictToSpans []CheckpointSpan
31 : }
32 :
33 : // CheckpointOption sets optional parameters used by `DB.Checkpoint`.
34 : type CheckpointOption func(*checkpointOptions)
35 :
36 : // WithFlushedWAL enables flushing and syncing the WAL prior to constructing a
37 : // checkpoint. This guarantees that any writes committed before calling
38 : // DB.Checkpoint will be part of that checkpoint.
39 : //
40 : // Note that this option is only useful when some writes are performed with
41 : // Sync = false; otherwise, the guarantee is already met.
42 : //
43 : // Passing this option is functionally equivalent to calling
44 : // DB.LogData(nil, Sync) right before DB.Checkpoint.
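//
// For example (a hedged sketch; db, key, value, and dst are hypothetical), a
// caller that commits with Sync = false but still wants those writes captured
// in the checkpoint could write:
//
//	_ = db.Set(key, value, NoSync)
//	err := db.Checkpoint(dst, WithFlushedWAL())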
45 1 : func WithFlushedWAL() CheckpointOption {
46 1 : return func(opt *checkpointOptions) {
47 1 : opt.flushWAL = true
48 1 : }
49 : }
50 :
51 : // WithRestrictToSpans specifies spans of interest for the checkpoint. Any SSTs
52 : // that don't overlap with any of these spans are excluded from the checkpoint.
53 : //
54 : // Note that the checkpoint can still surface keys outside of these spans (from
55 : // the WAL and from SSTs that partially overlap with these spans). Moreover,
56 : // these surfaced keys aren't necessarily "valid": they could have been
57 : // modified, with the SST containing the modification excluded.
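//
// For illustration, a hedged sketch restricting a checkpoint to two key
// ranges (the span bounds, db, and dst are hypothetical):
//
//	err := db.Checkpoint(dst, WithRestrictToSpans([]CheckpointSpan{
//		{Start: []byte("a"), End: []byte("m")},
//		{Start: []byte("x"), End: []byte("z")},
//	}))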
58 2 : func WithRestrictToSpans(spans []CheckpointSpan) CheckpointOption {
59 2 : return func(opt *checkpointOptions) {
60 2 : opt.restrictToSpans = spans
61 2 : }
62 : }
63 :
64 : // CheckpointSpan is a key range [Start, End) (inclusive on Start, exclusive on
65 : // End) of interest for a checkpoint.
66 : type CheckpointSpan struct {
67 : Start []byte
68 : End []byte
69 : }
70 :
71 : // excludeFromCheckpoint returns true if an SST file should be excluded from the
72 : // checkpoint because it does not overlap with the spans of interest
73 : // (opt.restrictToSpans).
74 2 : func excludeFromCheckpoint(f *manifest.TableMetadata, opt *checkpointOptions, cmp Compare) bool {
75 2 : if len(opt.restrictToSpans) == 0 {
76 2 : // Option not set; don't exclude anything.
77 2 : return false
78 2 : }
79 2 : for _, s := range opt.restrictToSpans {
80 2 : spanBounds := base.UserKeyBoundsEndExclusive(s.Start, s.End)
81 2 : if f.Overlaps(cmp, &spanBounds) {
82 2 : return false
83 2 : }
84 : }
85 : // None of the restrictToSpans overlapped; we can exclude this file.
86 2 : return true
87 : }
88 :
89 : // mkdirAllAndSyncParents creates destDir and any of its missing parents.
90 : // Those missing parents, as well as the closest existing ancestor, are synced.
91 : // Returns a handle to the directory created at destDir.
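//
// A hedged sketch of the intended call pattern (the path is illustrative):
//
//	dir, err := mkdirAllAndSyncParents(fs, "/data/checkpoints/ckpt-001")
//	if err != nil {
//		// handle err
//	}
//	defer dir.Close()
//
// Syncing each newly created parent (and the closest existing ancestor)
// ensures the new directory entries are durable before the checkpoint's
// contents are written beneath destDir.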
92 2 : func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
93 2 : // Collect paths for all directories between destDir (excluded) and its
94 2 : // closest existing ancestor (included).
95 2 : var parentPaths []string
96 2 : for parentPath := fs.PathDir(destDir); ; parentPath = fs.PathDir(parentPath) {
97 2 : parentPaths = append(parentPaths, parentPath)
98 2 : if fs.PathDir(parentPath) == parentPath {
99 1 : break
100 : }
101 2 : _, err := fs.Stat(parentPath)
102 2 : if err == nil {
103 2 : // Exit loop at the closest existing ancestor.
104 2 : break
105 : }
106 2 : if !oserror.IsNotExist(err) {
107 0 : return nil, err
108 0 : }
109 : }
110 : // Create destDir and any of its missing parents.
111 2 : if err := fs.MkdirAll(destDir, 0755); err != nil {
112 1 : return nil, err
113 1 : }
114 : // Sync all the parent directories up to the closest existing ancestor,
115 : // included.
116 2 : for _, parentPath := range parentPaths {
117 2 : parentDir, err := fs.OpenDir(parentPath)
118 2 : if err != nil {
119 1 : return nil, err
120 1 : }
121 2 : err = parentDir.Sync()
122 2 : if err != nil {
123 1 : _ = parentDir.Close()
124 1 : return nil, err
125 1 : }
126 2 : err = parentDir.Close()
127 2 : if err != nil {
128 0 : return nil, err
129 0 : }
130 : }
131 2 : return fs.OpenDir(destDir)
132 : }
133 :
134 : // Checkpoint constructs a snapshot of the DB instance in the specified
135 : // directory. The WAL, MANIFEST, OPTIONS, and sstables will be copied into the
136 : // snapshot. Hard links will be used when possible. Beware of the significant
137 : // space overhead for a checkpoint if hard links are disabled. Also beware that
138 : // even if hard links are used, the space overhead for the checkpoint will
139 : // increase over time as the DB performs compactions.
140 : //
141 : // Note that shared files in a checkpoint could get deleted if the DB is
142 : // restarted after a checkpoint operation, as the reference for the checkpoint
143 : // is only maintained in memory. This is okay as long as users of Checkpoint
144 : // crash shortly afterwards with a "poison file" preventing further restarts.
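//
// Checkpoint returns an error wrapping oserror.ErrExist if destDir already
// exists. A minimal usage sketch (the destination path is illustrative):
//
//	if err := db.Checkpoint("/path/to/checkpoint"); err != nil {
//		// On error, any partially constructed checkpoint directory is removed.
//	}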
145 : func (d *DB) Checkpoint(
146 : destDir string, opts ...CheckpointOption,
147 : ) (
148 : ckErr error, /* used in deferred cleanup */
149 2 : ) {
150 2 : opt := &checkpointOptions{}
151 2 : for _, fn := range opts {
152 2 : fn(opt)
153 2 : }
154 :
155 2 : if _, err := d.opts.FS.Stat(destDir); !oserror.IsNotExist(err) {
156 1 : if err == nil {
157 1 : return &os.PathError{
158 1 : Op: "checkpoint",
159 1 : Path: destDir,
160 1 : Err: oserror.ErrExist,
161 1 : }
162 1 : }
163 0 : return err
164 : }
165 :
166 2 : if opt.flushWAL && !d.opts.DisableWAL {
167 1 : // Write an empty log-data record to flush and sync the WAL.
168 1 : if err := d.LogData(nil /* data */, Sync); err != nil {
169 0 : return err
170 0 : }
171 : }
172 :
173 : // Disable file deletions.
174 : // We acquire a reference on the version down below that will prevent any
175 : // sstables or blob files from becoming "obsolete" and potentially deleted,
176 : // but this doesn't protect the current WALs or manifests.
177 2 : d.mu.Lock()
178 2 : d.disableFileDeletions()
179 2 : defer func() {
180 2 : d.mu.Lock()
181 2 : defer d.mu.Unlock()
182 2 : d.enableFileDeletions()
183 2 : }()
184 :
185 : // TODO(peter): RocksDB provides the option to roll the manifest if the
186 : // MANIFEST size is too large. Should we do this too?
187 :
188 : // Lock the manifest before getting the current version. We need the
189 : // length of the manifest that we read to match the current version that
190 : // we read, otherwise we might copy a versionEdit not reflected in the
191 : // sstables we copy/link.
192 2 : d.mu.versions.logLock()
193 2 : // Get the current version and the current manifest file number.
194 2 : current := d.mu.versions.currentVersion()
195 2 : formatVers := d.FormatMajorVersion()
196 2 : manifestFileNum := d.mu.versions.manifestFileNum
197 2 : manifestSize := d.mu.versions.manifest.Size()
198 2 : optionsFileNum := d.optionsFileNum
199 2 :
200 2 : virtualBackingFiles := make(map[base.DiskFileNum]struct{})
201 2 : for backing := range d.mu.versions.latest.virtualBackings.All() {
202 2 : virtualBackingFiles[backing.DiskFileNum] = struct{}{}
203 2 : }
204 2 : versionBlobFiles := d.mu.versions.latest.blobFiles.Metadatas()
205 2 :
206 2 : // Acquire the logs while holding mutexes to ensure we don't race with a
207 2 : // flush that might mark a log that's relevant to `current` as obsolete
208 2 : // before our call to List.
209 2 : allLogicalLogs := d.mu.log.manager.List()
210 2 :
211 2 : // Grab the visible sequence number. The checkpoint's view of the state of
212 2 : // the database will be equivalent to an iterator that acquired this same
213 2 : // visible sequence number.
214 2 : visibleSeqNum := d.mu.versions.visibleSeqNum.Load()
215 2 :
216 2 : // Release the manifest and DB.mu so we don't block other operations on the
217 2 : // database.
218 2 : //
219 2 : // But first reference the version to ensure that the version's in-memory
220 2 : // state and its physical files remain available for the checkpoint. In
221 2 : // particular, the Version.BlobFileSet is only valid while a version is
222 2 : // referenced.
223 2 : current.Ref()
224 2 : d.mu.versions.logUnlock()
225 2 : d.mu.Unlock()
226 2 : defer current.Unref()
227 2 :
228 2 : // Wrap the normal filesystem with one which wraps newly created files with
229 2 : // vfs.NewSyncingFile.
230 2 : fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
231 2 : NoSyncOnClose: d.opts.NoSyncOnClose,
232 2 : BytesPerSync: d.opts.BytesPerSync,
233 2 : })
234 2 :
235 2 : // Create the dir and its parents (if necessary), and sync them.
236 2 : var dir vfs.File
237 2 : defer func() {
238 2 : if dir != nil {
239 0 : _ = dir.Close()
240 0 : }
241 2 : if ckErr != nil {
242 0 : // Attempt to cleanup on error.
243 0 : _ = fs.RemoveAll(destDir)
244 0 : }
245 : }()
246 2 : dir, ckErr = mkdirAllAndSyncParents(fs, destDir)
247 2 : if ckErr != nil {
248 0 : return ckErr
249 0 : }
250 :
251 2 : {
252 2 : // Copy the OPTIONS.
253 2 : srcPath := base.MakeFilepath(fs, d.dirname, base.FileTypeOptions, optionsFileNum)
254 2 : destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
255 2 : ckErr = copyCheckpointOptions(fs, srcPath, destPath)
256 2 : if ckErr != nil {
257 0 : return ckErr
258 0 : }
259 : }
260 :
261 2 : {
262 2 : // Set the format major version in the destination directory.
263 2 : var versionMarker *atomicfs.Marker
264 2 : versionMarker, _, ckErr = atomicfs.LocateMarker(fs, destDir, formatVersionMarkerName)
265 2 : if ckErr != nil {
266 0 : return ckErr
267 0 : }
268 :
269 : // We use the marker to encode the active format version in the
270 : // marker filename. Unlike other uses of the atomic marker,
271 : // there is no file with the filename `formatVers.String()` on
272 : // the filesystem.
273 2 : ckErr = versionMarker.Move(formatVers.String())
274 2 : if ckErr != nil {
275 0 : return ckErr
276 0 : }
277 2 : ckErr = versionMarker.Close()
278 2 : if ckErr != nil {
279 0 : return ckErr
280 0 : }
281 : }
282 :
283 2 : var excludedTables map[manifest.DeletedTableEntry]*manifest.TableMetadata
284 2 : var includedBlobFiles map[base.BlobFileID]struct{}
285 2 : var remoteFiles []base.DiskFileNum
286 2 : // Set of TableBacking.DiskFileNum which will be required by virtual sstables
287 2 : // in the checkpoint.
288 2 : requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})
289 2 :
290 2 : copyFile := func(typ base.FileType, fileNum base.DiskFileNum) error {
291 2 : meta, err := d.objProvider.Lookup(typ, fileNum)
292 2 : if err != nil {
293 0 : return err
294 0 : }
295 2 : if meta.IsRemote() {
296 1 : // We don't copy remote files. This is desirable as checkpointing is
297 1 : // supposed to be a fast operation, and references to remote files can
298 1 : // always be resolved by any checkpoint readers by reading the object
299 1 : // catalog. We don't add this file to excludedFiles either, as that'd
300 1 : // cause it to be deleted in the second manifest entry which is also
301 1 : // inaccurate.
302 1 : remoteFiles = append(remoteFiles, meta.DiskFileNum)
303 1 : return nil
304 1 : }
305 2 : srcPath := base.MakeFilepath(fs, d.dirname, typ, fileNum)
306 2 : destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
307 2 : return vfs.LinkOrCopy(fs, srcPath, destPath)
308 : }
309 :
310 : // Link or copy the sstables.
311 2 : for l := range current.Levels {
312 2 : iter := current.Levels[l].Iter()
313 2 : for f := iter.First(); f != nil; f = iter.Next() {
314 2 : if excludeFromCheckpoint(f, opt, d.cmp) {
315 2 : if excludedTables == nil {
316 2 : excludedTables = make(map[manifest.DeletedTableEntry]*manifest.TableMetadata)
317 2 : }
318 2 : excludedTables[manifest.DeletedTableEntry{
319 2 : Level: l,
320 2 : FileNum: f.TableNum,
321 2 : }] = f
322 2 : continue
323 : }
324 :
325 : // Copy any referenced blob files that have not already been copied.
326 2 : if len(f.BlobReferences) > 0 {
327 2 : if includedBlobFiles == nil {
328 2 : includedBlobFiles = make(map[base.BlobFileID]struct{})
329 2 : }
330 2 : for _, ref := range f.BlobReferences {
331 2 : if _, ok := includedBlobFiles[ref.FileID]; !ok {
332 2 : includedBlobFiles[ref.FileID] = struct{}{}
333 2 :
334 2 : // Map the BlobFileID to a DiskFileNum in the current version.
335 2 : obj, ok := current.BlobFiles.Lookup(ref.FileID)
336 2 : if !ok {
337 0 : return errors.Errorf("blob file %s not found", ref.FileID)
338 0 : }
339 2 : ckErr = copyFile(obj.FileInfo())
340 2 : if ckErr != nil {
341 0 : return ckErr
342 0 : }
343 : }
344 : }
345 : }
346 :
347 2 : tableBacking := f.TableBacking
348 2 : if f.Virtual {
349 2 : if _, ok := requiredVirtualBackingFiles[tableBacking.DiskFileNum]; ok {
350 2 : continue
351 : }
352 2 : requiredVirtualBackingFiles[tableBacking.DiskFileNum] = struct{}{}
353 : }
354 2 : ckErr = copyFile(base.FileTypeTable, tableBacking.DiskFileNum)
355 2 : if ckErr != nil {
356 0 : return ckErr
357 0 : }
358 : }
359 : }
360 :
361 2 : var removeBackingTables []base.DiskFileNum
362 2 : for diskFileNum := range virtualBackingFiles {
363 2 : if _, ok := requiredVirtualBackingFiles[diskFileNum]; !ok {
364 2 : // The backing sstable associated with fileNum is no longer
365 2 : // required.
366 2 : removeBackingTables = append(removeBackingTables, diskFileNum)
367 2 : }
368 : }
369 : // Record the blob files that are not referenced by any included sstables.
370 : // When we write the MANIFEST of the checkpoint, we'll include a final
371 : // VersionEdit that removes these blob files so that the checkpointed
372 : // manifest is consistent.
373 2 : var excludedBlobFiles map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile
374 2 : if len(includedBlobFiles) < len(versionBlobFiles) {
375 2 : excludedBlobFiles = make(map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile, len(versionBlobFiles)-len(includedBlobFiles))
376 2 : for _, meta := range versionBlobFiles {
377 2 : if _, ok := includedBlobFiles[meta.FileID]; !ok {
378 2 : excludedBlobFiles[manifest.DeletedBlobFileEntry{
379 2 : FileID: meta.FileID,
380 2 : FileNum: meta.Physical.FileNum,
381 2 : }] = meta.Physical
382 2 : }
383 : }
384 : }
385 :
386 2 : ckErr = d.writeCheckpointManifest(
387 2 : fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
388 2 : excludedTables, removeBackingTables, excludedBlobFiles,
389 2 : )
390 2 : if ckErr != nil {
391 0 : return ckErr
392 0 : }
393 2 : if len(remoteFiles) > 0 {
394 1 : ckErr = d.objProvider.CheckpointState(fs, destDir, remoteFiles)
395 1 : if ckErr != nil {
396 0 : return ckErr
397 0 : }
398 : }
399 :
400 : // Copy the WAL files. We copy rather than link for a few reasons:
401 : // - WAL file recycling will cause the WAL files to be reused which
402 : // would invalidate the checkpoint.
403 : // - While we're performing our checkpoint, the latest WAL may still be
404 : // receiving writes. We must exclude these writes, otherwise we'll
405 : // violate the guarantee that the checkpoint is a consistent snapshot
406 : // (we could copy a write that was committed after an ingest that was
407 : // committed after we acquired our version).
408 : //
409 : // It's possible allLogicalLogs includes logs that are not relevant (beneath
410 : // the version's MinUnflushedLogNum). These extra files are harmless. The
411 : // earlier (wal.Manager).List call will not include obsolete logs that are
412 : // sitting in the recycler or have already been passed off to the cleanup
413 : // manager for deletion.
414 : //
415 : // TODO(jackson): It would be desirable to copy all recycling and obsolete
416 : // WALs to aid corruption postmortem debugging should we need them.
417 2 : for _, log := range allLogicalLogs {
418 2 : ckErr = wal.Copy(fs, destDir, log, visibleSeqNum, record.LogWriterConfig{
419 2 : WriteWALSyncOffsets: func() bool { return formatVers > FormatWALSyncChunks },
420 : })
421 2 : if ckErr != nil {
422 0 : return ckErr
423 0 : }
424 : }
425 :
426 : // Sync and close the checkpoint directory.
427 2 : ckErr = dir.Sync()
428 2 : if ckErr != nil {
429 0 : return ckErr
430 0 : }
431 2 : ckErr = dir.Close()
432 2 : dir = nil
433 2 : return ckErr
434 : }
435 :
436 : // copyCheckpointOptions copies an OPTIONS file, commenting out some options
437 : // that existed on the original database but no longer apply to the checkpointed
438 : // database. For example, the entire [WAL Failover] stanza is commented out
439 : // because Checkpoint will copy all WAL segment files from both the primary and
440 : // secondary WAL directories into the checkpoint.
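//
// As a concrete illustration (a hedged sketch; the section contents below are
// hypothetical), an OPTIONS file containing
//
//	[WAL Failover]
//	  secondary_dir=/failover/wals
//
// would be written into the checkpoint as
//
//	# [WAL Failover]
//	#   secondary_dir=/failover/wals
//
// since Checkpoint copies WAL segments from both the primary and secondary
// WAL directories directly into destDir.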
441 2 : func copyCheckpointOptions(fs vfs.FS, srcPath, dstPath string) error {
442 2 : var buf bytes.Buffer
443 2 : f, err := fs.Open(srcPath)
444 2 : if err != nil {
445 0 : return err
446 0 : }
447 2 : defer f.Close()
448 2 : b, err := io.ReadAll(f)
449 2 : if err != nil {
450 0 : return err
451 0 : }
452 : // Copy the OPTIONS file verbatim, except that the [WAL Failover] section
453 : // is commented out.
454 2 : err = parseOptions(string(b), parseOptionsFuncs{
455 2 : visitNewSection: func(startOff, endOff int, section string) error {
456 2 : if section == "WAL Failover" {
457 2 : buf.WriteString("# ")
458 2 : }
459 2 : buf.Write(b[startOff:endOff])
460 2 : return nil
461 : },
462 2 : visitKeyValue: func(startOff, endOff int, section, key, value string) error {
463 2 : if section == "WAL Failover" {
464 2 : buf.WriteString("# ")
465 2 : }
466 2 : buf.Write(b[startOff:endOff])
467 2 : return nil
468 : },
469 2 : visitCommentOrWhitespace: func(startOff, endOff int, line string) error {
470 2 : buf.Write(b[startOff:endOff])
471 2 : return nil
472 2 : },
473 : })
474 2 : if err != nil {
475 0 : return err
476 0 : }
477 2 : nf, err := fs.Create(dstPath, vfs.WriteCategoryUnspecified)
478 2 : if err != nil {
479 0 : return err
480 0 : }
481 2 : _, err = io.Copy(nf, &buf)
482 2 : if err != nil {
483 0 : return err
484 0 : }
485 2 : return errors.CombineErrors(nf.Sync(), nf.Close())
486 : }
487 :
488 : func (d *DB) writeCheckpointManifest(
489 : fs vfs.FS,
490 : formatVers FormatMajorVersion,
491 : destDirPath string,
492 : destDir vfs.File,
493 : manifestFileNum base.DiskFileNum,
494 : manifestSize int64,
495 : excludedTables map[manifest.DeletedTableEntry]*manifest.TableMetadata,
496 : removeBackingTables []base.DiskFileNum,
497 : excludedBlobFiles map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile,
498 2 : ) error {
499 2 : // Copy the MANIFEST, and create a pointer to it. We copy rather
500 2 : // than link because additional version edits added to the
501 2 : // MANIFEST after we took our snapshot of the sstables will
502 2 : // reference sstables that aren't in our checkpoint. For a
503 2 : // similar reason, we need to limit how much of the MANIFEST we
504 2 : // copy.
505 2 : // If some files are excluded from the checkpoint, also append a block that
506 2 : // records those files as deleted.
507 2 : if err := func() error {
508 2 : srcPath := base.MakeFilepath(fs, d.dirname, base.FileTypeManifest, manifestFileNum)
509 2 : destPath := fs.PathJoin(destDirPath, fs.PathBase(srcPath))
510 2 : src, err := fs.Open(srcPath, vfs.SequentialReadsOption)
511 2 : if err != nil {
512 0 : return err
513 0 : }
514 2 : defer src.Close()
515 2 :
516 2 : dst, err := fs.Create(destPath, vfs.WriteCategoryUnspecified)
517 2 : if err != nil {
518 0 : return err
519 0 : }
520 2 : defer dst.Close()
521 2 :
522 2 : // Copy all existing records. We need to copy at the record level in case we
523 2 : // need to append another record with the excluded files (we cannot simply
524 2 : // append a record after a raw data copy; see
525 2 : // https://github.com/cockroachdb/cockroach/issues/100935).
526 2 : r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
527 2 : w := record.NewWriter(dst)
528 2 : for {
529 2 : rr, err := r.Next()
530 2 : if err != nil {
531 2 : if err == io.EOF {
532 2 : break
533 : }
534 0 : return err
535 : }
536 :
537 2 : rw, err := w.Next()
538 2 : if err != nil {
539 0 : return err
540 0 : }
541 2 : if _, err := io.Copy(rw, rr); err != nil {
542 0 : return err
543 0 : }
544 : }
545 :
546 2 : if len(excludedTables) > 0 || len(excludedBlobFiles) > 0 {
547 2 : // Write out an additional VersionEdit that deletes the excluded SST files.
548 2 : ve := manifest.VersionEdit{
549 2 : DeletedTables: excludedTables,
550 2 : RemovedBackingTables: removeBackingTables,
551 2 : DeletedBlobFiles: excludedBlobFiles,
552 2 : }
553 2 :
554 2 : rw, err := w.Next()
555 2 : if err != nil {
556 0 : return err
557 0 : }
558 2 : if err := ve.Encode(rw); err != nil {
559 0 : return err
560 0 : }
561 : }
562 2 : if err := w.Close(); err != nil {
563 0 : return err
564 0 : }
565 2 : return dst.Sync()
566 0 : }(); err != nil {
567 0 : return err
568 0 : }
569 :
570 2 : var manifestMarker *atomicfs.Marker
571 2 : manifestMarker, _, err := atomicfs.LocateMarker(fs, destDirPath, manifestMarkerName)
572 2 : if err != nil {
573 0 : return err
574 0 : }
575 2 : if err := manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, manifestFileNum)); err != nil {
576 0 : return err
577 0 : }
578 2 : return manifestMarker.Close()
579 : }
|