// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"io"
	"os"

	"github.com/cockroachdb/errors/oserror"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/record"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/vfs/atomicfs"
)
// checkpointOptions holds the optional parameters used to construct checkpoint
// snapshots.
type checkpointOptions struct {
	// flushWAL, if set to true, forces a flush and sync of the WAL prior to
	// checkpointing.
	flushWAL bool

	// If set, any SSTs that don't overlap with these spans are excluded from a checkpoint.
	restrictToSpans []CheckpointSpan
}

// CheckpointOption sets optional parameters used by `DB.Checkpoint`.
type CheckpointOption func(*checkpointOptions)

// WithFlushedWAL enables flushing and syncing the WAL prior to constructing a
// checkpoint. This guarantees that any writes committed before calling
// DB.Checkpoint will be part of that checkpoint.
//
// Note that this setting is only useful when some writes are performed with
// Sync = false. Otherwise, the guarantee is already met.
//
// Passing this option is functionally equivalent to calling
// DB.LogData(nil, Sync) right before DB.Checkpoint.
func WithFlushedWAL() CheckpointOption {
	return func(opt *checkpointOptions) {
		opt.flushWAL = true
	}
}
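
// checkpointWithFlushedWALExample is a minimal illustrative sketch of how a
// caller might use WithFlushedWAL; the helper itself is hypothetical and not
// referenced elsewhere in the package. Error handling and the choice of
// destination directory are left to the caller.
func checkpointWithFlushedWALExample(d *DB, destDir string) error {
	// WithFlushedWAL forces a flush and sync of the WAL so that writes
	// committed with Sync = false are still captured by the checkpoint. It is
	// functionally equivalent to calling d.LogData(nil, Sync) immediately
	// before d.Checkpoint(destDir).
	return d.Checkpoint(destDir, WithFlushedWAL())
}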

// WithRestrictToSpans specifies spans of interest for the checkpoint. Any SSTs
// that don't overlap with any of these spans are excluded from the checkpoint.
//
// Note that the checkpoint can still surface keys outside of these spans (from
// the WAL and from SSTs that partially overlap with these spans). Moreover,
// these surfaced keys aren't necessarily "valid", in that they could have been
// modified while the SST containing the modification is excluded.
func WithRestrictToSpans(spans []CheckpointSpan) CheckpointOption {
	return func(opt *checkpointOptions) {
		opt.restrictToSpans = spans
	}
}

// CheckpointSpan is a key range [Start, End) (inclusive on Start, exclusive on
// End) of interest for a checkpoint.
type CheckpointSpan struct {
	Start []byte
	End   []byte
}
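
// checkpointRestrictedToSpanExample is a minimal illustrative sketch (the
// helper is hypothetical) of combining CheckpointSpan with WithRestrictToSpans:
// SSTs that overlap [start, end) are included, while non-overlapping SSTs are
// dropped from the checkpoint. As noted above, keys outside the span may still
// appear via the WAL or via partially overlapping SSTs.
func checkpointRestrictedToSpanExample(d *DB, destDir string, start, end []byte) error {
	return d.Checkpoint(destDir, WithRestrictToSpans([]CheckpointSpan{
		{Start: start, End: end},
	}))
}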

// excludeFromCheckpoint returns true if an SST file should be excluded from the
// checkpoint because it does not overlap with the spans of interest
// (opt.restrictToSpans).
func excludeFromCheckpoint(f *fileMetadata, opt *checkpointOptions, cmp Compare) bool {
	if len(opt.restrictToSpans) == 0 {
		// Option not set; don't exclude anything.
		return false
	}
	for _, s := range opt.restrictToSpans {
		if f.Overlaps(cmp, s.Start, s.End, true /* exclusiveEnd */) {
			return false
		}
	}
	// None of the restrictToSpans overlapped; we can exclude this file.
	return true
}

// mkdirAllAndSyncParents creates destDir and any of its missing parents.
// Those missing parents, as well as the closest existing ancestor, are synced.
// Returns a handle to the directory created at destDir.
func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
	// Collect paths for all directories between destDir (excluded) and its
	// closest existing ancestor (included).
	var parentPaths []string
	foundExistingAncestor := false
	for parentPath := fs.PathDir(destDir); parentPath != "."; parentPath = fs.PathDir(parentPath) {
		parentPaths = append(parentPaths, parentPath)
		_, err := fs.Stat(parentPath)
		if err == nil {
			// Exit loop at the closest existing ancestor.
			foundExistingAncestor = true
			break
		}
		if !oserror.IsNotExist(err) {
			return nil, err
		}
	}
	// Handle empty filesystem edge case.
	if !foundExistingAncestor {
		parentPaths = append(parentPaths, "")
	}
	// Create destDir and any of its missing parents.
	if err := fs.MkdirAll(destDir, 0755); err != nil {
		return nil, err
	}
	// Sync all the parent directories up to the closest existing ancestor,
	// included.
	for _, parentPath := range parentPaths {
		parentDir, err := fs.OpenDir(parentPath)
		if err != nil {
			return nil, err
		}
		err = parentDir.Sync()
		if err != nil {
			_ = parentDir.Close()
			return nil, err
		}
		err = parentDir.Close()
		if err != nil {
			return nil, err
		}
	}
	return fs.OpenDir(destDir)
}
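
// mkdirAllAndSyncParentsExample is a minimal illustrative sketch (hypothetical
// helper) of the intended calling pattern for mkdirAllAndSyncParents: create
// the directory and its missing parents, populate it, then sync and close the
// returned handle so the new directory entries are durable.
func mkdirAllAndSyncParentsExample(fs vfs.FS, destDir string) error {
	dir, err := mkdirAllAndSyncParents(fs, destDir)
	if err != nil {
		return err
	}
	// ... create files inside destDir here ...
	if err := dir.Sync(); err != nil {
		_ = dir.Close()
		return err
	}
	return dir.Close()
}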

// Checkpoint constructs a snapshot of the DB instance in the specified
// directory. The WAL, MANIFEST, OPTIONS, and sstables will be copied into the
// snapshot. Hard links will be used when possible. Beware of the significant
// space overhead for a checkpoint if hard links are disabled. Also beware that
// even if hard links are used, the space overhead for the checkpoint will
// increase over time as the DB performs compactions.
//
// TODO(bananabrick): Test checkpointing of virtual sstables once virtual
// sstables are running e2e.
func (d *DB) Checkpoint(
	destDir string, opts ...CheckpointOption,
) (
	ckErr error, /* used in deferred cleanup */
) {
	opt := &checkpointOptions{}
	for _, fn := range opts {
		fn(opt)
	}

	if _, err := d.opts.FS.Stat(destDir); !oserror.IsNotExist(err) {
		if err == nil {
			return &os.PathError{
				Op:   "checkpoint",
				Path: destDir,
				Err:  oserror.ErrExist,
			}
		}
		return err
	}

	if opt.flushWAL && !d.opts.DisableWAL {
		// Write an empty log-data record to flush and sync the WAL.
		if err := d.LogData(nil /* data */, Sync); err != nil {
			return err
		}
	}

	// Disable file deletions.
	d.mu.Lock()
	d.disableFileDeletions()
	defer func() {
		d.mu.Lock()
		defer d.mu.Unlock()
		d.enableFileDeletions()
	}()

	// TODO(peter): RocksDB provides the option to roll the manifest if the
	// MANIFEST size is too large. Should we do this too?

	// Lock the manifest before getting the current version. We need the
	// length of the manifest that we read to match the current version that
	// we read; otherwise we might copy a versionEdit not reflected in the
	// sstables we copy/link.
	d.mu.versions.logLock()
	// Get the unflushed log files, the current version, and the current manifest
	// file number.
	memQueue := d.mu.mem.queue
	current := d.mu.versions.currentVersion()
	formatVers := d.FormatMajorVersion()
	manifestFileNum := d.mu.versions.manifestFileNum
	manifestSize := d.mu.versions.manifest.Size()
	optionsFileNum := d.optionsFileNum
	virtualBackingFiles := make(map[base.DiskFileNum]struct{})
	for diskFileNum := range d.mu.versions.fileBackingMap {
		virtualBackingFiles[diskFileNum] = struct{}{}
	}
	// Release the manifest and DB.mu so we don't block other operations on
	// the database.
	d.mu.versions.logUnlock()
	d.mu.Unlock()

	// Wrap the normal filesystem with one which wraps newly created files with
	// vfs.NewSyncingFile.
	fs := vfs.NewSyncingFS(d.opts.FS, vfs.SyncingFileOptions{
		NoSyncOnClose: d.opts.NoSyncOnClose,
		BytesPerSync:  d.opts.BytesPerSync,
	})

	// Create the dir and its parents (if necessary), and sync them.
	var dir vfs.File
	defer func() {
		if dir != nil {
			_ = dir.Close()
		}
		if ckErr != nil {
			// Attempt to cleanup on error.
			_ = fs.RemoveAll(destDir)
		}
	}()
	dir, ckErr = mkdirAllAndSyncParents(fs, destDir)
	if ckErr != nil {
		return ckErr
	}

	{
		// Link or copy the OPTIONS.
		srcPath := base.MakeFilepath(fs, d.dirname, fileTypeOptions, optionsFileNum)
		destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
		ckErr = vfs.LinkOrCopy(fs, srcPath, destPath)
		if ckErr != nil {
			return ckErr
		}
	}

	{
		// Set the format major version in the destination directory.
		var versionMarker *atomicfs.Marker
		versionMarker, _, ckErr = atomicfs.LocateMarker(fs, destDir, formatVersionMarkerName)
		if ckErr != nil {
			return ckErr
		}

		// We use the marker to encode the active format version in the
		// marker filename. Unlike other uses of the atomic marker,
		// there is no file with the filename `formatVers.String()` on
		// the filesystem.
		ckErr = versionMarker.Move(formatVers.String())
		if ckErr != nil {
			return ckErr
		}
		ckErr = versionMarker.Close()
		if ckErr != nil {
			return ckErr
		}
	}

	var excludedFiles map[deletedFileEntry]*fileMetadata
	// Set of FileBacking.DiskFileNum which will be required by virtual sstables
	// in the checkpoint.
	requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})
	// Link or copy the sstables.
	for l := range current.Levels {
		iter := current.Levels[l].Iter()
		for f := iter.First(); f != nil; f = iter.Next() {
			if excludeFromCheckpoint(f, opt, d.cmp) {
				if excludedFiles == nil {
					excludedFiles = make(map[deletedFileEntry]*fileMetadata)
				}
				excludedFiles[deletedFileEntry{
					Level:   l,
					FileNum: f.FileNum,
				}] = f
				continue
			}

			fileBacking := f.FileBacking
			if f.Virtual {
				if _, ok := requiredVirtualBackingFiles[fileBacking.DiskFileNum]; ok {
					continue
				}
				requiredVirtualBackingFiles[fileBacking.DiskFileNum] = struct{}{}
			}

			srcPath := base.MakeFilepath(fs, d.dirname, fileTypeTable, fileBacking.DiskFileNum)
			destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
			ckErr = vfs.LinkOrCopy(fs, srcPath, destPath)
			if ckErr != nil {
				return ckErr
			}
		}
	}

	var removeBackingTables []base.DiskFileNum
	for diskFileNum := range virtualBackingFiles {
		if _, ok := requiredVirtualBackingFiles[diskFileNum]; !ok {
			// The backing sstable associated with fileNum is no longer
			// required.
			removeBackingTables = append(removeBackingTables, diskFileNum)
		}
	}

	ckErr = d.writeCheckpointManifest(
		fs, formatVers, destDir, dir, manifestFileNum.DiskFileNum(), manifestSize,
		excludedFiles, removeBackingTables,
	)
	if ckErr != nil {
		return ckErr
	}

	// Copy the WAL files. We copy rather than link because WAL file recycling
	// will cause the WAL files to be reused, which would invalidate the
	// checkpoint.
	for i := range memQueue {
		logNum := memQueue[i].logNum
		if logNum == 0 {
			continue
		}
		srcPath := base.MakeFilepath(fs, d.walDirname, fileTypeLog, logNum.DiskFileNum())
		destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
		ckErr = vfs.Copy(fs, srcPath, destPath)
		if ckErr != nil {
			return ckErr
		}
	}

	// Sync and close the checkpoint directory.
	ckErr = dir.Sync()
	if ckErr != nil {
		return ckErr
	}
	ckErr = dir.Close()
	dir = nil
	return ckErr
}
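
// checkpointUsageExample is a minimal illustrative sketch (hypothetical helper)
// tying the options above together. It assumes destDir does not already exist;
// Checkpoint returns an error if it does, and it removes a partially written
// destination directory itself when it fails midway.
func checkpointUsageExample(d *DB, destDir string, spans []CheckpointSpan) error {
	opts := []CheckpointOption{WithFlushedWAL()}
	if len(spans) > 0 {
		opts = append(opts, WithRestrictToSpans(spans))
	}
	return d.Checkpoint(destDir, opts...)
}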

func (d *DB) writeCheckpointManifest(
	fs vfs.FS,
	formatVers FormatMajorVersion,
	destDirPath string,
	destDir vfs.File,
	manifestFileNum base.DiskFileNum,
	manifestSize int64,
	excludedFiles map[deletedFileEntry]*fileMetadata,
	removeBackingTables []base.DiskFileNum,
) error {
	// Copy the MANIFEST, and create a pointer to it. We copy rather
	// than link because additional version edits added to the
	// MANIFEST after we took our snapshot of the sstables will
	// reference sstables that aren't in our checkpoint. For a
	// similar reason, we need to limit how much of the MANIFEST we
	// copy.
	// If some files are excluded from the checkpoint, also append a block that
	// records those files as deleted.
	if err := func() error {
		srcPath := base.MakeFilepath(fs, d.dirname, fileTypeManifest, manifestFileNum)
		destPath := fs.PathJoin(destDirPath, fs.PathBase(srcPath))
		src, err := fs.Open(srcPath, vfs.SequentialReadsOption)
		if err != nil {
			return err
		}
		defer src.Close()

		dst, err := fs.Create(destPath)
		if err != nil {
			return err
		}
		defer dst.Close()

		// Copy all existing records. We need to copy at the record level in case we
		// need to append another record with the excluded files (we cannot simply
		// append a record after a raw data copy; see
		// https://github.com/cockroachdb/cockroach/issues/100935).
		r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum.FileNum())
		w := record.NewWriter(dst)
		for {
			rr, err := r.Next()
			if err != nil {
				if err == io.EOF {
					break
				}
				return err
			}

			rw, err := w.Next()
			if err != nil {
				return err
			}
			if _, err := io.Copy(rw, rr); err != nil {
				return err
			}
		}

		if len(excludedFiles) > 0 {
			// Write out an additional VersionEdit that deletes the excluded SST files.
			ve := versionEdit{
				DeletedFiles:         excludedFiles,
				RemovedBackingTables: removeBackingTables,
			}

			rw, err := w.Next()
			if err != nil {
				return err
			}
			if err := ve.Encode(rw); err != nil {
				return err
			}
		}
		if err := w.Close(); err != nil {
			return err
		}
		return dst.Sync()
	}(); err != nil {
		return err
	}

	// Recent format versions use an atomic marker for setting the
	// active manifest. Older versions use the CURRENT file. The
	// setCurrentFunc function will return a closure that will
	// take the appropriate action for the database's format
	// version.
	var manifestMarker *atomicfs.Marker
	manifestMarker, _, err := atomicfs.LocateMarker(fs, destDirPath, manifestMarkerName)
	if err != nil {
		return err
	}
	if err := setCurrentFunc(formatVers, manifestMarker, fs, destDirPath, destDir)(manifestFileNum.FileNum()); err != nil {
		return err
	}
	return manifestMarker.Close()
}
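
// copyRecordsExample is a minimal illustrative sketch (hypothetical helper) of
// the record-level copy used in writeCheckpointManifest above: each record is
// re-framed through a record.Writer instead of being byte-copied, so that an
// additional version edit can be appended to the destination afterwards.
func copyRecordsExample(src io.Reader, dst io.Writer, srcSize int64, fileNum base.FileNum) error {
	r := record.NewReader(&io.LimitedReader{R: src, N: srcSize}, fileNum)
	w := record.NewWriter(dst)
	for {
		rr, err := r.Next()
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}
		rw, err := w.Next()
		if err != nil {
			return err
		}
		if _, err := io.Copy(rw, rr); err != nil {
			return err
		}
	}
	// Closing the writer flushes any buffered data to dst.
	return w.Close()
}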