// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"bytes"
	"io"
	"os"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/errors/oserror"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/manifest"
	"github.com/cockroachdb/pebble/record"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/vfs/atomicfs"
)

// checkpointOptions holds the optional parameters used to construct
// checkpoint snapshots.
type checkpointOptions struct {
	// flushWAL, if set to true, forces a flush and sync of the WAL prior to
	// checkpointing.
	flushWAL bool

	// If set, any SSTs that don't overlap with these spans are excluded from
	// the checkpoint.
	restrictToSpans []CheckpointSpan
}

// CheckpointOption sets an optional parameter used by `DB.Checkpoint`.
type CheckpointOption func(*checkpointOptions)

// WithFlushedWAL enables flushing and syncing the WAL prior to constructing a
// checkpoint. This guarantees that any writes committed before calling
// DB.Checkpoint will be part of that checkpoint.
//
// Note that this setting can only be useful in cases when some writes are
// performed with Sync = false. Otherwise, the guarantee will already be met.
//
// Passing this option is functionally equivalent to calling
// DB.LogData(nil, Sync) right before DB.Checkpoint.
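//
// A minimal usage sketch (hypothetical DB handle and destination directory):
//
//	// Equivalent to db.LogData(nil, pebble.Sync) followed by
//	// db.Checkpoint("ckpt"):
//	err := db.Checkpoint("ckpt", pebble.WithFlushedWAL())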
func WithFlushedWAL() CheckpointOption {
	return func(opt *checkpointOptions) {
		opt.flushWAL = true
	}
}

// WithRestrictToSpans specifies spans of interest for the checkpoint. Any SSTs
// that don't overlap with any of these spans are excluded from the checkpoint.
//
// Note that the checkpoint can still surface keys outside of these spans (from
// the WAL and from SSTs that partially overlap with these spans). Moreover,
// the surfaced keys aren't necessarily "valid": a key could have been
// modified while the SST containing the modification is excluded.
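//
// A minimal usage sketch (hypothetical keys and destination directory):
//
//	err := db.Checkpoint("ckpt", pebble.WithRestrictToSpans([]pebble.CheckpointSpan{
//		{Start: []byte("table/42/"), End: []byte("table/43/")},
//	}))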
func WithRestrictToSpans(spans []CheckpointSpan) CheckpointOption {
	return func(opt *checkpointOptions) {
		opt.restrictToSpans = spans
	}
}

// CheckpointSpan is a key range [Start, End) (inclusive on Start, exclusive on
// End) of interest for a checkpoint.
type CheckpointSpan struct {
	Start []byte
	End   []byte
}

// excludeFromCheckpoint returns true if an SST file should be excluded from
// the checkpoint because it does not overlap with the spans of interest
// (opt.restrictToSpans).
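//
// For example (an illustrative sketch): with restrictToSpans = [{Start: "a",
// End: "c"}], a table spanning [c, d] is excluded because the span's End is
// exclusive, while a table spanning [b, d] overlaps and is retained.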
func excludeFromCheckpoint(f *manifest.TableMetadata, opt *checkpointOptions, cmp Compare) bool {
	if len(opt.restrictToSpans) == 0 {
		// Option not set; don't exclude anything.
		return false
	}
	for _, s := range opt.restrictToSpans {
		spanBounds := base.UserKeyBoundsEndExclusive(s.Start, s.End)
		if f.Overlaps(cmp, &spanBounds) {
			return false
		}
	}
	// None of the restrictToSpans overlapped; we can exclude this file.
	return true
}

// mkdirAllAndSyncParents creates destDir and any of its missing parents.
// Those missing parents, as well as the closest existing ancestor, are synced.
// Returns a handle to the directory created at destDir.
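//
// For example (an illustrative sketch): with destDir = "/a/b/c" and only "/a"
// existing on disk, "/a/b" and "/a/b/c" are created, "/a/b" and "/a" are
// synced, and a handle to "/a/b/c" is returned.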
func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
	// Collect paths for all directories between destDir (excluded) and its
	// closest existing ancestor (included).
	var parentPaths []string
	for parentPath := fs.PathDir(destDir); ; parentPath = fs.PathDir(parentPath) {
		parentPaths = append(parentPaths, parentPath)
		if fs.PathDir(parentPath) == parentPath {
			break
		}
		_, err := fs.Stat(parentPath)
		if err == nil {
			// Exit loop at the closest existing ancestor.
			break
		}
		if !oserror.IsNotExist(err) {
			return nil, err
		}
	}
	// Create destDir and any of its missing parents.
	if err := fs.MkdirAll(destDir, 0755); err != nil {
		return nil, err
	}
	// Sync all the parent directories up to the closest existing ancestor,
	// included.
	for _, parentPath := range parentPaths {
		parentDir, err := fs.OpenDir(parentPath)
		if err != nil {
			return nil, err
		}
		err = parentDir.Sync()
		if err != nil {
			_ = parentDir.Close()
			return nil, err
		}
		err = parentDir.Close()
		if err != nil {
			return nil, err
		}
	}
	return fs.OpenDir(destDir)
}

// Checkpoint constructs a snapshot of the DB instance in the specified
// directory. The WAL, MANIFEST, OPTIONS, and sstables will be copied into the
// snapshot. Hard links will be used when possible. Beware of the significant
// space overhead for a checkpoint if hard links are disabled. Also beware that
// even if hard links are used, the space overhead for the checkpoint will
// increase over time as the DB performs compactions.
//
// Note that shared files in a checkpoint could get deleted if the DB is
// restarted after a checkpoint operation, as the reference for the checkpoint
// is only maintained in memory. This is okay as long as users of Checkpoint
// crash shortly afterwards with a "poison file" preventing further restarts.
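//
// A minimal usage sketch (hypothetical destination path, which must not
// already exist; on failure the partially created directory is removed by the
// deferred cleanup below):
//
//	if err := db.Checkpoint("backups/ckpt-001", pebble.WithFlushedWAL()); err != nil {
//		return err
//	}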
func (d *DB) Checkpoint(
	destDir string, opts ...CheckpointOption,
) (
	ckErr error, /* used in deferred cleanup */
) {
	opt := &checkpointOptions{}
	for _, fn := range opts {
		fn(opt)
	}

	if _, err := d.opts.FS.Stat(destDir); !oserror.IsNotExist(err) {
		if err == nil {
			return &os.PathError{
				Op:   "checkpoint",
				Path: destDir,
				Err:  oserror.ErrExist,
			}
		}
		return err
	}

	if opt.flushWAL && !d.opts.DisableWAL {
		// Write an empty log-data record to flush and sync the WAL.
		if err := d.LogData(nil /* data */, Sync); err != nil {
			return err
		}
	}

	// Disable file deletions.
	// We acquire a reference on the version down below that will prevent any
	// sstables or blob files from becoming "obsolete" and potentially deleted,
	// but this doesn't protect the current WALs or manifests.
	d.mu.Lock()
	d.disableFileDeletions()
	defer func() {
		d.mu.Lock()
		defer d.mu.Unlock()
		d.enableFileDeletions()
	}()

	// TODO(peter): RocksDB provides the option to roll the manifest if the
	// MANIFEST size is too large. Should we do this too?

	// Lock the manifest before getting the current version. We need the
	// length of the manifest that we read to match the current version that
	// we read, otherwise we might copy a versionEdit not reflected in the
	// sstables we copy/link.
	d.mu.versions.logLock()
	// Get the current version and the current manifest file number.
	current := d.mu.versions.currentVersion()
	formatVers := d.FormatMajorVersion()
	manifestFileNum := d.mu.versions.manifestFileNum
	manifestSize := d.mu.versions.manifest.Size()
	optionsFileNum := d.optionsFileNum

	virtualBackingFiles := make(map[base.DiskFileNum]struct{})
	d.mu.versions.latest.virtualBackings.ForEach(func(backing *manifest.TableBacking) {
		virtualBackingFiles[backing.DiskFileNum] = struct{}{}
	})
	versionBlobFiles := d.mu.versions.latest.blobFiles.Metadatas()

	// Acquire the logs while holding mutexes to ensure we don't race with a
	// flush that might mark a log that's relevant to `current` as obsolete
	// before our call to List.
	allLogicalLogs := d.mu.log.manager.List()

	// Release the manifest and DB.mu so we don't block other operations on the
	// database.
	//
	// But first reference the version to ensure that the version's in-memory
	// state and its physical files remain available for the checkpoint. In
	// particular, the Version.BlobFileSet is only valid while a version is
	// referenced.
	current.Ref()
	d.mu.versions.logUnlock()
	d.mu.Unlock()
	defer current.Unref()

	// Wrap the normal filesystem with one which wraps newly created files with
	// vfs.NewSyncingFile.
	fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
		NoSyncOnClose: d.opts.NoSyncOnClose,
		BytesPerSync:  d.opts.BytesPerSync,
	})

	// Create the dir and its parents (if necessary), and sync them.
	var dir vfs.File
	defer func() {
		if dir != nil {
			_ = dir.Close()
		}
		if ckErr != nil {
			// Attempt to clean up on error.
			_ = fs.RemoveAll(destDir)
		}
	}()
	dir, ckErr = mkdirAllAndSyncParents(fs, destDir)
	if ckErr != nil {
		return ckErr
	}

	{
		// Copy the OPTIONS.
		srcPath := base.MakeFilepath(fs, d.dirname, base.FileTypeOptions, optionsFileNum)
		destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
		ckErr = copyCheckpointOptions(fs, srcPath, destPath)
		if ckErr != nil {
			return ckErr
		}
	}

	{
		// Set the format major version in the destination directory.
		var versionMarker *atomicfs.Marker
		versionMarker, _, ckErr = atomicfs.LocateMarker(fs, destDir, formatVersionMarkerName)
		if ckErr != nil {
			return ckErr
		}

		// We use the marker to encode the active format version in the
		// marker filename. Unlike other uses of the atomic marker,
		// there is no file with the filename `formatVers.String()` on
		// the filesystem.
		ckErr = versionMarker.Move(formatVers.String())
		if ckErr != nil {
			return ckErr
		}
		ckErr = versionMarker.Close()
		if ckErr != nil {
			return ckErr
		}
	}

	var excludedTables map[manifest.DeletedTableEntry]*manifest.TableMetadata
	var includedBlobFiles map[base.BlobFileID]struct{}
	var remoteFiles []base.DiskFileNum
	// Set of TableBacking.DiskFileNum which will be required by virtual sstables
	// in the checkpoint.
	requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})

	copyFile := func(typ base.FileType, fileNum base.DiskFileNum) error {
		meta, err := d.objProvider.Lookup(typ, fileNum)
		if err != nil {
			return err
		}
		if meta.IsRemote() {
			// We don't copy remote files. This is desirable as checkpointing is
			// supposed to be a fast operation, and references to remote files can
			// always be resolved by any checkpoint readers by reading the object
			// catalog. We don't add this file to excludedFiles either, as that'd
			// cause it to be deleted in the second manifest entry which is also
			// inaccurate.
			remoteFiles = append(remoteFiles, meta.DiskFileNum)
			return nil
		}
		srcPath := base.MakeFilepath(fs, d.dirname, typ, fileNum)
		destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
		return vfs.LinkOrCopy(fs, srcPath, destPath)
	}

	// Link or copy the sstables.
	for l := range current.Levels {
		iter := current.Levels[l].Iter()
		for f := iter.First(); f != nil; f = iter.Next() {
			if excludeFromCheckpoint(f, opt, d.cmp) {
				if excludedTables == nil {
					excludedTables = make(map[manifest.DeletedTableEntry]*manifest.TableMetadata)
				}
				excludedTables[manifest.DeletedTableEntry{
					Level:   l,
					FileNum: f.TableNum,
				}] = f
				continue
			}

			// Copy any referenced blob files that have not already been copied.
			if len(f.BlobReferences) > 0 {
				if includedBlobFiles == nil {
					includedBlobFiles = make(map[base.BlobFileID]struct{})
				}
				for _, ref := range f.BlobReferences {
					if _, ok := includedBlobFiles[ref.FileID]; !ok {
						includedBlobFiles[ref.FileID] = struct{}{}

						// Map the BlobFileID to a DiskFileNum in the current version.
						diskFileNum, ok := current.BlobFiles.Lookup(ref.FileID)
						if !ok {
							return errors.Errorf("blob file %s not found", ref.FileID)
						}
						ckErr = copyFile(base.FileTypeBlob, diskFileNum)
						if ckErr != nil {
							return ckErr
						}
					}
				}
			}

			tableBacking := f.TableBacking
			if f.Virtual {
				if _, ok := requiredVirtualBackingFiles[tableBacking.DiskFileNum]; ok {
					continue
				}
				requiredVirtualBackingFiles[tableBacking.DiskFileNum] = struct{}{}
			}
			ckErr = copyFile(base.FileTypeTable, tableBacking.DiskFileNum)
			if ckErr != nil {
				return ckErr
			}
		}
	}

	var removeBackingTables []base.DiskFileNum
	for diskFileNum := range virtualBackingFiles {
		if _, ok := requiredVirtualBackingFiles[diskFileNum]; !ok {
			// The backing sstable associated with fileNum is no longer
			// required.
			removeBackingTables = append(removeBackingTables, diskFileNum)
		}
	}
	// Record the blob files that are not referenced by any included sstables.
	// When we write the MANIFEST of the checkpoint, we'll include a final
	// VersionEdit that removes these blob files so that the checkpointed
	// manifest is consistent.
	var excludedBlobFiles map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile
	if len(includedBlobFiles) < len(versionBlobFiles) {
		excludedBlobFiles = make(map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile, len(versionBlobFiles)-len(includedBlobFiles))
		for _, meta := range versionBlobFiles {
			if _, ok := includedBlobFiles[meta.FileID]; !ok {
				excludedBlobFiles[manifest.DeletedBlobFileEntry{
					FileID:  meta.FileID,
					FileNum: meta.Physical.FileNum,
				}] = meta.Physical
			}
		}
	}

	ckErr = d.writeCheckpointManifest(
		fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
		excludedTables, removeBackingTables, excludedBlobFiles,
	)
	if ckErr != nil {
		return ckErr
	}
	if len(remoteFiles) > 0 {
		ckErr = d.objProvider.CheckpointState(fs, destDir, remoteFiles)
		if ckErr != nil {
			return ckErr
		}
	}

	// Copy the WAL files. We copy rather than link because WAL file recycling
	// will cause the WAL files to be reused which would invalidate the
	// checkpoint. It's possible allLogicalLogs includes logs that are not
	// relevant (beneath the version's MinUnflushedLogNum). These extra files
	// are harmless. The earlier (wal.Manager).List call will not include
	// obsolete logs that are sitting in the recycler or have already been
	// passed off to the cleanup manager for deletion.
	//
	// TODO(jackson): It would be desirable to copy all recycling and obsolete
	// WALs to aid corruption postmortem debugging should we need them.
	for _, log := range allLogicalLogs {
		for i := 0; i < log.NumSegments(); i++ {
			srcFS, srcPath := log.SegmentLocation(i)
			destPath := fs.PathJoin(destDir, srcFS.PathBase(srcPath))
			ckErr = vfs.CopyAcrossFS(srcFS, srcPath, fs, destPath)
			if ckErr != nil {
				return ckErr
			}
		}
	}

	// Sync and close the checkpoint directory.
	ckErr = dir.Sync()
	if ckErr != nil {
		return ckErr
	}
	ckErr = dir.Close()
	dir = nil
	return ckErr
}

// copyCheckpointOptions copies an OPTIONS file, commenting out some options
// that existed on the original database but no longer apply to the checkpointed
// database. For example, the entire [WAL Failover] stanza is commented out
// because Checkpoint will copy all WAL segment files from both the primary and
// secondary WAL directories into the checkpoint.
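//
// For example (an illustrative sketch; the option name shown is hypothetical),
// an input of:
//
//	[WAL Failover]
//	  secondary_dir=/failover-wals
//
// is written to the checkpoint as:
//
//	# [WAL Failover]
//	#   secondary_dir=/failover-wals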
func copyCheckpointOptions(fs vfs.FS, srcPath, dstPath string) error {
	var buf bytes.Buffer
	f, err := fs.Open(srcPath)
	if err != nil {
		return err
	}
	defer f.Close()
	b, err := io.ReadAll(f)
	if err != nil {
		return err
	}
	// Copy the OPTIONS file verbatim, but commenting out the [WAL Failover]
	// section.
	err = parseOptions(string(b), parseOptionsFuncs{
		visitNewSection: func(startOff, endOff int, section string) error {
			if section == "WAL Failover" {
				buf.WriteString("# ")
			}
			buf.Write(b[startOff:endOff])
			return nil
		},
		visitKeyValue: func(startOff, endOff int, section, key, value string) error {
			if section == "WAL Failover" {
				buf.WriteString("# ")
			}
			buf.Write(b[startOff:endOff])
			return nil
		},
		visitCommentOrWhitespace: func(startOff, endOff int, line string) error {
			buf.Write(b[startOff:endOff])
			return nil
		},
	})
	if err != nil {
		return err
	}
	nf, err := fs.Create(dstPath, vfs.WriteCategoryUnspecified)
	if err != nil {
		return err
	}
	_, err = io.Copy(nf, &buf)
	if err != nil {
		return err
	}
	return errors.CombineErrors(nf.Sync(), nf.Close())
}

func (d *DB) writeCheckpointManifest(
	fs vfs.FS,
	formatVers FormatMajorVersion,
	destDirPath string,
	destDir vfs.File,
	manifestFileNum base.DiskFileNum,
	manifestSize int64,
	excludedTables map[manifest.DeletedTableEntry]*manifest.TableMetadata,
	removeBackingTables []base.DiskFileNum,
	excludedBlobFiles map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile,
) error {
	// Copy the MANIFEST, and create a pointer to it. We copy rather
	// than link because additional version edits added to the
	// MANIFEST after we took our snapshot of the sstables will
	// reference sstables that aren't in our checkpoint. For a
	// similar reason, we need to limit how much of the MANIFEST we
	// copy.
	// If some files are excluded from the checkpoint, also append a block that
	// records those files as deleted.
	if err := func() error {
		srcPath := base.MakeFilepath(fs, d.dirname, base.FileTypeManifest, manifestFileNum)
		destPath := fs.PathJoin(destDirPath, fs.PathBase(srcPath))
		src, err := fs.Open(srcPath, vfs.SequentialReadsOption)
		if err != nil {
			return err
		}
		defer src.Close()

		dst, err := fs.Create(destPath, vfs.WriteCategoryUnspecified)
		if err != nil {
			return err
		}
		defer dst.Close()

		// Copy all existing records. We need to copy at the record level in case we
		// need to append another record with the excluded files (we cannot simply
		// append a record after a raw data copy; see
		// https://github.com/cockroachdb/cockroach/issues/100935).
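		// (Re-framing each record through record.Reader and record.Writer
		// ensures that the VersionEdit record appended below starts on a
		// valid record boundary.)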
		r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
		w := record.NewWriter(dst)
		for {
			rr, err := r.Next()
			if err != nil {
				if err == io.EOF {
					break
				}
				return err
			}

			rw, err := w.Next()
			if err != nil {
				return err
			}
			if _, err := io.Copy(rw, rr); err != nil {
				return err
			}
		}

		if len(excludedTables) > 0 || len(excludedBlobFiles) > 0 {
			// Write out an additional VersionEdit that deletes the excluded SST files.
			ve := manifest.VersionEdit{
				DeletedTables:        excludedTables,
				RemovedBackingTables: removeBackingTables,
				DeletedBlobFiles:     excludedBlobFiles,
			}

			rw, err := w.Next()
			if err != nil {
				return err
			}
			if err := ve.Encode(rw); err != nil {
				return err
			}
		}
		if err := w.Close(); err != nil {
			return err
		}
		return dst.Sync()
	}(); err != nil {
		return err
	}

	var manifestMarker *atomicfs.Marker
	manifestMarker, _, err := atomicfs.LocateMarker(fs, destDirPath, manifestMarkerName)
	if err != nil {
		return err
	}
	if err := manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, manifestFileNum)); err != nil {
		return err
	}
	return manifestMarker.Close()
}