Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "io"
9 : "os"
10 :
11 : "github.com/cockroachdb/errors"
12 : "github.com/cockroachdb/errors/oserror"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/record"
15 : "github.com/cockroachdb/pebble/vfs"
16 : "github.com/cockroachdb/pebble/vfs/atomicfs"
17 : "github.com/cockroachdb/pebble/wal"
18 : )
19 :
20 : // checkpointOptions holds the optional parameters used to construct
21 : // checkpoint snapshots.
22 : type checkpointOptions struct {
23 : // flushWAL, if true, forces a flush and sync of the WAL prior to
24 : // checkpointing.
25 : flushWAL bool
26 :
27 : // If set, any SSTs that don't overlap with these spans are excluded from a checkpoint.
28 : restrictToSpans []CheckpointSpan
29 : }
30 :
31 : // CheckpointOption sets optional parameters used by `DB.Checkpoint`.
32 : type CheckpointOption func(*checkpointOptions)
33 :
34 : // WithFlushedWAL enables flushing and syncing the WAL prior to constructing a
35 : // checkpoint. This guarantees that any writes committed before calling
36 : // DB.Checkpoint will be part of that checkpoint.
37 : //
38 : // Note that this setting is only useful when some writes are performed with
39 : // Sync = false; otherwise, the guarantee is already met.
40 : //
41 : // Passing this option is functionally equivalent to calling
42 : // DB.LogData(nil, Sync) right before DB.Checkpoint.
43 1 : func WithFlushedWAL() CheckpointOption {
44 1 : return func(opt *checkpointOptions) {
45 1 : opt.flushWAL = true
46 1 : }
47 : }
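// An illustrative sketch (not part of the original file): per the comment
// above, passing WithFlushedWAL is functionally equivalent to issuing a
// synced, empty log-data write immediately before the checkpoint. The db
// handle and destDir below are hypothetical.
//
//	// Option form:
//	err := db.Checkpoint(destDir, WithFlushedWAL())
//
//	// Equivalent manual form:
//	if err := db.LogData(nil /* data */, Sync); err == nil {
//		err = db.Checkpoint(destDir)
//	}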
48 :
49 : // WithRestrictToSpans specifies spans of interest for the checkpoint. Any SSTs
50 : // that don't overlap with any of these spans are excluded from the checkpoint.
51 : //
52 : // Note that the checkpoint can still surface keys outside of these spans (from
53 : // the WAL and from SSTs that partially overlap with these spans). Moreover,
54 : // these surfaced keys aren't necessarily "valid", in that they could have
55 : // been modified while the SST containing the modification is excluded.
56 2 : func WithRestrictToSpans(spans []CheckpointSpan) CheckpointOption {
57 2 : return func(opt *checkpointOptions) {
58 2 : opt.restrictToSpans = spans
59 2 : }
60 : }
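// An illustrative sketch (not part of the original file): restricting a
// checkpoint to two key ranges. The db handle, destDir, and key bounds are
// hypothetical.
//
//	spans := []CheckpointSpan{
//		{Start: []byte("a"), End: []byte("c")},
//		{Start: []byte("m"), End: []byte("p")},
//	}
//	err := db.Checkpoint(destDir, WithRestrictToSpans(spans))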
61 :
62 : // CheckpointSpan is a key range [Start, End) (inclusive on Start, exclusive on
63 : // End) of interest for a checkpoint.
64 : type CheckpointSpan struct {
65 : Start []byte
66 : End []byte
67 : }
68 :
69 : // excludeFromCheckpoint returns true if an SST file should be excluded from the
70 : // checkpoint because it does not overlap with the spans of interest
71 : // (opt.restrictToSpans).
72 2 : func excludeFromCheckpoint(f *fileMetadata, opt *checkpointOptions, cmp Compare) bool {
73 2 : if len(opt.restrictToSpans) == 0 {
74 2 : // Option not set; don't exclude anything.
75 2 : return false
76 2 : }
77 2 : for _, s := range opt.restrictToSpans {
78 2 : spanBounds := base.UserKeyBoundsEndExclusive(s.Start, s.End)
79 2 : if f.Overlaps(cmp, &spanBounds) {
80 2 : return false
81 2 : }
82 : }
83 : // None of the restrictToSpans overlapped; we can exclude this file.
84 2 : return true
85 : }
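// A hypothetical worked example of the rule above: with restrictToSpans =
// [{Start: "c", End: "f"}], an sstable covering ["f", "h"] is excluded
// (span ends are exclusive, so "f" does not overlap), while an sstable
// covering ["e", "h"] is kept even though most of its keys fall outside the
// span.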
86 :
87 : // mkdirAllAndSyncParents creates destDir and any of its missing parents.
88 : // Those missing parents, as well as the closest existing ancestor, are synced.
89 : // Returns a handle to the directory created at destDir.
90 2 : func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
91 2 : // Collect paths for all directories between destDir (excluded) and its
92 2 : // closest existing ancestor (included).
93 2 : var parentPaths []string
94 2 : for parentPath := fs.PathDir(destDir); ; parentPath = fs.PathDir(parentPath) {
95 2 : parentPaths = append(parentPaths, parentPath)
96 2 : if fs.PathDir(parentPath) == parentPath {
97 2 : break
98 : }
99 2 : _, err := fs.Stat(parentPath)
100 2 : if err == nil {
101 2 : // Exit loop at the closest existing ancestor.
102 2 : break
103 : }
104 2 : if !oserror.IsNotExist(err) {
105 0 : return nil, err
106 0 : }
107 : }
108 : // Create destDir and any of its missing parents.
109 2 : if err := fs.MkdirAll(destDir, 0755); err != nil {
110 1 : return nil, err
111 1 : }
112 : // Sync all the parent directories up to the closest existing ancestor,
113 : // included.
114 2 : for _, parentPath := range parentPaths {
115 2 : parentDir, err := fs.OpenDir(parentPath)
116 2 : if err != nil {
117 1 : return nil, err
118 1 : }
119 2 : err = parentDir.Sync()
120 2 : if err != nil {
121 1 : _ = parentDir.Close()
122 1 : return nil, err
123 1 : }
124 2 : err = parentDir.Close()
125 2 : if err != nil {
126 0 : return nil, err
127 0 : }
128 : }
129 2 : return fs.OpenDir(destDir)
130 : }
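// A hypothetical walk-through of the helper above: with destDir =
// "/a/b/c/ckpt" where only "/a" exists, parentPaths collects "/a/b/c",
// "/a/b", and "/a" (the closest existing ancestor). MkdirAll then creates
// "/a/b", "/a/b/c", and "/a/b/c/ckpt"; each collected parent is opened and
// synced so the new directory entries are durable; and a handle to
// "/a/b/c/ckpt" itself is returned to the caller.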
131 :
132 : // Checkpoint constructs a snapshot of the DB instance in the specified
133 : // directory. The WAL, MANIFEST, OPTIONS, and sstables will be copied into the
134 : // snapshot. Hard links will be used when possible. Beware of the significant
135 : // space overhead for a checkpoint if hard links are disabled. Also beware that
136 : // even if hard links are used, the space overhead for the checkpoint will
137 : // increase over time as the DB performs compactions.
138 : //
139 : // Note that shared files in a checkpoint could get deleted if the DB is
140 : // restarted after a checkpoint operation, as the reference for the checkpoint
141 : // is only maintained in memory. This is okay as long as users of Checkpoint
142 : // crash shortly afterwards with a "poison file" preventing further restarts.
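//
// An illustrative call (the destination path is hypothetical):
//
//	err := db.Checkpoint("/mnt/backup/ckpt", WithFlushedWAL())
//	// Note: the destination directory must not already exist; if it does,
//	// Checkpoint returns an os.PathError wrapping oserror.ErrExist.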
143 : func (d *DB) Checkpoint(
144 : destDir string, opts ...CheckpointOption,
145 : ) (
146 : ckErr error, /* used in deferred cleanup */
147 2 : ) {
148 2 : opt := &checkpointOptions{}
149 2 : for _, fn := range opts {
150 2 : fn(opt)
151 2 : }
152 :
153 2 : if _, err := d.opts.FS.Stat(destDir); !oserror.IsNotExist(err) {
154 1 : if err == nil {
155 1 : return &os.PathError{
156 1 : Op: "checkpoint",
157 1 : Path: destDir,
158 1 : Err: oserror.ErrExist,
159 1 : }
160 1 : }
161 0 : return err
162 : }
163 :
164 2 : if opt.flushWAL && !d.opts.DisableWAL {
165 1 : // Write an empty log-data record to flush and sync the WAL.
166 1 : if err := d.LogData(nil /* data */, Sync); err != nil {
167 0 : return err
168 0 : }
169 : }
170 :
171 : // Disable file deletions.
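// (Added note, inferred from context: pausing deletions ensures that
// compactions and obsolete-file cleanup cannot remove sstables, the
// MANIFEST, or WAL segments while they are being linked or copied below;
// the deferred call re-enables deletions once the checkpoint completes.)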
172 2 : d.mu.Lock()
173 2 : d.disableFileDeletions()
174 2 : defer func() {
175 2 : d.mu.Lock()
176 2 : defer d.mu.Unlock()
177 2 : d.enableFileDeletions()
178 2 : }()
179 :
180 : // TODO(peter): RocksDB provides the option to roll the manifest if the
181 : // MANIFEST size is too large. Should we do this too?
182 :
183 : // Lock the manifest before getting the current version. We need the
184 : // length of the manifest that we read to match the current version that
185 : // we read, otherwise we might copy a versionEdit not reflected in the
186 : // sstables we copy/link.
187 2 : d.mu.versions.logLock()
188 2 : // Get the unflushed log files, the current version, and the current manifest
189 2 : // file number.
190 2 : memQueue := d.mu.mem.queue
191 2 : current := d.mu.versions.currentVersion()
192 2 : formatVers := d.FormatMajorVersion()
193 2 : manifestFileNum := d.mu.versions.manifestFileNum
194 2 : manifestSize := d.mu.versions.manifest.Size()
195 2 : optionsFileNum := d.optionsFileNum
196 2 :
197 2 : virtualBackingFiles := make(map[base.DiskFileNum]struct{})
198 2 : d.mu.versions.virtualBackings.ForEach(func(backing *fileBacking) {
199 2 : virtualBackingFiles[backing.DiskFileNum] = struct{}{}
200 2 : })
201 :
202 2 : queuedLogNums := make([]wal.NumWAL, 0, len(memQueue))
203 2 : for i := range memQueue {
204 2 : if logNum := memQueue[i].logNum; logNum != 0 {
205 2 : queuedLogNums = append(queuedLogNums, wal.NumWAL(logNum))
206 2 : }
207 : }
208 : // Release the manifest and DB.mu so we don't block other operations on
209 : // the database.
210 2 : d.mu.versions.logUnlock()
211 2 : d.mu.Unlock()
212 2 :
213 2 : allLogicalLogs, err := d.mu.log.manager.List()
214 2 : if err != nil {
215 0 : return err
216 0 : }
217 :
218 : // Wrap the normal filesystem with one which wraps newly created files with
219 : // vfs.NewSyncingFile.
220 2 : fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
221 2 : NoSyncOnClose: d.opts.NoSyncOnClose,
222 2 : BytesPerSync: d.opts.BytesPerSync,
223 2 : })
224 2 :
225 2 : // Create the dir and its parents (if necessary), and sync them.
226 2 : var dir vfs.File
227 2 : defer func() {
228 2 : if dir != nil {
229 0 : _ = dir.Close()
230 0 : }
231 2 : if ckErr != nil {
232 0 : // Attempt to cleanup on error.
233 0 : _ = fs.RemoveAll(destDir)
234 0 : }
235 : }()
236 2 : dir, ckErr = mkdirAllAndSyncParents(fs, destDir)
237 2 : if ckErr != nil {
238 0 : return ckErr
239 0 : }
240 :
241 2 : {
242 2 : // Link or copy the OPTIONS.
243 2 : srcPath := base.MakeFilepath(fs, d.dirname, fileTypeOptions, optionsFileNum)
244 2 : destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
245 2 : ckErr = vfs.LinkOrCopy(fs, srcPath, destPath)
246 2 : if ckErr != nil {
247 0 : return ckErr
248 0 : }
249 : }
250 :
251 2 : {
252 2 : // Set the format major version in the destination directory.
253 2 : var versionMarker *atomicfs.Marker
254 2 : versionMarker, _, ckErr = atomicfs.LocateMarker(fs, destDir, formatVersionMarkerName)
255 2 : if ckErr != nil {
256 0 : return ckErr
257 0 : }
258 :
259 : // We use the marker to encode the active format version in the
260 : // marker filename. Unlike other uses of the atomic marker,
261 : // there is no file with the filename `formatVers.String()` on
262 : // the filesystem.
263 2 : ckErr = versionMarker.Move(formatVers.String())
264 2 : if ckErr != nil {
265 0 : return ckErr
266 0 : }
267 2 : ckErr = versionMarker.Close()
268 2 : if ckErr != nil {
269 0 : return ckErr
270 0 : }
271 : }
272 :
273 2 : var excludedFiles map[deletedFileEntry]*fileMetadata
274 2 : var remoteFiles []base.DiskFileNum
275 2 : // Set of FileBacking.DiskFileNum which will be required by virtual sstables
276 2 : // in the checkpoint.
277 2 : requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})
278 2 : // Link or copy the sstables.
279 2 : for l := range current.Levels {
280 2 : iter := current.Levels[l].Iter()
281 2 : for f := iter.First(); f != nil; f = iter.Next() {
282 2 : if excludeFromCheckpoint(f, opt, d.cmp) {
283 2 : if excludedFiles == nil {
284 2 : excludedFiles = make(map[deletedFileEntry]*fileMetadata)
285 2 : }
286 2 : excludedFiles[deletedFileEntry{
287 2 : Level: l,
288 2 : FileNum: f.FileNum,
289 2 : }] = f
290 2 : continue
291 : }
292 :
293 2 : fileBacking := f.FileBacking
294 2 : if f.Virtual {
295 2 : if _, ok := requiredVirtualBackingFiles[fileBacking.DiskFileNum]; ok {
296 2 : continue
297 : }
298 2 : requiredVirtualBackingFiles[fileBacking.DiskFileNum] = struct{}{}
299 : }
300 2 : meta, err := d.objProvider.Lookup(fileTypeTable, fileBacking.DiskFileNum)
301 2 : if err != nil {
302 0 : ckErr = err
303 0 : return ckErr
304 0 : }
305 2 : if meta.IsRemote() {
306 1 : // We don't copy remote files. This is desirable as checkpointing is
307 1 : // supposed to be a fast operation, and references to remote files can
308 1 : // always be resolved by any checkpoint readers by reading the object
309 1 : // catalog. We don't add this file to excludedFiles either, as that'd
310 1 : // cause it to be deleted in the second manifest entry, which would also
311 1 : // be inaccurate.
312 1 : remoteFiles = append(remoteFiles, meta.DiskFileNum)
313 1 : continue
314 : }
315 :
316 2 : srcPath := base.MakeFilepath(fs, d.dirname, fileTypeTable, fileBacking.DiskFileNum)
317 2 : destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
318 2 : ckErr = vfs.LinkOrCopy(fs, srcPath, destPath)
319 2 : if ckErr != nil {
320 0 : return ckErr
321 0 : }
322 : }
323 : }
324 :
325 2 : var removeBackingTables []base.DiskFileNum
326 2 : for diskFileNum := range virtualBackingFiles {
327 2 : if _, ok := requiredVirtualBackingFiles[diskFileNum]; !ok {
328 2 : // The backing sstable associated with fileNum is no longer
329 2 : // required.
330 2 : removeBackingTables = append(removeBackingTables, diskFileNum)
331 2 : }
332 : }
333 :
334 2 : ckErr = d.writeCheckpointManifest(
335 2 : fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
336 2 : excludedFiles, removeBackingTables,
337 2 : )
338 2 : if ckErr != nil {
339 0 : return ckErr
340 0 : }
341 2 : if len(remoteFiles) > 0 {
342 1 : ckErr = d.objProvider.CheckpointState(fs, destDir, fileTypeTable, remoteFiles)
343 1 : if ckErr != nil {
344 0 : return ckErr
345 0 : }
346 : }
347 :
348 : // Copy the WAL files. We copy rather than link because WAL file recycling
349 : // will cause the WAL files to be reused, which would invalidate the
350 : // checkpoint.
351 2 : for _, logNum := range queuedLogNums {
352 2 : log, ok := allLogicalLogs.Get(logNum)
353 2 : if !ok {
354 0 : return errors.Newf("log %s not found", logNum)
355 0 : }
356 2 : for i := 0; i < log.NumSegments(); i++ {
357 2 : srcFS, srcPath := log.SegmentLocation(i)
358 2 : destPath := fs.PathJoin(destDir, srcFS.PathBase(srcPath))
359 2 : ckErr = vfs.CopyAcrossFS(srcFS, srcPath, fs, destPath)
360 2 : if ckErr != nil {
361 0 : return ckErr
362 0 : }
363 : }
364 : }
365 :
366 : // Sync and close the checkpoint directory.
367 2 : ckErr = dir.Sync()
368 2 : if ckErr != nil {
369 0 : return ckErr
370 0 : }
371 2 : ckErr = dir.Close()
372 2 : dir = nil
373 2 : return ckErr
374 : }
375 :
376 : func (d *DB) writeCheckpointManifest(
377 : fs vfs.FS,
378 : formatVers FormatMajorVersion,
379 : destDirPath string,
380 : destDir vfs.File,
381 : manifestFileNum base.DiskFileNum,
382 : manifestSize int64,
383 : excludedFiles map[deletedFileEntry]*fileMetadata,
384 : removeBackingTables []base.DiskFileNum,
385 2 : ) error {
386 2 : // Copy the MANIFEST, and create a pointer to it. We copy rather
387 2 : // than link because additional version edits added to the
388 2 : // MANIFEST after we took our snapshot of the sstables will
389 2 : // reference sstables that aren't in our checkpoint. For a
390 2 : // similar reason, we need to limit how much of the MANIFEST we
391 2 : // copy.
392 2 : // If some files are excluded from the checkpoint, also append a block that
393 2 : // records those files as deleted.
394 2 : if err := func() error {
395 2 : srcPath := base.MakeFilepath(fs, d.dirname, fileTypeManifest, manifestFileNum)
396 2 : destPath := fs.PathJoin(destDirPath, fs.PathBase(srcPath))
397 2 : src, err := fs.Open(srcPath, vfs.SequentialReadsOption)
398 2 : if err != nil {
399 0 : return err
400 0 : }
401 2 : defer src.Close()
402 2 :
403 2 : dst, err := fs.Create(destPath, vfs.WriteCategoryUnspecified)
404 2 : if err != nil {
405 0 : return err
406 0 : }
407 2 : defer dst.Close()
408 2 :
409 2 : // Copy all existing records. We need to copy at the record level in case we
410 2 : // need to append another record with the excluded files (we cannot simply
411 2 : // append a record after a raw data copy; see
412 2 : // https://github.com/cockroachdb/cockroach/issues/100935).
413 2 : r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
414 2 : w := record.NewWriter(dst)
415 2 : for {
416 2 : rr, err := r.Next()
417 2 : if err != nil {
418 2 : if err == io.EOF {
419 2 : break
420 : }
421 0 : return err
422 : }
423 :
424 2 : rw, err := w.Next()
425 2 : if err != nil {
426 0 : return err
427 0 : }
428 2 : if _, err := io.Copy(rw, rr); err != nil {
429 0 : return err
430 0 : }
431 : }
432 :
433 2 : if len(excludedFiles) > 0 {
434 2 : // Write out an additional VersionEdit that deletes the excluded SST files.
435 2 : ve := versionEdit{
436 2 : DeletedFiles: excludedFiles,
437 2 : RemovedBackingTables: removeBackingTables,
438 2 : }
439 2 :
440 2 : rw, err := w.Next()
441 2 : if err != nil {
442 0 : return err
443 0 : }
444 2 : if err := ve.Encode(rw); err != nil {
445 0 : return err
446 0 : }
447 : }
448 2 : if err := w.Close(); err != nil {
449 0 : return err
450 0 : }
451 2 : return dst.Sync()
452 0 : }(); err != nil {
453 0 : return err
454 0 : }
455 :
456 2 : var manifestMarker *atomicfs.Marker
457 2 : manifestMarker, _, err := atomicfs.LocateMarker(fs, destDirPath, manifestMarkerName)
458 2 : if err != nil {
459 0 : return err
460 0 : }
461 2 : if err := manifestMarker.Move(base.MakeFilename(fileTypeManifest, manifestFileNum)); err != nil {
462 0 : return err
463 0 : }
464 2 : return manifestMarker.Close()
465 : }