Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "io"
10 : "os"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/errors/oserror"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/record"
16 : "github.com/cockroachdb/pebble/vfs"
17 : "github.com/cockroachdb/pebble/vfs/atomicfs"
18 : )
19 :
20 : // checkpointOptions holds the optional parameters for constructing
21 : // checkpoint snapshots.
22 : type checkpointOptions struct {
23 : // flushWAL, if true, forces a flush and sync of the WAL prior to
24 : // checkpointing.
25 : flushWAL bool
26 :
27 : // If set, any SSTs that don't overlap with these spans are excluded from a checkpoint.
28 : restrictToSpans []CheckpointSpan
29 : }
30 :
31 : // CheckpointOption sets optional parameters used by `DB.Checkpoint`.
32 : type CheckpointOption func(*checkpointOptions)
33 :
34 : // WithFlushedWAL enables flushing and syncing the WAL prior to constructing a
35 : // checkpoint. This guarantees that any writes committed before calling
36 : // DB.Checkpoint will be part of that checkpoint.
37 : //
38 : // Note that this setting is only useful when some writes are performed with
39 : // Sync = false. Otherwise, the guarantee is already met.
40 : //
41 : // Passing this option is functionally equivalent to calling
42 : // DB.LogData(nil, Sync) right before DB.Checkpoint.
43 0 : func WithFlushedWAL() CheckpointOption {
44 0 : return func(opt *checkpointOptions) {
45 0 : opt.flushWAL = true
46 0 : }
47 : }
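
// A hedged usage sketch (the db handle and destination path below are
// illustrative assumptions, not part of this file): passing WithFlushedWAL is
// functionally equivalent to syncing the WAL yourself right before the
// checkpoint.
//
//	// Option A: let Checkpoint flush and sync the WAL.
//	err := db.Checkpoint("/path/to/checkpoint", WithFlushedWAL())
//
//	// Option B: equivalent, per the comment on WithFlushedWAL above.
//	err = db.LogData(nil /* data */, Sync)
//	if err == nil {
//		err = db.Checkpoint("/path/to/checkpoint")
//	}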
48 :
49 : // WithRestrictToSpans specifies spans of interest for the checkpoint. Any SSTs
50 : // that don't overlap with any of these spans are excluded from the checkpoint.
51 : //
52 : // Note that the checkpoint can still surface keys outside of these spans (from
53 : // the WAL and from SSTs that partially overlap with these spans). Moreover,
54 : // these surfaced keys aren't necessarily "valid", in that they could have
55 : // since been modified while the SST containing the modification is excluded.
56 1 : func WithRestrictToSpans(spans []CheckpointSpan) CheckpointOption {
57 1 : return func(opt *checkpointOptions) {
58 1 : opt.restrictToSpans = spans
59 1 : }
60 : }
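
// A hedged usage sketch (keys and paths are illustrative assumptions):
// checkpoint only the sstables overlapping a single user-key range.
//
//	spans := []CheckpointSpan{{Start: []byte("a"), End: []byte("m")}}
//	err := db.Checkpoint("/path/to/checkpoint", WithRestrictToSpans(spans))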
61 :
62 : // CheckpointSpan is a key range [Start, End) (inclusive on Start, exclusive on
63 : // End) of interest for a checkpoint.
64 : type CheckpointSpan struct {
65 : Start []byte
66 : End []byte
67 : }
68 :
69 : // excludeFromCheckpoint returns true if an SST file should be excluded from the
70 : // checkpoint because it does not overlap with the spans of interest
71 : // (opt.restrictToSpans).
72 1 : func excludeFromCheckpoint(f *fileMetadata, opt *checkpointOptions, cmp Compare) bool {
73 1 : if len(opt.restrictToSpans) == 0 {
74 1 : // Option not set; don't exclude anything.
75 1 : return false
76 1 : }
77 1 : for _, s := range opt.restrictToSpans {
78 1 : spanBounds := base.UserKeyBoundsEndExclusive(s.Start, s.End)
79 1 : if f.Overlaps(cmp, &spanBounds) {
80 1 : return false
81 1 : }
82 : }
83 : // None of the restrictToSpans overlapped; we can exclude this file.
84 1 : return true
85 : }
86 :
87 : // mkdirAllAndSyncParents creates destDir and any of its missing parents.
88 : // Those missing parents, as well as the closest existing ancestor, are synced.
89 : // Returns a handle to the directory created at destDir.
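//
// For example (an illustrative sketch, not taken from a caller): if destDir is
// "a/b/c/d" and only "a" exists, this creates "a/b", "a/b/c", and "a/b/c/d",
// syncs "a/b/c", "a/b", and "a", and returns an open handle to "a/b/c/d";
// syncing and closing that handle is left to the caller.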
90 1 : func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
91 1 : // Collect paths for all directories between destDir (excluded) and its
92 1 : // closest existing ancestor (included).
93 1 : var parentPaths []string
94 1 : for parentPath := fs.PathDir(destDir); ; parentPath = fs.PathDir(parentPath) {
95 1 : parentPaths = append(parentPaths, parentPath)
96 1 : if fs.PathDir(parentPath) == parentPath {
97 1 : break
98 : }
99 1 : _, err := fs.Stat(parentPath)
100 1 : if err == nil {
101 1 : // Exit loop at the closest existing ancestor.
102 1 : break
103 : }
104 1 : if !oserror.IsNotExist(err) {
105 0 : return nil, err
106 0 : }
107 : }
108 : // Create destDir and any of its missing parents.
109 1 : if err := fs.MkdirAll(destDir, 0755); err != nil {
110 0 : return nil, err
111 0 : }
112 : // Sync all the parent directories, up to and including the closest
113 : // existing ancestor.
114 1 : for _, parentPath := range parentPaths {
115 1 : parentDir, err := fs.OpenDir(parentPath)
116 1 : if err != nil {
117 0 : return nil, err
118 0 : }
119 1 : err = parentDir.Sync()
120 1 : if err != nil {
121 0 : _ = parentDir.Close()
122 0 : return nil, err
123 0 : }
124 1 : err = parentDir.Close()
125 1 : if err != nil {
126 0 : return nil, err
127 0 : }
128 : }
129 1 : return fs.OpenDir(destDir)
130 : }
131 :
132 : // Checkpoint constructs a snapshot of the DB instance in the specified
133 : // directory. The WAL, MANIFEST, OPTIONS, and sstables will be copied into the
134 : // snapshot. Hard links will be used when possible. Beware of the significant
135 : // space overhead for a checkpoint if hard links are disabled. Also beware that
136 : // even if hard links are used, the space overhead for the checkpoint will
137 : // increase over time as the DB performs compactions.
138 : //
139 : // Note that shared files in a checkpoint could get deleted if the DB is
140 : // restarted after a checkpoint operation, as the reference for the checkpoint
141 : // is only maintained in memory. This is okay as long as users of Checkpoint
142 : // crash shortly afterwards with a "poison file" preventing further restarts.
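//
// A minimal usage sketch (assumes an open *DB named db and a destination path
// that does not exist yet; both are illustrative):
//
//	if err := db.Checkpoint("/backups/db-checkpoint", WithFlushedWAL()); err != nil {
//		// On error, Checkpoint removes the partially-created destination
//		// directory before returning.
//	}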
143 : func (d *DB) Checkpoint(
144 : destDir string, opts ...CheckpointOption,
145 : ) (
146 : ckErr error, /* used in deferred cleanup */
147 1 : ) {
148 1 : opt := &checkpointOptions{}
149 1 : for _, fn := range opts {
150 1 : fn(opt)
151 1 : }
152 :
153 1 : if _, err := d.opts.FS.Stat(destDir); !oserror.IsNotExist(err) {
154 0 : if err == nil {
155 0 : return &os.PathError{
156 0 : Op: "checkpoint",
157 0 : Path: destDir,
158 0 : Err: oserror.ErrExist,
159 0 : }
160 0 : }
161 0 : return err
162 : }
163 :
164 1 : if opt.flushWAL && !d.opts.DisableWAL {
165 0 : // Write an empty log-data record to flush and sync the WAL.
166 0 : if err := d.LogData(nil /* data */, Sync); err != nil {
167 0 : return err
168 0 : }
169 : }
170 :
171 : // Disable file deletions.
172 1 : d.mu.Lock()
173 1 : d.disableFileDeletions()
174 1 : defer func() {
175 1 : d.mu.Lock()
176 1 : defer d.mu.Unlock()
177 1 : d.enableFileDeletions()
178 1 : }()
179 :
180 : // TODO(peter): RocksDB provides the option to roll the manifest if the
181 : // MANIFEST size is too large. Should we do this too?
182 :
183 : // Lock the manifest before getting the current version. We need the
184 : // length of the manifest that we read to match the current version that
185 : // we read, otherwise we might copy a versionEdit not reflected in the
186 : // sstables we copy/link.
187 1 : d.mu.versions.logLock()
188 1 : // Get the current version and the current manifest file number.
189 1 : current := d.mu.versions.currentVersion()
190 1 : formatVers := d.FormatMajorVersion()
191 1 : manifestFileNum := d.mu.versions.manifestFileNum
192 1 : manifestSize := d.mu.versions.manifest.Size()
193 1 : optionsFileNum := d.optionsFileNum
194 1 :
195 1 : virtualBackingFiles := make(map[base.DiskFileNum]struct{})
196 1 : d.mu.versions.virtualBackings.ForEach(func(backing *fileBacking) {
197 1 : virtualBackingFiles[backing.DiskFileNum] = struct{}{}
198 1 : })
199 :
200 : // Acquire the logs while holding mutexes to ensure we don't race with a
201 : // flush that might mark a log that's relevant to `current` as obsolete
202 : // before our call to List.
203 1 : allLogicalLogs := d.mu.log.manager.List()
204 1 :
205 1 : // Release the manifest and DB.mu so we don't block other operations on
206 1 : // the database.
207 1 : d.mu.versions.logUnlock()
208 1 : d.mu.Unlock()
209 1 :
210 1 : // Wrap the normal filesystem with one which wraps newly created files with
211 1 : // vfs.NewSyncingFile.
212 1 : fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
213 1 : NoSyncOnClose: d.opts.NoSyncOnClose,
214 1 : BytesPerSync: d.opts.BytesPerSync,
215 1 : })
216 1 :
217 1 : // Create the dir and its parents (if necessary), and sync them.
218 1 : var dir vfs.File
219 1 : defer func() {
220 1 : if dir != nil {
221 0 : _ = dir.Close()
222 0 : }
223 1 : if ckErr != nil {
224 0 : // Attempt to cleanup on error.
225 0 : _ = fs.RemoveAll(destDir)
226 0 : }
227 : }()
228 1 : dir, ckErr = mkdirAllAndSyncParents(fs, destDir)
229 1 : if ckErr != nil {
230 0 : return ckErr
231 0 : }
232 :
233 1 : {
234 1 : // Copy the OPTIONS.
235 1 : srcPath := base.MakeFilepath(fs, d.dirname, fileTypeOptions, optionsFileNum)
236 1 : destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
237 1 : ckErr = copyCheckpointOptions(fs, srcPath, destPath)
238 1 : if ckErr != nil {
239 0 : return ckErr
240 0 : }
241 : }
242 :
243 1 : {
244 1 : // Set the format major version in the destination directory.
245 1 : var versionMarker *atomicfs.Marker
246 1 : versionMarker, _, ckErr = atomicfs.LocateMarker(fs, destDir, formatVersionMarkerName)
247 1 : if ckErr != nil {
248 0 : return ckErr
249 0 : }
250 :
251 : // We use the marker to encode the active format version in the
252 : // marker filename. Unlike other uses of the atomic marker,
253 : // there is no file with the filename `formatVers.String()` on
254 : // the filesystem.
255 1 : ckErr = versionMarker.Move(formatVers.String())
256 1 : if ckErr != nil {
257 0 : return ckErr
258 0 : }
259 1 : ckErr = versionMarker.Close()
260 1 : if ckErr != nil {
261 0 : return ckErr
262 0 : }
263 : }
264 :
265 1 : var excludedFiles map[deletedFileEntry]*fileMetadata
266 1 : var remoteFiles []base.DiskFileNum
267 1 : // Set of FileBacking.DiskFileNum which will be required by virtual sstables
268 1 : // in the checkpoint.
269 1 : requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})
270 1 : // Link or copy the sstables.
271 1 : for l := range current.Levels {
272 1 : iter := current.Levels[l].Iter()
273 1 : for f := iter.First(); f != nil; f = iter.Next() {
274 1 : if excludeFromCheckpoint(f, opt, d.cmp) {
275 1 : if excludedFiles == nil {
276 1 : excludedFiles = make(map[deletedFileEntry]*fileMetadata)
277 1 : }
278 1 : excludedFiles[deletedFileEntry{
279 1 : Level: l,
280 1 : FileNum: f.FileNum,
281 1 : }] = f
282 1 : continue
283 : }
284 :
285 1 : fileBacking := f.FileBacking
286 1 : if f.Virtual {
287 1 : if _, ok := requiredVirtualBackingFiles[fileBacking.DiskFileNum]; ok {
288 1 : continue
289 : }
290 1 : requiredVirtualBackingFiles[fileBacking.DiskFileNum] = struct{}{}
291 : }
292 1 : meta, err := d.objProvider.Lookup(fileTypeTable, fileBacking.DiskFileNum)
293 1 : if err != nil {
294 0 : ckErr = err
295 0 : return ckErr
296 0 : }
297 1 : if meta.IsRemote() {
298 0 : // We don't copy remote files. This is desirable as checkpointing is
299 0 : // supposed to be a fast operation, and references to remote files can
300 0 : // always be resolved by any checkpoint readers by reading the object
301 0 : // catalog. We don't add this file to excludedFiles either, as that would
302 0 : // cause it to be recorded as deleted in the appended manifest entry,
303 0 : // which would also be inaccurate.
304 0 : remoteFiles = append(remoteFiles, meta.DiskFileNum)
305 0 : continue
306 : }
307 :
308 1 : srcPath := base.MakeFilepath(fs, d.dirname, fileTypeTable, fileBacking.DiskFileNum)
309 1 : destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
310 1 : ckErr = vfs.LinkOrCopy(fs, srcPath, destPath)
311 1 : if ckErr != nil {
312 0 : return ckErr
313 0 : }
314 : }
315 : }
316 :
317 1 : var removeBackingTables []base.DiskFileNum
318 1 : for diskFileNum := range virtualBackingFiles {
319 1 : if _, ok := requiredVirtualBackingFiles[diskFileNum]; !ok {
320 1 : // The backing sstable associated with diskFileNum is no longer
321 1 : // required by any sstable in the checkpoint.
322 1 : removeBackingTables = append(removeBackingTables, diskFileNum)
323 1 : }
324 : }
325 :
326 1 : ckErr = d.writeCheckpointManifest(
327 1 : fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
328 1 : excludedFiles, removeBackingTables,
329 1 : )
330 1 : if ckErr != nil {
331 0 : return ckErr
332 0 : }
333 1 : if len(remoteFiles) > 0 {
334 0 : ckErr = d.objProvider.CheckpointState(fs, destDir, fileTypeTable, remoteFiles)
335 0 : if ckErr != nil {
336 0 : return ckErr
337 0 : }
338 : }
339 :
340 : // Copy the WAL files. We copy rather than link because WAL file recycling
341 : // will cause the WAL files to be reused which would invalidate the
342 : // checkpoint. It's possible allLogicalLogs includes logs that are not
343 : // relevant (beneath the version's MinUnflushedLogNum). These extra files
344 : // are harmless. The earlier (wal.Manager).List call will not include
345 : // obsolete logs that are sitting in the recycler or have already been
346 : // passed off to the cleanup manager for deletion.
347 : //
348 : // TODO(jackson): It would be desirable to copy all recycling and obsolete
349 : // WALs to aid corruption postmortem debugging should we need them.
350 1 : for _, log := range allLogicalLogs {
351 1 : for i := 0; i < log.NumSegments(); i++ {
352 1 : srcFS, srcPath := log.SegmentLocation(i)
353 1 : destPath := fs.PathJoin(destDir, srcFS.PathBase(srcPath))
354 1 : ckErr = vfs.CopyAcrossFS(srcFS, srcPath, fs, destPath)
355 1 : if ckErr != nil {
356 0 : return ckErr
357 0 : }
358 : }
359 : }
360 :
361 : // Sync and close the checkpoint directory.
362 1 : ckErr = dir.Sync()
363 1 : if ckErr != nil {
364 0 : return ckErr
365 0 : }
366 1 : ckErr = dir.Close()
367 1 : dir = nil
368 1 : return ckErr
369 : }
370 :
371 : // copyCheckpointOptions copies an OPTIONS file, commenting out some options
372 : // that existed on the original database but no longer apply to the checkpointed
373 : // database. For example, the entire [WAL Failover] stanza is commented out
374 : // because Checkpoint will copy all WAL segment files from both the primary and
375 : // secondary WAL directories into the checkpoint.
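//
// For illustration (a hedged sketch; the key name below is hypothetical), a
// source OPTIONS fragment such as
//
//	[WAL Failover]
//	  secondary_dir=/mnt/failover
//
// would be written to the checkpoint as
//
//	# [WAL Failover]
//	#   secondary_dir=/mnt/failover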
376 1 : func copyCheckpointOptions(fs vfs.FS, srcPath, dstPath string) error {
377 1 : var buf bytes.Buffer
378 1 : f, err := fs.Open(srcPath)
379 1 : if err != nil {
380 0 : return err
381 0 : }
382 1 : defer f.Close()
383 1 : b, err := io.ReadAll(f)
384 1 : if err != nil {
385 0 : return err
386 0 : }
387 : // Copy the OPTIONS file verbatim, but commenting out the [WAL Failover]
388 : // section.
389 1 : err = parseOptions(string(b), parseOptionsFuncs{
390 1 : visitNewSection: func(startOff, endOff int, section string) error {
391 1 : if section == "WAL Failover" {
392 1 : buf.WriteString("# ")
393 1 : }
394 1 : buf.Write(b[startOff:endOff])
395 1 : return nil
396 : },
397 1 : visitKeyValue: func(startOff, endOff int, section, key, value string) error {
398 1 : if section == "WAL Failover" {
399 1 : buf.WriteString("# ")
400 1 : }
401 1 : buf.Write(b[startOff:endOff])
402 1 : return nil
403 : },
404 1 : visitCommentOrWhitespace: func(startOff, endOff int, line string) error {
405 1 : buf.Write(b[startOff:endOff])
406 1 : return nil
407 1 : },
408 : })
409 1 : if err != nil {
410 0 : return err
411 0 : }
412 1 : nf, err := fs.Create(dstPath, vfs.WriteCategoryUnspecified)
413 1 : if err != nil {
414 0 : return err
415 0 : }
416 1 : _, err = io.Copy(nf, &buf)
417 1 : if err != nil {
418 0 : return err
419 0 : }
420 1 : return errors.CombineErrors(nf.Sync(), nf.Close())
421 : }
422 :
423 : func (d *DB) writeCheckpointManifest(
424 : fs vfs.FS,
425 : formatVers FormatMajorVersion,
426 : destDirPath string,
427 : destDir vfs.File,
428 : manifestFileNum base.DiskFileNum,
429 : manifestSize int64,
430 : excludedFiles map[deletedFileEntry]*fileMetadata,
431 : removeBackingTables []base.DiskFileNum,
432 1 : ) error {
433 1 : // Copy the MANIFEST, and create a pointer to it. We copy rather
434 1 : // than link because additional version edits added to the
435 1 : // MANIFEST after we took our snapshot of the sstables will
436 1 : // reference sstables that aren't in our checkpoint. For a
437 1 : // similar reason, we need to limit how much of the MANIFEST we
438 1 : // copy.
439 1 : // If some files are excluded from the checkpoint, also append a block that
440 1 : // records those files as deleted.
441 1 : if err := func() error {
442 1 : srcPath := base.MakeFilepath(fs, d.dirname, fileTypeManifest, manifestFileNum)
443 1 : destPath := fs.PathJoin(destDirPath, fs.PathBase(srcPath))
444 1 : src, err := fs.Open(srcPath, vfs.SequentialReadsOption)
445 1 : if err != nil {
446 0 : return err
447 0 : }
448 1 : defer src.Close()
449 1 :
450 1 : dst, err := fs.Create(destPath, vfs.WriteCategoryUnspecified)
451 1 : if err != nil {
452 0 : return err
453 0 : }
454 1 : defer dst.Close()
455 1 :
456 1 : // Copy all existing records. We need to copy at the record level in case we
457 1 : // need to append another record with the excluded files (we cannot simply
458 1 : // append a record after a raw data copy; see
459 1 : // https://github.com/cockroachdb/cockroach/issues/100935).
460 1 : r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
461 1 : w := record.NewWriter(dst)
462 1 : for {
463 1 : rr, err := r.Next()
464 1 : if err != nil {
465 1 : if err == io.EOF {
466 1 : break
467 : }
468 0 : return err
469 : }
470 :
471 1 : rw, err := w.Next()
472 1 : if err != nil {
473 0 : return err
474 0 : }
475 1 : if _, err := io.Copy(rw, rr); err != nil {
476 0 : return err
477 0 : }
478 : }
479 :
480 1 : if len(excludedFiles) > 0 {
481 1 : // Write out an additional VersionEdit that deletes the excluded SST files.
482 1 : ve := versionEdit{
483 1 : DeletedFiles: excludedFiles,
484 1 : RemovedBackingTables: removeBackingTables,
485 1 : }
486 1 :
487 1 : rw, err := w.Next()
488 1 : if err != nil {
489 0 : return err
490 0 : }
491 1 : if err := ve.Encode(rw); err != nil {
492 0 : return err
493 0 : }
494 : }
495 1 : if err := w.Close(); err != nil {
496 0 : return err
497 0 : }
498 1 : return dst.Sync()
499 0 : }(); err != nil {
500 0 : return err
501 0 : }
502 :
503 1 : var manifestMarker *atomicfs.Marker
504 1 : manifestMarker, _, err := atomicfs.LocateMarker(fs, destDirPath, manifestMarkerName)
505 1 : if err != nil {
506 0 : return err
507 0 : }
508 1 : if err := manifestMarker.Move(base.MakeFilename(fileTypeManifest, manifestFileNum)); err != nil {
509 0 : return err
510 0 : }
511 1 : return manifestMarker.Close()
512 : }