Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "io"
11 : "sync"
12 : "sync/atomic"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/errors/oserror"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/invariants"
18 : "github.com/cockroachdb/pebble/internal/manifest"
19 : "github.com/cockroachdb/pebble/record"
20 : "github.com/cockroachdb/pebble/vfs"
21 : "github.com/cockroachdb/pebble/vfs/atomicfs"
22 : )
23 :
24 : const numLevels = manifest.NumLevels
25 :
26 : const manifestMarkerName = `manifest`
27 :
28 : // Provide type aliases for the various manifest structs.
29 : type bulkVersionEdit = manifest.BulkVersionEdit
30 : type deletedFileEntry = manifest.DeletedFileEntry
31 : type fileMetadata = manifest.FileMetadata
32 : type physicalMeta = manifest.PhysicalFileMeta
33 : type virtualMeta = manifest.VirtualFileMeta
34 : type fileBacking = manifest.FileBacking
35 : type newFileEntry = manifest.NewFileEntry
36 : type version = manifest.Version
37 : type versionEdit = manifest.VersionEdit
38 : type versionList = manifest.VersionList
39 :
40 : // versionSet manages a collection of immutable versions, and manages the
41 : // creation of a new version from the most recent version. A new version is
42 : // created from an existing version by applying a version edit, which is just
43 : // what it sounds like: a delta from the previous version. Version edits are logged
44 : // to the MANIFEST file, which is replayed at startup.
45 : type versionSet struct {
46 : // Next seqNum to use for WAL writes.
47 : logSeqNum atomic.Uint64
48 :
49 : // The upper bound on sequence numbers that have been assigned so far. A
50 : // suffix of these sequence numbers may not have been written to a WAL. Both
51 : // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
52 : // visibleSeqNum is <= logSeqNum.
53 : visibleSeqNum atomic.Uint64
54 :
55 : // Number of bytes present in sstables being written by in-progress
56 : // compactions. This value will be zero if there are no in-progress
57 : // compactions. Updated and read atomically.
58 : atomicInProgressBytes atomic.Int64
59 :
60 : // Immutable fields.
61 : dirname string
62 : // Set to DB.mu.
63 : mu *sync.Mutex
64 : opts *Options
65 : fs vfs.FS
66 : cmp Compare
67 : cmpName string
68 : // dynamicBaseLevel, when false, disables the dynamic base level
69 : // computation. Used by tests which want to create specific LSM structures.
70 : dynamicBaseLevel bool
71 :
72 : // Mutable fields.
73 : versions versionList
74 : picker compactionPicker
75 :
76 : metrics Metrics
77 :
78 : // A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
79 : // on the creation of every version.
80 : obsoleteFn func(obsolete []*fileBacking)
81 : obsoleteTables []fileInfo
82 : obsoleteManifests []fileInfo
83 : obsoleteOptions []fileInfo
84 :
85 : // Zombie tables which have been removed from the current version but are
86 : // still referenced by an in-use iterator.
87 : zombieTables map[base.DiskFileNum]uint64 // filenum -> size
88 :
89 : // backingState is protected by the versionSet.logLock. It's populated
90 : // during Open in versionSet.load, but it's not used concurrently during
91 : // load.
92 : backingState struct {
93 : // fileBackingMap maps a DiskFileNum to the FileBacking which supports
94 : // virtual sstables in the latest version. Once a file backing no longer
95 : // backs any virtual sstables in the latest version, it is removed from
96 : // this map and the corresponding state is added to the zombieTables map.
97 : // Note that we don't keep track of file backings which only support
98 : // virtual sstables that are not in the latest version.
99 : fileBackingMap map[base.DiskFileNum]*fileBacking
100 : // fileBackingSize is the sum of the sizes of the fileBackings in the
101 : // fileBackingMap.
102 : fileBackingSize uint64
103 : }
104 :
105 : // minUnflushedLogNum is the smallest WAL log file number corresponding to
106 : // mutations that have not been flushed to an sstable.
107 : minUnflushedLogNum base.DiskFileNum
108 :
109 : // The next file number. A single counter is used to assign file
110 : // numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
111 : nextFileNum uint64
112 :
113 : // The current manifest file number.
114 : manifestFileNum base.DiskFileNum
115 : manifestMarker *atomicfs.Marker
116 :
117 : manifestFile vfs.File
118 : manifest *record.Writer
119 : setCurrent func(base.DiskFileNum) error
120 : getFormatMajorVersion func() FormatMajorVersion
121 :
122 : writing bool
123 : writerCond sync.Cond
124 : // State for deciding when to write a snapshot. Protected by mu.
125 : rotationHelper record.RotationHelper
126 : }
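// The typical write path through a versionSet is: a flush or compaction
// produces a versionEdit (a delta), the caller serializes MANIFEST access
// with logLock, and logAndApply encodes the edit, applies it to the current
// version, and installs the result. A minimal sketch of that flow, assuming
// DB.mu is held and that jobID, ve, metrics, and inProgress come from the
// caller (illustrative only, not part of the original file):
//
//	vs.logLock() // released by logAndApply, even on error
//	if err := vs.logAndApply(jobID, ve, metrics, false /* forceRotation */, inProgress); err != nil {
//		return err
//	}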
127 :
128 : func (vs *versionSet) init(
129 : dirname string,
130 : opts *Options,
131 : marker *atomicfs.Marker,
132 : setCurrent func(base.DiskFileNum) error,
133 : getFMV func() FormatMajorVersion,
134 : mu *sync.Mutex,
135 1 : ) {
136 1 : vs.dirname = dirname
137 1 : vs.mu = mu
138 1 : vs.writerCond.L = mu
139 1 : vs.opts = opts
140 1 : vs.fs = opts.FS
141 1 : vs.cmp = opts.Comparer.Compare
142 1 : vs.cmpName = opts.Comparer.Name
143 1 : vs.dynamicBaseLevel = true
144 1 : vs.versions.Init(mu)
145 1 : vs.obsoleteFn = vs.addObsoleteLocked
146 1 : vs.zombieTables = make(map[base.DiskFileNum]uint64)
147 1 : vs.backingState.fileBackingMap = make(map[base.DiskFileNum]*fileBacking)
148 1 : vs.backingState.fileBackingSize = 0
149 1 : vs.nextFileNum = 1
150 1 : vs.manifestMarker = marker
151 1 : vs.setCurrent = setCurrent
152 1 : vs.getFormatMajorVersion = getFMV
153 1 : }
154 :
155 : // create creates a version set for a fresh DB.
156 : func (vs *versionSet) create(
157 : jobID int,
158 : dirname string,
159 : opts *Options,
160 : marker *atomicfs.Marker,
161 : setCurrent func(base.DiskFileNum) error,
162 : getFormatMajorVersion func() FormatMajorVersion,
163 : mu *sync.Mutex,
164 1 : ) error {
165 1 : vs.init(dirname, opts, marker, setCurrent, getFormatMajorVersion, mu)
166 1 : newVersion := &version{}
167 1 : vs.append(newVersion)
168 1 : var err error
169 1 :
170 1 : vs.picker = newCompactionPicker(newVersion, vs.opts, nil)
171 1 : // Note that a "snapshot" version edit is written to the manifest when it is
172 1 : // created.
173 1 : vs.manifestFileNum = vs.getNextDiskFileNum()
174 1 : err = vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum)
175 1 : if err == nil {
176 1 : if err = vs.manifest.Flush(); err != nil {
177 1 : vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
178 1 : }
179 : }
180 1 : if err == nil {
181 1 : if err = vs.manifestFile.Sync(); err != nil {
182 1 : vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
183 1 : }
184 : }
185 1 : if err == nil {
186 1 : // NB: setCurrent is responsible for syncing the data directory.
187 1 : if err = vs.setCurrent(vs.manifestFileNum); err != nil {
188 1 : vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
189 1 : }
190 : }
191 :
192 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
193 1 : JobID: jobID,
194 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, vs.manifestFileNum),
195 1 : FileNum: vs.manifestFileNum,
196 1 : Err: err,
197 1 : })
198 1 : if err != nil {
199 1 : return err
200 1 : }
201 1 : return nil
202 : }
203 :
204 : // load loads the version set from the manifest file.
205 : func (vs *versionSet) load(
206 : dirname string,
207 : opts *Options,
208 : manifestFileNum base.DiskFileNum,
209 : marker *atomicfs.Marker,
210 : setCurrent func(base.DiskFileNum) error,
211 : getFormatMajorVersion func() FormatMajorVersion,
212 : mu *sync.Mutex,
213 1 : ) error {
214 1 : vs.init(dirname, opts, marker, setCurrent, getFormatMajorVersion, mu)
215 1 :
216 1 : vs.manifestFileNum = manifestFileNum
217 1 : manifestPath := base.MakeFilepath(opts.FS, dirname, fileTypeManifest, vs.manifestFileNum)
218 1 : manifestFilename := opts.FS.PathBase(manifestPath)
219 1 :
220 1 : // Read the versionEdits in the manifest file.
221 1 : var bve bulkVersionEdit
222 1 : bve.AddedByFileNum = make(map[base.FileNum]*fileMetadata)
223 1 : manifest, err := vs.fs.Open(manifestPath)
224 1 : if err != nil {
225 0 : return errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
226 0 : errors.Safe(manifestFilename), dirname)
227 0 : }
228 1 : defer manifest.Close()
229 1 : rr := record.NewReader(manifest, 0 /* logNum */)
230 1 : for {
231 1 : r, err := rr.Next()
232 1 : if err == io.EOF || record.IsInvalidRecord(err) {
233 1 : break
234 : }
235 1 : if err != nil {
236 0 : return errors.Wrapf(err, "pebble: error when loading manifest file %q",
237 0 : errors.Safe(manifestFilename))
238 0 : }
239 1 : var ve versionEdit
240 1 : err = ve.Decode(r)
241 1 : if err != nil {
242 0 : // Break instead of returning an error if the record is corrupted
243 0 : // or invalid.
244 0 : if err == io.EOF || record.IsInvalidRecord(err) {
245 0 : break
246 : }
247 0 : return err
248 : }
249 1 : if ve.ComparerName != "" {
250 1 : if ve.ComparerName != vs.cmpName {
251 1 : return errors.Errorf("pebble: manifest file %q for DB %q: "+
252 1 : "comparer name from file %q != comparer name from Options %q",
253 1 : errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(vs.cmpName))
254 1 : }
255 : }
256 1 : if err := bve.Accumulate(&ve); err != nil {
257 0 : return err
258 0 : }
259 1 : if ve.MinUnflushedLogNum != 0 {
260 1 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
261 1 : }
262 1 : if ve.NextFileNum != 0 {
263 1 : vs.nextFileNum = ve.NextFileNum
264 1 : }
265 1 : if ve.LastSeqNum != 0 {
266 1 : // logSeqNum is the _next_ sequence number that will be assigned,
267 1 : // while LastSeqNum is the last assigned sequence number. Note that
268 1 : // this behaviour mimics that in RocksDB; the first sequence number
269 1 : // assigned is one greater than the one present in the manifest
270 1 : // (assuming no WALs contain higher sequence numbers than the
271 1 : // manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
272 1 : // next sequence number that will be assigned.
273 1 : //
274 1 : // If LastSeqNum is less than SeqNumStart, increase it to at least
275 1 : // SeqNumStart to leave ample room for reserved sequence numbers.
276 1 : if ve.LastSeqNum+1 < base.SeqNumStart {
277 1 : vs.logSeqNum.Store(base.SeqNumStart)
278 1 : } else {
279 1 : vs.logSeqNum.Store(ve.LastSeqNum + 1)
280 1 : }
281 : }
282 : }
283 : // vs.nextFileNum was initialized to 1 in init, and the replay above could
284 : // only have updated it to some other non-zero value, so it cannot be 0
285 : // here.
286 1 : if vs.minUnflushedLogNum == 0 {
287 1 : if vs.nextFileNum >= 2 {
288 1 : // We either have a freshly created DB, or a DB created by RocksDB
289 1 : // that has not had a single flushed SSTable yet. This is because
290 1 : // RocksDB bumps up nextFileNum in this case without bumping up
291 1 : // minUnflushedLogNum, even if WALs with non-zero file numbers are
292 1 : // present in the directory.
293 1 : } else {
294 0 : return base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
295 0 : errors.Safe(manifestFilename), dirname)
296 0 : }
297 : }
298 1 : vs.markFileNumUsed(vs.minUnflushedLogNum)
299 1 :
300 1 : // Populate the fileBackingMap with the FileBackings for virtual sstables now
301 1 : // that we have finished version edit accumulation.
302 1 : for _, s := range bve.AddedFileBacking {
303 1 : vs.addFileBacking(s)
304 1 : }
305 :
306 1 : for _, fileNum := range bve.RemovedFileBacking {
307 1 : vs.removeFileBacking(fileNum)
308 1 : }
309 :
310 1 : newVersion, err := bve.Apply(
311 1 : nil, vs.cmp, opts.Comparer.FormatKey, opts.FlushSplitBytes,
312 1 : opts.Experimental.ReadCompactionRate, nil, /* zombies */
313 1 : )
314 1 : if err != nil {
315 0 : return err
316 0 : }
317 1 : newVersion.L0Sublevels.InitCompactingFileInfo(nil /* in-progress compactions */)
318 1 : vs.append(newVersion)
319 1 :
320 1 : for i := range vs.metrics.Levels {
321 1 : l := &vs.metrics.Levels[i]
322 1 : l.NumFiles = int64(newVersion.Levels[i].Len())
323 1 : files := newVersion.Levels[i].Slice()
324 1 : l.Size = int64(files.SizeSum())
325 1 : }
326 :
327 1 : vs.picker = newCompactionPicker(newVersion, vs.opts, nil)
328 1 : return nil
329 : }
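// The replay loop in load can also be exercised on its own to inspect a
// MANIFEST. A minimal sketch, assuming a hypothetical file name and eliding
// error handling (illustrative only, not part of the original file):
//
//	f, _ := vfs.Default.Open("MANIFEST-000001")
//	defer f.Close()
//	rr := record.NewReader(f, 0 /* logNum */)
//	for {
//		r, err := rr.Next()
//		if err == io.EOF || record.IsInvalidRecord(err) {
//			break
//		}
//		var ve versionEdit
//		if err := ve.Decode(r); err != nil {
//			break
//		}
//		fmt.Printf("edit: %d new files, %d deleted files\n", len(ve.NewFiles), len(ve.DeletedFiles))
//	}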
330 :
331 1 : func (vs *versionSet) close() error {
332 1 : if vs.manifestFile != nil {
333 1 : if err := vs.manifestFile.Close(); err != nil {
334 0 : return err
335 0 : }
336 : }
337 1 : if vs.manifestMarker != nil {
338 1 : if err := vs.manifestMarker.Close(); err != nil {
339 0 : return err
340 0 : }
341 : }
342 1 : return nil
343 : }
344 :
345 : // logLock locks the manifest for writing. The lock must be released by either
346 : // a call to logUnlock or logAndApply.
347 : //
348 : // DB.mu must be held when calling this method, but the mutex may be dropped and
349 : // re-acquired during the course of this method.
350 1 : func (vs *versionSet) logLock() {
351 1 : // Wait for any existing writing to the manifest to complete, then mark the
352 1 : // manifest as busy.
353 1 : for vs.writing {
354 1 : vs.writerCond.Wait()
355 1 : }
356 1 : vs.writing = true
357 : }
358 :
359 : // logUnlock releases the lock for manifest writing.
360 : //
361 : // DB.mu must be held when calling this method.
362 1 : func (vs *versionSet) logUnlock() {
363 1 : if !vs.writing {
364 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
365 0 : }
366 1 : vs.writing = false
367 1 : vs.writerCond.Signal()
368 : }
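// logLock and logUnlock together form a single-writer gate layered on top of
// DB.mu: writers queue on writerCond until the in-flight MANIFEST write
// finishes. The same pattern in isolation, with hypothetical names and its
// own mutex (illustrative only, not part of the original file):
//
//	type manifestGate struct {
//		mu      sync.Mutex
//		cond    sync.Cond // initialized with cond.L = &mu
//		writing bool
//	}
//
//	func (g *manifestGate) lock() {
//		g.mu.Lock()
//		defer g.mu.Unlock()
//		for g.writing {
//			g.cond.Wait() // releases mu while waiting
//		}
//		g.writing = true
//	}
//
//	func (g *manifestGate) unlock() {
//		g.mu.Lock()
//		defer g.mu.Unlock()
//		g.writing = false
//		g.cond.Signal()
//	}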
369 :
370 : // Only call if the DiskFileNum doesn't exist in the fileBackingMap.
371 1 : func (vs *versionSet) addFileBacking(backing *manifest.FileBacking) {
372 1 : _, ok := vs.backingState.fileBackingMap[backing.DiskFileNum]
373 1 : if ok {
374 0 : panic("pebble: trying to add an existing file backing")
375 : }
376 1 : vs.backingState.fileBackingMap[backing.DiskFileNum] = backing
377 1 : vs.backingState.fileBackingSize += backing.Size
378 : }
379 :
380 : // Only call if the DiskFileNum exists in the fileBackingMap.
381 1 : func (vs *versionSet) removeFileBacking(dfn base.DiskFileNum) {
382 1 : backing, ok := vs.backingState.fileBackingMap[dfn]
383 1 : if !ok {
384 0 : panic("pebble: trying to remove an unknown file backing")
385 : }
386 1 : delete(vs.backingState.fileBackingMap, dfn)
387 1 : vs.backingState.fileBackingSize -= backing.Size
388 : }
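// addFileBacking and removeFileBacking maintain the invariant that
// fileBackingSize equals the sum of Size over all entries in fileBackingMap.
// A check one might run with the manifest lock held (illustrative only, not
// part of the original file):
//
//	var sum uint64
//	for _, b := range vs.backingState.fileBackingMap {
//		sum += b.Size
//	}
//	if sum != vs.backingState.fileBackingSize {
//		panic("pebble: fileBackingSize does not match fileBackingMap")
//	}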
389 :
390 : // logAndApply logs the version edit to the manifest, applies the version edit
391 : // to the current version, and installs the new version.
392 : //
393 : // DB.mu must be held when calling this method and will be released temporarily
394 : // while performing file I/O. Requires that the manifest is locked for writing
395 : // (see logLock). Will unconditionally release the manifest lock (via
396 : // logUnlock) even if an error occurs.
397 : //
398 : // inProgressCompactions is called while DB.mu is held, to get the list of
399 : // in-progress compactions.
400 : func (vs *versionSet) logAndApply(
401 : jobID int,
402 : ve *versionEdit,
403 : metrics map[int]*LevelMetrics,
404 : forceRotation bool,
405 : inProgressCompactions func() []compactionInfo,
406 1 : ) error {
407 1 : if !vs.writing {
408 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
409 0 : }
410 1 : defer vs.logUnlock()
411 1 :
412 1 : if ve.MinUnflushedLogNum != 0 {
413 1 : if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
414 1 : vs.nextFileNum <= uint64(ve.MinUnflushedLogNum) {
415 0 : panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
416 0 : ve.MinUnflushedLogNum))
417 : }
418 : }
419 :
420 : // This is the next manifest filenum, but if the current file is too big we
421 : // will write this ve to the next file which means what ve encodes is the
422 : // current filenum and not the next one.
423 : //
424 : // TODO(sbhola): figure out why this is correct and update comment.
425 1 : ve.NextFileNum = vs.nextFileNum
426 1 :
427 1 : // LastSeqNum is set to the current upper bound on the assigned sequence
428 1 : // numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
429 1 : // used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
430 1 : // replay. It must be greater than or equal to any sequence number
431 1 : // written to an sstable, including sequence numbers in ingested files.
432 1 : // Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
433 1 : // number. This is fallout from ingestion which allows a sequence number X to
434 1 : // be assigned to an ingested sstable even though sequence number X-1 resides
435 1 : // in an unflushed memtable. logSeqNum is the _next_ sequence number that
436 1 : // will be assigned, so subtract 1 from it to get the upper bound on the
437 1 : // last assigned sequence number.
438 1 : logSeqNum := vs.logSeqNum.Load()
439 1 : ve.LastSeqNum = logSeqNum - 1
440 1 : if logSeqNum == 0 {
441 0 : // logSeqNum is initialized to 1 in Open() if there are no previous WAL
442 0 : // or manifest records, so this case should never happen.
443 0 : vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
444 0 : }
445 :
446 1 : currentVersion := vs.currentVersion()
447 1 : var newVersion *version
448 1 :
449 1 : // Generate a new manifest if we don't currently have one, or forceRotation
450 1 : // is true, or the current one is too large.
451 1 : //
452 1 : // For largeness, we do not exclusively use the MaxManifestFileSize
453 1 : // threshold, since we have had incidents where, due to either large keys or
454 1 : // large numbers of files, each edit results in a snapshot + write of the
455 1 : // edit. This slows the system down since each flush or compaction is
456 1 : // writing a new manifest snapshot. The primary goal of the size-based
457 1 : // rollover logic is to ensure that when reopening a DB, the number of edits
458 1 : // that need to be replayed on top of the snapshot is "sane". Rolling over
459 1 : // to a new manifest after each edit is not relevant to that goal.
460 1 : //
461 1 : // Consider the following cases:
462 1 : // - The number of live files F in the DB is roughly stable: after writing
463 1 : // the snapshot (with F files), say we require that there be enough edits
464 1 : // such that the cumulative number of files in those edits, E, be greater
465 1 : // than F. This will ensure that the total amount of time in logAndApply
466 1 : // that is spent in snapshot writing is ~50%.
467 1 : //
468 1 : // - The number of live files F in the DB is shrinking drastically, say from
469 1 : // F to F/10: This can happen for various reasons, like wide range
470 1 : // tombstones, or large numbers of smaller-than-usual files that are being
471 1 : // merged together into larger files. And say the number of new files generated
472 1 : // during this shrinkage is insignificant compared to F/10, and so for
473 1 : // this example we will assume it is effectively 0. After this shrinking,
474 1 : // E = 0.9F, and so if we used the previous snapshot file count, F, as the
475 1 : // threshold that needs to be exceeded, we will further delay the snapshot
476 1 : // writing. Which means on DB reopen we will need to replay 0.9F edits to
477 1 : // get to a version with 0.1F files. It would be better to create a new
478 1 : // snapshot when E exceeds the number of files in the current version.
479 1 : //
480 1 : // - The number of live files F in the DB is growing via perfect ingests
481 1 : // into L6: Say we wrote the snapshot when there were F files and now we
482 1 : // have 10F files, so E = 9F. We will further delay writing a new
483 1 : // snapshot. This case can be critiqued as contrived, but we consider it
484 1 : // nonetheless.
485 1 : //
486 1 : // The logic below uses the min of the last snapshot file count and the file
487 1 : // count in the current version.
488 1 : vs.rotationHelper.AddRecord(int64(len(ve.DeletedFiles) + len(ve.NewFiles)))
489 1 : sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
490 1 : requireRotation := forceRotation || vs.manifest == nil
491 1 :
492 1 : var nextSnapshotFilecount int64
493 1 : for i := range vs.metrics.Levels {
494 1 : nextSnapshotFilecount += vs.metrics.Levels[i].NumFiles
495 1 : }
496 1 : if sizeExceeded && !requireRotation {
497 1 : requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
498 1 : }
499 1 : var newManifestFileNum base.DiskFileNum
500 1 : var prevManifestFileSize uint64
501 1 : if requireRotation {
502 1 : newManifestFileNum = vs.getNextDiskFileNum()
503 1 : prevManifestFileSize = uint64(vs.manifest.Size())
504 1 : }
505 :
506 : // Grab certain values before releasing vs.mu, in case createManifest() needs
507 : // to be called.
508 1 : minUnflushedLogNum := vs.minUnflushedLogNum
509 1 : nextFileNum := vs.nextFileNum
510 1 :
511 1 : var zombies map[base.DiskFileNum]uint64
512 1 : if err := func() error {
513 1 : vs.mu.Unlock()
514 1 : defer vs.mu.Lock()
515 1 :
516 1 : var err error
517 1 : if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
518 0 : return errors.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
519 0 : }
520 1 : newVersion, zombies, err = manifest.AccumulateIncompleteAndApplySingleVE(
521 1 : ve, currentVersion, vs.cmp, vs.opts.Comparer.FormatKey,
522 1 : vs.opts.FlushSplitBytes, vs.opts.Experimental.ReadCompactionRate,
523 1 : vs.backingState.fileBackingMap, vs.addFileBacking, vs.removeFileBacking,
524 1 : )
525 1 : if err != nil {
526 0 : return errors.Wrap(err, "MANIFEST apply failed")
527 0 : }
528 :
529 1 : if newManifestFileNum != 0 {
530 1 : if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum); err != nil {
531 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
532 1 : JobID: jobID,
533 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
534 1 : FileNum: newManifestFileNum,
535 1 : Err: err,
536 1 : })
537 1 : return errors.Wrap(err, "MANIFEST create failed")
538 1 : }
539 : }
540 :
541 1 : w, err := vs.manifest.Next()
542 1 : if err != nil {
543 0 : return errors.Wrap(err, "MANIFEST next record write failed")
544 0 : }
545 :
546 : // NB: Any error from this point on is considered fatal as we don't know if
547 : // the MANIFEST write occurred or not. Trying to determine that is
548 : // fraught. Instead we rely on the standard recovery mechanism run when a
549 : // database is opened. In particular, that mechanism generates a new MANIFEST
550 : // and ensures it is synced.
551 1 : if err := ve.Encode(w); err != nil {
552 0 : return errors.Wrap(err, "MANIFEST write failed")
553 0 : }
554 1 : if err := vs.manifest.Flush(); err != nil {
555 1 : return errors.Wrap(err, "MANIFEST flush failed")
556 1 : }
557 1 : if err := vs.manifestFile.Sync(); err != nil {
558 1 : return errors.Wrap(err, "MANIFEST sync failed")
559 1 : }
560 1 : if newManifestFileNum != 0 {
561 1 : // NB: setCurrent is responsible for syncing the data directory.
562 1 : if err := vs.setCurrent(newManifestFileNum); err != nil {
563 1 : return errors.Wrap(err, "MANIFEST set current failed")
564 1 : }
565 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
566 1 : JobID: jobID,
567 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
568 1 : FileNum: newManifestFileNum,
569 1 : })
570 : }
571 1 : return nil
572 1 : }(); err != nil {
573 1 : // Any error encountered during any of the operations in the previous
574 1 : // closure is considered fatal. Treating such errors as fatal is preferred
575 1 : // to attempting to unwind various file and b-tree reference counts, and
576 1 : // re-generating L0 sublevel metadata. This may change in the future, if
577 1 : // certain manifest / WAL operations become retryable. For more context, see
578 1 : // #1159 and #1792.
579 1 : vs.opts.Logger.Fatalf("%s", err)
580 1 : return err
581 1 : }
582 :
583 1 : if requireRotation {
584 1 : // Successfully rotated.
585 1 : vs.rotationHelper.Rotate(nextSnapshotFilecount)
586 1 : }
587 : // Now that DB.mu is held again, initialize compacting file info in
588 : // L0Sublevels.
589 1 : inProgress := inProgressCompactions()
590 1 :
591 1 : newVersion.L0Sublevels.InitCompactingFileInfo(inProgressL0Compactions(inProgress))
592 1 :
593 1 : // Update the zombie tables set first, as installation of the new version
594 1 : // will unref the previous version which could result in addObsoleteLocked
595 1 : // being called.
596 1 : for fileNum, size := range zombies {
597 1 : vs.zombieTables[fileNum] = size
598 1 : }
599 :
600 : // Install the new version.
601 1 : vs.append(newVersion)
602 1 : if ve.MinUnflushedLogNum != 0 {
603 1 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
604 1 : }
605 1 : if newManifestFileNum != 0 {
606 1 : if vs.manifestFileNum != 0 {
607 1 : vs.obsoleteManifests = append(vs.obsoleteManifests, fileInfo{
608 1 : fileNum: vs.manifestFileNum,
609 1 : fileSize: prevManifestFileSize,
610 1 : })
611 1 : }
612 1 : vs.manifestFileNum = newManifestFileNum
613 : }
614 :
615 1 : for level, update := range metrics {
616 1 : vs.metrics.Levels[level].Add(update)
617 1 : }
618 1 : for i := range vs.metrics.Levels {
619 1 : l := &vs.metrics.Levels[i]
620 1 : l.NumFiles = int64(newVersion.Levels[i].Len())
621 1 : l.NumVirtualFiles = newVersion.Levels[i].NumVirtual
622 1 : l.VirtualSize = newVersion.Levels[i].VirtualSize
623 1 : l.Size = int64(newVersion.Levels[i].Size())
624 1 :
625 1 : l.Sublevels = 0
626 1 : if l.NumFiles > 0 {
627 1 : l.Sublevels = 1
628 1 : }
629 1 : if invariants.Enabled {
630 1 : levelFiles := newVersion.Levels[i].Slice()
631 1 : if size := int64(levelFiles.SizeSum()); l.Size != size {
632 0 : vs.opts.Logger.Fatalf("versionSet metrics L%d Size = %d, actual size = %d", i, l.Size, size)
633 0 : }
634 1 : if nVirtual := levelFiles.NumVirtual(); nVirtual != l.NumVirtualFiles {
635 0 : vs.opts.Logger.Fatalf(
636 0 : "versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
637 0 : i, l.NumVirtualFiles, nVirtual,
638 0 : )
639 0 : }
640 1 : if vSize := levelFiles.VirtualSizeSum(); vSize != l.VirtualSize {
641 0 : vs.opts.Logger.Fatalf(
642 0 : "versionSet metrics L%d Virtual size = %d, actual size = %d",
643 0 : i, l.VirtualSize, vSize,
644 0 : )
645 0 : }
646 : }
647 : }
648 1 : vs.metrics.Levels[0].Sublevels = int32(len(newVersion.L0SublevelFiles))
649 1 :
650 1 : vs.picker = newCompactionPicker(newVersion, vs.opts, inProgress)
651 1 : if !vs.dynamicBaseLevel {
652 1 : vs.picker.forceBaseLevel1()
653 1 : }
654 1 : return nil
655 : }
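// The manifest rotation decision above reduces to the following sequence of
// record.RotationHelper calls, restated here for reference (an illustrative
// recap of the code in logAndApply, not part of the original file):
//
//	vs.rotationHelper.AddRecord(int64(len(ve.DeletedFiles) + len(ve.NewFiles)))
//	rotate := forceRotation
//	if !rotate && vs.manifest.Size() >= vs.opts.MaxManifestFileSize {
//		rotate = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
//	}
//	if rotate {
//		// createManifest writes a snapshot of the current version to a new
//		// MANIFEST; once the edit is durable and CURRENT/the marker has been
//		// switched, the rotation is recorded:
//		vs.rotationHelper.Rotate(nextSnapshotFilecount)
//	}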
656 :
657 : func (vs *versionSet) incrementCompactions(
658 : kind compactionKind, extraLevels []*compactionLevel, pickerMetrics compactionPickerMetrics,
659 1 : ) {
660 1 : switch kind {
661 1 : case compactionKindDefault:
662 1 : vs.metrics.Compact.Count++
663 1 : vs.metrics.Compact.DefaultCount++
664 :
665 1 : case compactionKindFlush, compactionKindIngestedFlushable:
666 1 : vs.metrics.Flush.Count++
667 :
668 1 : case compactionKindMove:
669 1 : vs.metrics.Compact.Count++
670 1 : vs.metrics.Compact.MoveCount++
671 :
672 1 : case compactionKindDeleteOnly:
673 1 : vs.metrics.Compact.Count++
674 1 : vs.metrics.Compact.DeleteOnlyCount++
675 :
676 1 : case compactionKindElisionOnly:
677 1 : vs.metrics.Compact.Count++
678 1 : vs.metrics.Compact.ElisionOnlyCount++
679 :
680 1 : case compactionKindRead:
681 1 : vs.metrics.Compact.Count++
682 1 : vs.metrics.Compact.ReadCount++
683 :
684 1 : case compactionKindRewrite:
685 1 : vs.metrics.Compact.Count++
686 1 : vs.metrics.Compact.RewriteCount++
687 : }
688 1 : if len(extraLevels) > 0 {
689 1 : vs.metrics.Compact.MultiLevelCount++
690 1 : }
691 : }
692 :
693 1 : func (vs *versionSet) incrementCompactionBytes(numBytes int64) {
694 1 : vs.atomicInProgressBytes.Add(numBytes)
695 1 : }
696 :
697 : // createManifest creates a manifest file that contains a snapshot of vs.
698 : func (vs *versionSet) createManifest(
699 : dirname string, fileNum, minUnflushedLogNum base.DiskFileNum, nextFileNum uint64,
700 1 : ) (err error) {
701 1 : var (
702 1 : filename = base.MakeFilepath(vs.fs, dirname, fileTypeManifest, fileNum)
703 1 : manifestFile vfs.File
704 1 : manifest *record.Writer
705 1 : )
706 1 : defer func() {
707 1 : if manifest != nil {
708 0 : manifest.Close()
709 0 : }
710 1 : if manifestFile != nil {
711 0 : manifestFile.Close()
712 0 : }
713 1 : if err != nil {
714 1 : vs.fs.Remove(filename)
715 1 : }
716 : }()
717 1 : manifestFile, err = vs.fs.Create(filename)
718 1 : if err != nil {
719 1 : return err
720 1 : }
721 1 : manifest = record.NewWriter(manifestFile)
722 1 :
723 1 : snapshot := versionEdit{
724 1 : ComparerName: vs.cmpName,
725 1 : }
726 1 : dedup := make(map[base.DiskFileNum]struct{})
727 1 : for level, levelMetadata := range vs.currentVersion().Levels {
728 1 : iter := levelMetadata.Iter()
729 1 : for meta := iter.First(); meta != nil; meta = iter.Next() {
730 1 : snapshot.NewFiles = append(snapshot.NewFiles, newFileEntry{
731 1 : Level: level,
732 1 : Meta: meta,
733 1 : })
734 1 : if _, ok := dedup[meta.FileBacking.DiskFileNum]; meta.Virtual && !ok {
735 1 : dedup[meta.FileBacking.DiskFileNum] = struct{}{}
736 1 : snapshot.CreatedBackingTables = append(
737 1 : snapshot.CreatedBackingTables,
738 1 : meta.FileBacking,
739 1 : )
740 1 : }
741 : }
742 : }
743 :
744 : // When creating a version snapshot for an existing DB, this snapshot VersionEdit will be
745 : // immediately followed by another VersionEdit (being written in logAndApply()). That
746 : // VersionEdit always contains a LastSeqNum, so we don't need to include that in the snapshot.
747 : // But it does not necessarily include MinUnflushedLogNum, NextFileNum, so we initialize those
748 : // using the corresponding fields in the versionSet (which came from the latest preceding
749 : // VersionEdit that had those fields).
750 1 : snapshot.MinUnflushedLogNum = minUnflushedLogNum
751 1 : snapshot.NextFileNum = nextFileNum
752 1 :
753 1 : w, err1 := manifest.Next()
754 1 : if err1 != nil {
755 0 : return err1
756 0 : }
757 1 : if err := snapshot.Encode(w); err != nil {
758 0 : return err
759 0 : }
760 :
761 1 : if vs.manifest != nil {
762 1 : vs.manifest.Close()
763 1 : vs.manifest = nil
764 1 : }
765 1 : if vs.manifestFile != nil {
766 1 : if err := vs.manifestFile.Close(); err != nil {
767 0 : return err
768 0 : }
769 1 : vs.manifestFile = nil
770 : }
771 :
772 1 : vs.manifest, manifest = manifest, nil
773 1 : vs.manifestFile, manifestFile = manifestFile, nil
774 1 : return nil
775 : }
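// After a rotation, the new MANIFEST therefore starts with a single snapshot
// edit followed by the incremental edits appended by logAndApply.
// Schematically (illustrative only, not part of the original file):
//
//	record 0: ComparerName, MinUnflushedLogNum, NextFileNum,
//	          NewFiles = every file in the current version,
//	          CreatedBackingTables = backings of the current virtual sstables
//	record 1: edit from the next flush/compaction (includes LastSeqNum)
//	record 2: edit from the one after that
//	...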
776 :
777 1 : func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
778 1 : if vs.nextFileNum <= uint64(fileNum) {
779 0 : vs.nextFileNum = uint64(fileNum + 1)
780 0 : }
781 : }
782 :
783 1 : func (vs *versionSet) getNextFileNum() base.FileNum {
784 1 : x := vs.nextFileNum
785 1 : vs.nextFileNum++
786 1 : return base.FileNum(x)
787 1 : }
788 :
789 1 : func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
790 1 : x := vs.nextFileNum
791 1 : vs.nextFileNum++
792 1 : return base.DiskFileNum(x)
793 1 : }
794 :
795 1 : func (vs *versionSet) append(v *version) {
796 1 : if v.Refs() != 0 {
797 0 : panic("pebble: version should be unreferenced")
798 : }
799 1 : if !vs.versions.Empty() {
800 1 : vs.versions.Back().UnrefLocked()
801 1 : }
802 1 : v.Deleted = vs.obsoleteFn
803 1 : v.Ref()
804 1 : vs.versions.PushBack(v)
805 : }
806 :
807 1 : func (vs *versionSet) currentVersion() *version {
808 1 : return vs.versions.Back()
809 1 : }
810 :
811 1 : func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
812 1 : current := vs.currentVersion()
813 1 : for v := vs.versions.Front(); true; v = v.Next() {
814 1 : for _, lm := range v.Levels {
815 1 : iter := lm.Iter()
816 1 : for f := iter.First(); f != nil; f = iter.Next() {
817 1 : m[f.FileBacking.DiskFileNum] = struct{}{}
818 1 : }
819 : }
820 1 : if v == current {
821 1 : break
822 : }
823 : }
824 : }
825 :
826 : // addObsoleteLocked will add the fileInfo associated with obsolete backing
827 : // sstables to the obsolete tables list.
828 : //
829 : // The file backings in the obsolete list must not appear more than once.
830 : //
831 : // DB.mu must be held when addObsoleteLocked is called.
832 1 : func (vs *versionSet) addObsoleteLocked(obsolete []*fileBacking) {
833 1 : if len(obsolete) == 0 {
834 1 : return
835 1 : }
836 :
837 1 : obsoleteFileInfo := make([]fileInfo, len(obsolete))
838 1 : for i, bs := range obsolete {
839 1 : obsoleteFileInfo[i].fileNum = bs.DiskFileNum
840 1 : obsoleteFileInfo[i].fileSize = bs.Size
841 1 : }
842 :
843 1 : if invariants.Enabled {
844 1 : dedup := make(map[base.DiskFileNum]struct{})
845 1 : for _, fi := range obsoleteFileInfo {
846 1 : dedup[fi.fileNum] = struct{}{}
847 1 : }
848 1 : if len(dedup) != len(obsoleteFileInfo) {
849 0 : panic("pebble: duplicate FileBacking present in obsolete list")
850 : }
851 : }
852 :
853 1 : for _, fi := range obsoleteFileInfo {
854 1 : // Note that the obsolete tables are no longer zombie by the definition of
855 1 : // zombie, but we leave them in the zombie tables map until they are
856 1 : // deleted from disk.
857 1 : if _, ok := vs.zombieTables[fi.fileNum]; !ok {
858 0 : vs.opts.Logger.Fatalf("MANIFEST obsolete table %s not marked as zombie", fi.fileNum)
859 0 : }
860 : }
861 :
862 1 : vs.obsoleteTables = append(vs.obsoleteTables, obsoleteFileInfo...)
863 1 : vs.updateObsoleteTableMetricsLocked()
864 : }
865 :
866 : // addObsolete will acquire DB.mu, so DB.mu must not be held when this is
867 : // called.
868 1 : func (vs *versionSet) addObsolete(obsolete []*fileBacking) {
869 1 : vs.mu.Lock()
870 1 : defer vs.mu.Unlock()
871 1 : vs.addObsoleteLocked(obsolete)
872 1 : }
873 :
874 1 : func (vs *versionSet) updateObsoleteTableMetricsLocked() {
875 1 : vs.metrics.Table.ObsoleteCount = int64(len(vs.obsoleteTables))
876 1 : vs.metrics.Table.ObsoleteSize = 0
877 1 : for _, fi := range vs.obsoleteTables {
878 1 : vs.metrics.Table.ObsoleteSize += fi.fileSize
879 1 : }
880 : }
881 :
882 : func setCurrentFunc(
883 : vers FormatMajorVersion, marker *atomicfs.Marker, fs vfs.FS, dirname string, dir vfs.File,
884 1 : ) func(base.DiskFileNum) error {
885 1 : if vers < formatVersionedManifestMarker {
886 1 : // Pebble versions before `formatVersionedManifestMarker` used
887 1 : // the CURRENT file to signal which MANIFEST is current. Ignore
888 1 : // the filename read during LocateMarker.
889 1 : return func(manifestFileNum base.DiskFileNum) error {
890 1 : if err := setCurrentFile(dirname, fs, manifestFileNum); err != nil {
891 1 : return err
892 1 : }
893 1 : if err := dir.Sync(); err != nil {
894 1 : // We panic here, rather than higher in the call
895 1 : // stack, for parity with the atomicfs.Marker behavior.
896 1 : // A panic is always necessary because failed Syncs are
897 1 : // unrecoverable.
898 1 : panic(errors.Wrap(err, "fatal: MANIFEST dirsync failed"))
899 : }
900 1 : return nil
901 : }
902 : }
903 1 : return setCurrentFuncMarker(marker, fs, dirname)
904 : }
905 :
906 : func setCurrentFuncMarker(
907 : marker *atomicfs.Marker, fs vfs.FS, dirname string,
908 1 : ) func(base.DiskFileNum) error {
909 1 : return func(manifestFileNum base.DiskFileNum) error {
910 1 : return marker.Move(base.MakeFilename(fileTypeManifest, manifestFileNum))
911 1 : }
912 : }
913 :
914 : func findCurrentManifest(
915 : vers FormatMajorVersion, fs vfs.FS, dirname string,
916 1 : ) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
917 1 : // NB: We always locate the manifest marker, even if we might not
918 1 : // actually use it (because we're opening the database at an earlier
919 1 : // format major version that uses the CURRENT file). Locating a
920 1 : // marker should succeed even if the marker has never been placed.
921 1 : var filename string
922 1 : marker, filename, err = atomicfs.LocateMarker(fs, dirname, manifestMarkerName)
923 1 : if err != nil {
924 1 : return nil, base.FileNum(0).DiskFileNum(), false, err
925 1 : }
926 :
927 1 : if vers < formatVersionedManifestMarker {
928 1 : // Pebble versions before `formatVersionedManifestMarker` used
929 1 : // the CURRENT file to signal which MANIFEST is current. Ignore
930 1 : // the filename read during LocateMarker.
931 1 :
932 1 : manifestNum, err = readCurrentFile(fs, dirname)
933 1 : if oserror.IsNotExist(err) {
934 1 : return marker, base.FileNum(0).DiskFileNum(), false, nil
935 1 : } else if err != nil {
936 1 : return marker, base.FileNum(0).DiskFileNum(), false, err
937 1 : }
938 1 : return marker, manifestNum, true, nil
939 : }
940 :
941 : // The current format major version is >=
942 : // formatVersionedManifestMarker indicating that the
943 : // atomicfs.Marker is the source of truth on the current manifest.
944 :
945 1 : if filename == "" {
946 0 : // The marker hasn't been set yet. This database doesn't exist.
947 0 : return marker, base.FileNum(0).DiskFileNum(), false, nil
948 0 : }
949 :
950 1 : var ok bool
951 1 : _, manifestNum, ok = base.ParseFilename(fs, filename)
952 1 : if !ok {
953 0 : return marker, base.FileNum(0).DiskFileNum(), false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
954 0 : }
955 1 : return marker, manifestNum, true, nil
956 : }
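// At Open time these helpers compose roughly as follows, assuming fmv,
// dataDir, jobID, getFMV, mu, and vs come from the caller (a hedged sketch,
// not part of the original file):
//
//	marker, manifestNum, exists, err := findCurrentManifest(fmv, opts.FS, dirname)
//	if err != nil {
//		return err
//	}
//	setCurrent := setCurrentFunc(fmv, marker, opts.FS, dirname, dataDir)
//	if exists {
//		err = vs.load(dirname, opts, manifestNum, marker, setCurrent, getFMV, mu)
//	} else {
//		err = vs.create(jobID, dirname, opts, marker, setCurrent, getFMV, mu)
//	}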
957 :
958 1 : func readCurrentFile(fs vfs.FS, dirname string) (base.DiskFileNum, error) {
959 1 : // Read the CURRENT file to find the current manifest file.
960 1 : current, err := fs.Open(base.MakeFilepath(fs, dirname, fileTypeCurrent, base.FileNum(0).DiskFileNum()))
961 1 : if err != nil {
962 1 : return base.FileNum(0).DiskFileNum(), errors.Wrapf(err, "pebble: could not open CURRENT file for DB %q", dirname)
963 1 : }
964 1 : defer current.Close()
965 1 : stat, err := current.Stat()
966 1 : if err != nil {
967 0 : return base.FileNum(0).DiskFileNum(), err
968 0 : }
969 1 : n := stat.Size()
970 1 : if n == 0 {
971 0 : return base.FileNum(0).DiskFileNum(), errors.Errorf("pebble: CURRENT file for DB %q is empty", dirname)
972 0 : }
973 1 : if n > 4096 {
974 0 : return base.FileNum(0).DiskFileNum(), errors.Errorf("pebble: CURRENT file for DB %q is too large", dirname)
975 0 : }
976 1 : b := make([]byte, n)
977 1 : _, err = current.ReadAt(b, 0)
978 1 : if err != nil {
979 0 : return base.FileNum(0).DiskFileNum(), err
980 0 : }
981 1 : if b[n-1] != '\n' {
982 0 : return base.FileNum(0).DiskFileNum(), base.CorruptionErrorf("pebble: CURRENT file for DB %q is malformed", dirname)
983 0 : }
984 1 : b = bytes.TrimSpace(b)
985 1 :
986 1 : _, manifestFileNum, ok := base.ParseFilename(fs, string(b))
987 1 : if !ok {
988 0 : return base.FileNum(0).DiskFileNum(), base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(b))
989 0 : }
990 1 : return manifestFileNum, nil
991 : }
992 :
993 1 : func newFileMetrics(newFiles []manifest.NewFileEntry) map[int]*LevelMetrics {
994 1 : m := map[int]*LevelMetrics{}
995 1 : for _, nf := range newFiles {
996 1 : lm := m[nf.Level]
997 1 : if lm == nil {
998 1 : lm = &LevelMetrics{}
999 1 : m[nf.Level] = lm
1000 1 : }
1001 1 : lm.NumFiles++
1002 1 : lm.Size += int64(nf.Meta.Size)
1003 : }
1004 1 : return m
1005 : }
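// newFileMetrics builds the per-level metrics delta that typically accompanies
// a version edit as the metrics argument of logAndApply (a hedged usage
// sketch, not part of the original file):
//
//	metrics := newFileMetrics(ve.NewFiles)
//	// metrics[level].NumFiles and metrics[level].Size now reflect the files
//	// added at each level by ve, ready to be passed to logAndApply.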
|