1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "sync"
11 : "sync/atomic"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : "github.com/cockroachdb/pebble/record"
18 : "github.com/cockroachdb/pebble/vfs"
19 : "github.com/cockroachdb/pebble/vfs/atomicfs"
20 : )
21 :
22 : const numLevels = manifest.NumLevels
23 :
24 : const manifestMarkerName = `manifest`
25 :
26 : // Provide type aliases for the various manifest structs.
27 : type bulkVersionEdit = manifest.BulkVersionEdit
28 : type deletedFileEntry = manifest.DeletedFileEntry
29 : type fileMetadata = manifest.FileMetadata
30 : type physicalMeta = manifest.PhysicalFileMeta
31 : type virtualMeta = manifest.VirtualFileMeta
32 : type fileBacking = manifest.FileBacking
33 : type newFileEntry = manifest.NewFileEntry
34 : type version = manifest.Version
35 : type versionEdit = manifest.VersionEdit
36 : type versionList = manifest.VersionList
37 :
38 : // versionSet manages a collection of immutable versions, and the creation of
39 : // a new version from the most recent version. A new version is created from
40 : // an existing version by applying a version edit, which is just what it
41 : // sounds like: a delta from the previous version. Version edits are logged
42 : // to the MANIFEST file, which is replayed at startup.
43 : type versionSet struct {
44 : // Next seqNum to use for WAL writes.
45 : logSeqNum atomic.Uint64
46 :
47 : // The upper bound on sequence numbers that have been assigned so far. A
48 : // suffix of these sequence numbers may not have been written to a WAL. Both
49 : // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
50 : // visibleSeqNum is <= logSeqNum.
51 : visibleSeqNum atomic.Uint64
52 :
53 : // Number of bytes present in sstables being written by in-progress
54 : // compactions. This value will be zero if there are no in-progress
55 : // compactions. Updated and read atomically.
56 : atomicInProgressBytes atomic.Int64
57 :
58 : // Immutable fields.
59 : dirname string
60 : // Set to DB.mu.
61 : mu *sync.Mutex
62 : opts *Options
63 : fs vfs.FS
64 : cmp Compare
65 : cmpName string
66 : // Dynamic base level allows the dynamic base level computation to be
67 : // disabled. Used by tests which want to create specific LSM structures.
68 : dynamicBaseLevel bool
69 :
70 : // Mutable fields.
71 : versions versionList
72 : picker compactionPicker
73 :
74 : // Not all metrics are kept here. See DB.Metrics().
75 : metrics Metrics
76 :
77 : // A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
78 : // on the creation of every version.
79 : obsoleteFn func(obsolete []*fileBacking)
80 : obsoleteTables []fileInfo
81 : obsoleteManifests []fileInfo
82 : obsoleteOptions []fileInfo
83 :
84 : // Zombie tables which have been removed from the current version but are
85 : // still referenced by an in-use iterator.
86 : zombieTables map[base.DiskFileNum]uint64 // filenum -> size
87 :
88 : // backingState is protected by the versionSet.logLock. It's populated
89 : // during Open in versionSet.load, but it's not used concurrently during
90 : // load.
91 : backingState struct {
92 : // fileBackingMap contains the FileBackings which support virtual sstables
93 : // in the latest version, keyed by DiskFileNum. Once a file backing no
94 : // longer backs any virtual sstables in the latest version, it is removed
95 : // from this map and the corresponding state is added to the zombieTables
96 : // map. Note that we don't keep track of file backings which support only
97 : // virtual sstables that are not in the latest version.
98 : fileBackingMap map[base.DiskFileNum]*fileBacking
99 : // fileBackingSize is the sum of the sizes of the fileBackings in the
100 : // fileBackingMap.
101 : fileBackingSize uint64
102 : }
103 :
104 : // minUnflushedLogNum is the smallest WAL log file number corresponding to
105 : // mutations that have not been flushed to an sstable.
106 : minUnflushedLogNum base.DiskFileNum
107 :
108 : // The next file number. A single counter is used to assign file
109 : // numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
110 : nextFileNum uint64
111 :
112 : // The current manifest file number.
113 : manifestFileNum base.DiskFileNum
114 : manifestMarker *atomicfs.Marker
115 :
116 : manifestFile vfs.File
117 : manifest *record.Writer
118 : getFormatMajorVersion func() FormatMajorVersion
119 :
120 : writing bool
121 : writerCond sync.Cond
122 : // State for deciding when to write a snapshot. Protected by mu.
123 : rotationHelper record.RotationHelper
124 : }
125 :
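// init initializes the versionSet's immutable fields and empty mutable state.
// It is invoked by both create (for a fresh DB) and load (when opening an
// existing DB).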
126 : func (vs *versionSet) init(
127 : dirname string,
128 : opts *Options,
129 : marker *atomicfs.Marker,
130 : getFMV func() FormatMajorVersion,
131 : mu *sync.Mutex,
132 2 : ) {
133 2 : vs.dirname = dirname
134 2 : vs.mu = mu
135 2 : vs.writerCond.L = mu
136 2 : vs.opts = opts
137 2 : vs.fs = opts.FS
138 2 : vs.cmp = opts.Comparer.Compare
139 2 : vs.cmpName = opts.Comparer.Name
140 2 : vs.dynamicBaseLevel = true
141 2 : vs.versions.Init(mu)
142 2 : vs.obsoleteFn = vs.addObsoleteLocked
143 2 : vs.zombieTables = make(map[base.DiskFileNum]uint64)
144 2 : vs.backingState.fileBackingMap = make(map[base.DiskFileNum]*fileBacking)
145 2 : vs.backingState.fileBackingSize = 0
146 2 : vs.nextFileNum = 1
147 2 : vs.manifestMarker = marker
148 2 : vs.getFormatMajorVersion = getFMV
149 2 : }
150 :
151 : // create creates a version set for a fresh DB.
152 : func (vs *versionSet) create(
153 : jobID int,
154 : dirname string,
155 : opts *Options,
156 : marker *atomicfs.Marker,
157 : getFormatMajorVersion func() FormatMajorVersion,
158 : mu *sync.Mutex,
159 2 : ) error {
160 2 : vs.init(dirname, opts, marker, getFormatMajorVersion, mu)
161 2 : newVersion := &version{}
162 2 : vs.append(newVersion)
163 2 : var err error
164 2 :
165 2 : vs.picker = newCompactionPicker(newVersion, vs.opts, nil)
166 2 : // Note that a "snapshot" version edit is written to the manifest when the
167 2 : // manifest is created.
168 2 : vs.manifestFileNum = vs.getNextDiskFileNum()
169 2 : err = vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum)
170 2 : if err == nil {
171 2 : if err = vs.manifest.Flush(); err != nil {
172 1 : vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
173 1 : }
174 : }
175 2 : if err == nil {
176 2 : if err = vs.manifestFile.Sync(); err != nil {
177 1 : vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
178 1 : }
179 : }
180 2 : if err == nil {
181 2 : // NB: Move() is responsible for syncing the data directory.
182 2 : if err = vs.manifestMarker.Move(base.MakeFilename(fileTypeManifest, vs.manifestFileNum)); err != nil {
183 1 : vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
184 1 : }
185 : }
186 :
187 2 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
188 2 : JobID: jobID,
189 2 : Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, vs.manifestFileNum),
190 2 : FileNum: vs.manifestFileNum,
191 2 : Err: err,
192 2 : })
193 2 : if err != nil {
194 1 : return err
195 1 : }
196 2 : return nil
197 : }
198 :
199 : // load loads the version set from the manifest file.
200 : func (vs *versionSet) load(
201 : dirname string,
202 : opts *Options,
203 : manifestFileNum base.DiskFileNum,
204 : marker *atomicfs.Marker,
205 : getFormatMajorVersion func() FormatMajorVersion,
206 : mu *sync.Mutex,
207 2 : ) error {
208 2 : vs.init(dirname, opts, marker, getFormatMajorVersion, mu)
209 2 :
210 2 : vs.manifestFileNum = manifestFileNum
211 2 : manifestPath := base.MakeFilepath(opts.FS, dirname, fileTypeManifest, vs.manifestFileNum)
212 2 : manifestFilename := opts.FS.PathBase(manifestPath)
213 2 :
214 2 : // Read the versionEdits in the manifest file.
215 2 : var bve bulkVersionEdit
216 2 : bve.AddedByFileNum = make(map[base.FileNum]*fileMetadata)
217 2 : manifest, err := vs.fs.Open(manifestPath)
218 2 : if err != nil {
219 0 : return errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
220 0 : errors.Safe(manifestFilename), dirname)
221 0 : }
222 2 : defer manifest.Close()
223 2 : rr := record.NewReader(manifest, 0 /* logNum */)
224 2 : for {
225 2 : r, err := rr.Next()
226 2 : if err == io.EOF || record.IsInvalidRecord(err) {
227 2 : break
228 : }
229 2 : if err != nil {
230 0 : return errors.Wrapf(err, "pebble: error when loading manifest file %q",
231 0 : errors.Safe(manifestFilename))
232 0 : }
233 2 : var ve versionEdit
234 2 : err = ve.Decode(r)
235 2 : if err != nil {
236 0 : // Break instead of returning an error if the record is corrupted
237 0 : // or invalid.
238 0 : if err == io.EOF || record.IsInvalidRecord(err) {
239 0 : break
240 : }
241 0 : return err
242 : }
243 2 : if ve.ComparerName != "" {
244 2 : if ve.ComparerName != vs.cmpName {
245 1 : return errors.Errorf("pebble: manifest file %q for DB %q: "+
246 1 : "comparer name from file %q != comparer name from Options %q",
247 1 : errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(vs.cmpName))
248 1 : }
249 : }
250 2 : if err := bve.Accumulate(&ve); err != nil {
251 0 : return err
252 0 : }
253 2 : if ve.MinUnflushedLogNum != 0 {
254 2 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
255 2 : }
256 2 : if ve.NextFileNum != 0 {
257 2 : vs.nextFileNum = ve.NextFileNum
258 2 : }
259 2 : if ve.LastSeqNum != 0 {
260 2 : // logSeqNum is the _next_ sequence number that will be assigned,
261 2 : // while LastSeqNum is the last assigned sequence number. Note that
262 2 : // this behaviour mimics that in RocksDB; the first sequence number
263 2 : // assigned is one greater than the one present in the manifest
264 2 : // (assuming no WALs contain higher sequence numbers than the
265 2 : // manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
266 2 : // next sequence number that will be assigned.
267 2 : //
268 2 : // If LastSeqNum is less than SeqNumStart, increase it to at least
269 2 : // SeqNumStart to leave ample room for reserved sequence numbers.
270 2 : if ve.LastSeqNum+1 < base.SeqNumStart {
271 0 : vs.logSeqNum.Store(base.SeqNumStart)
272 2 : } else {
273 2 : vs.logSeqNum.Store(ve.LastSeqNum + 1)
274 2 : }
275 : }
276 : }
277 : // vs.init (called at the beginning of this function) sets vs.nextFileNum = 1,
278 : // and the replay loop above could only have updated it to some other
279 : // non-zero value, so it cannot be 0 here.
280 2 : if vs.minUnflushedLogNum == 0 {
281 0 : if vs.nextFileNum >= 2 {
282 0 : // We either have a freshly created DB, or a DB created by RocksDB
283 0 : // that has not had a single flushed SSTable yet. This is because
284 0 : // RocksDB bumps up nextFileNum in this case without bumping up
285 0 : // minUnflushedLogNum, even if WALs with non-zero file numbers are
286 0 : // present in the directory.
287 0 : } else {
288 0 : return base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
289 0 : errors.Safe(manifestFilename), dirname)
290 0 : }
291 : }
292 2 : vs.markFileNumUsed(vs.minUnflushedLogNum)
293 2 :
294 2 : // Populate the fileBackingMap and the FileBacking for virtual sstables since
295 2 : // we have finished version edit accumulation.
296 2 : for _, s := range bve.AddedFileBacking {
297 2 : vs.addFileBacking(s)
298 2 : }
299 :
300 2 : for _, fileNum := range bve.RemovedFileBacking {
301 2 : vs.removeFileBacking(fileNum)
302 2 : }
303 :
304 2 : newVersion, err := bve.Apply(
305 2 : nil, vs.cmp, opts.Comparer.FormatKey, opts.FlushSplitBytes,
306 2 : opts.Experimental.ReadCompactionRate, nil, /* zombies */
307 2 : )
308 2 : if err != nil {
309 0 : return err
310 0 : }
311 2 : newVersion.L0Sublevels.InitCompactingFileInfo(nil /* in-progress compactions */)
312 2 : vs.append(newVersion)
313 2 :
314 2 : for i := range vs.metrics.Levels {
315 2 : l := &vs.metrics.Levels[i]
316 2 : l.NumFiles = int64(newVersion.Levels[i].Len())
317 2 : files := newVersion.Levels[i].Slice()
318 2 : l.Size = int64(files.SizeSum())
319 2 : }
320 :
321 2 : vs.picker = newCompactionPicker(newVersion, vs.opts, nil)
322 2 : return nil
323 : }
324 :
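// close closes the open manifest file and manifest marker, if any.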
325 2 : func (vs *versionSet) close() error {
326 2 : if vs.manifestFile != nil {
327 2 : if err := vs.manifestFile.Close(); err != nil {
328 0 : return err
329 0 : }
330 : }
331 2 : if vs.manifestMarker != nil {
332 2 : if err := vs.manifestMarker.Close(); err != nil {
333 0 : return err
334 0 : }
335 : }
336 2 : return nil
337 : }
338 :
339 : // logLock locks the manifest for writing. The lock must be released by either
340 : // a call to logUnlock or logAndApply.
341 : //
342 : // DB.mu must be held when calling this method, but the mutex may be dropped and
343 : // re-acquired during the course of this method.
344 2 : func (vs *versionSet) logLock() {
345 2 : // Wait for any existing writing to the manifest to complete, then mark the
346 2 : // manifest as busy.
347 2 : for vs.writing {
348 2 : vs.writerCond.Wait()
349 2 : }
350 2 : vs.writing = true
351 : }
352 :
353 : // logUnlock releases the lock for manifest writing.
354 : //
355 : // DB.mu must be held when calling this method.
356 2 : func (vs *versionSet) logUnlock() {
357 2 : if !vs.writing {
358 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
359 0 : }
360 2 : vs.writing = false
361 2 : vs.writerCond.Signal()
362 : }
363 :
364 : // Only call if the DiskFileNum doesn't exist in the fileBackingMap.
365 2 : func (vs *versionSet) addFileBacking(backing *manifest.FileBacking) {
366 2 : _, ok := vs.backingState.fileBackingMap[backing.DiskFileNum]
367 2 : if ok {
368 0 : panic("pebble: trying to add an existing file backing")
369 : }
370 2 : vs.backingState.fileBackingMap[backing.DiskFileNum] = backing
371 2 : vs.backingState.fileBackingSize += backing.Size
372 : }
373 :
374 : // Only call if the DiskFileNum exists in the fileBackingMap.
375 2 : func (vs *versionSet) removeFileBacking(dfn base.DiskFileNum) {
376 2 : backing, ok := vs.backingState.fileBackingMap[dfn]
377 2 : if !ok {
378 0 : panic("pebble: trying to remove an unknown file backing")
379 : }
380 2 : delete(vs.backingState.fileBackingMap, dfn)
381 2 : vs.backingState.fileBackingSize -= backing.Size
382 : }
383 :
384 : // logAndApply logs the version edit to the manifest, applies the version edit
385 : // to the current version, and installs the new version.
386 : //
387 : // DB.mu must be held when calling this method and will be released temporarily
388 : // while performing file I/O. Requires that the manifest is locked for writing
389 : // (see logLock). Will unconditionally release the manifest lock (via
390 : // logUnlock) even if an error occurs.
391 : //
392 : // inProgressCompactions is called while DB.mu is held, to get the list of
393 : // in-progress compactions.
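//
// A sketch of the intended calling pattern, inferred from the contract above
// (the identifiers below are placeholders rather than real call sites):
//
//	d.mu.Lock()
//	vs.logLock()
//	err := vs.logAndApply(jobID, ve, metrics, false /* forceRotation */, inProgressCompactions)
//	// logAndApply has already released the manifest lock, even on error.
//	d.mu.Unlock()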
394 : func (vs *versionSet) logAndApply(
395 : jobID int,
396 : ve *versionEdit,
397 : metrics map[int]*LevelMetrics,
398 : forceRotation bool,
399 : inProgressCompactions func() []compactionInfo,
400 2 : ) error {
401 2 : if !vs.writing {
402 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
403 0 : }
404 2 : defer vs.logUnlock()
405 2 :
406 2 : if ve.MinUnflushedLogNum != 0 {
407 2 : if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
408 2 : vs.nextFileNum <= uint64(ve.MinUnflushedLogNum) {
409 0 : panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
410 0 : ve.MinUnflushedLogNum))
411 : }
412 : }
413 :
414 : // This is the next manifest filenum, but if the current file is too big we
415 : // will write this ve to the next file, which means what ve encodes is the
416 : // current filenum and not the next one.
417 : //
418 : // TODO(sbhola): figure out why this is correct and update comment.
419 2 : ve.NextFileNum = vs.nextFileNum
420 2 :
421 2 : // LastSeqNum is set to the current upper bound on the assigned sequence
422 2 : // numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
423 2 : // used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
424 2 : // replay. It must be higher than or equal to any sequence number
425 2 : // written to an sstable, including sequence numbers in ingested files.
426 2 : // Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
427 2 : // number. This is fallout from ingestion which allows a sequence number X to
428 2 : // be assigned to an ingested sstable even though sequence number X-1 resides
429 2 : // in an unflushed memtable. logSeqNum is the _next_ sequence number that
430 2 : // will be assigned, so subtract 1 from it to get the upper bound on the
431 2 : // last assigned sequence number.
432 2 : logSeqNum := vs.logSeqNum.Load()
433 2 : ve.LastSeqNum = logSeqNum - 1
434 2 : if logSeqNum == 0 {
435 0 : // logSeqNum is initialized to 1 in Open() if there are no previous WAL
436 0 : // or manifest records, so this case should never happen.
437 0 : vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
438 0 : }
439 :
440 2 : currentVersion := vs.currentVersion()
441 2 : var newVersion *version
442 2 :
443 2 : // Generate a new manifest if we don't currently have one, or forceRotation
444 2 : // is true, or the current one is too large.
445 2 : //
446 2 : // For largeness, we do not exclusively use the MaxManifestFileSize
447 2 : // threshold since we have had incidents where, due to either large keys or
448 2 : // large numbers of files, each edit results in a snapshot + write of the
449 2 : // edit. This slows the system down since each flush or compaction is
450 2 : // writing a new manifest snapshot. The primary goal of the size-based
451 2 : // rollover logic is to ensure that when reopening a DB, the number of edits
452 2 : // that need to be replayed on top of the snapshot is "sane". Rolling over
453 2 : // to a new manifest after each edit is not relevant to that goal.
454 2 : //
455 2 : // Consider the following cases:
456 2 : // - The number of live files F in the DB is roughly stable: after writing
457 2 : // the snapshot (with F files), say we require that there be enough edits
458 2 : // such that the cumulative number of files in those edits, E, be greater
459 2 : // than F. This will ensure that the total amount of time in logAndApply
460 2 : // that is spent in snapshot writing is ~50%.
461 2 : //
462 2 : // - The number of live files F in the DB is shrinking drastically, say from
463 2 : // F to F/10: This can happen for various reasons, like wide range
464 2 : // tombstones, or large numbers of smaller than usual files that are being
465 2 : // merged together into larger files. And say the new files generated
466 2 : // during this shrinkage is insignificant compared to F/10, and so for
467 2 : // this example we will assume it is effectively 0. After this shrinking,
468 2 : // E = 0.9F, and so if we used the previous snapshot file count, F, as the
469 2 : // threshold that needs to be exceeded, we will further delay the snapshot
470 2 : // writing. Which means on DB reopen we will need to replay 0.9F edits to
471 2 : // get to a version with 0.1F files. It would be better to create a new
472 2 : // snapshot when E exceeds the number of files in the current version.
473 2 : //
474 2 : // - The number of live files F in the DB is growing via perfect ingests
475 2 : // into L6: Say we wrote the snapshot when there were F files and now we
476 2 : // have 10F files, so E = 9F. We will further delay writing a new
477 2 : // snapshot. This case can be critiqued as contrived, but we consider it
478 2 : // nonetheless.
479 2 : //
480 2 : // The logic below uses the min of the last snapshot file count and the file
481 2 : // count in the current version.
482 2 : vs.rotationHelper.AddRecord(int64(len(ve.DeletedFiles) + len(ve.NewFiles)))
483 2 : sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
484 2 : requireRotation := forceRotation || vs.manifest == nil
485 2 :
486 2 : var nextSnapshotFilecount int64
487 2 : for i := range vs.metrics.Levels {
488 2 : nextSnapshotFilecount += vs.metrics.Levels[i].NumFiles
489 2 : }
490 2 : if sizeExceeded && !requireRotation {
491 2 : requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
492 2 : }
493 2 : var newManifestFileNum base.DiskFileNum
494 2 : var prevManifestFileSize uint64
495 2 : if requireRotation {
496 2 : newManifestFileNum = vs.getNextDiskFileNum()
497 2 : prevManifestFileSize = uint64(vs.manifest.Size())
498 2 : }
499 :
500 : // Grab certain values before releasing vs.mu, in case createManifest() needs
501 : // to be called.
502 2 : minUnflushedLogNum := vs.minUnflushedLogNum
503 2 : nextFileNum := vs.nextFileNum
504 2 :
505 2 : var zombies map[base.DiskFileNum]uint64
506 2 : if err := func() error {
507 2 : vs.mu.Unlock()
508 2 : defer vs.mu.Lock()
509 2 :
510 2 : var err error
511 2 : if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
512 0 : return errors.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
513 0 : }
514 2 : newVersion, zombies, err = manifest.AccumulateIncompleteAndApplySingleVE(
515 2 : ve, currentVersion, vs.cmp, vs.opts.Comparer.FormatKey,
516 2 : vs.opts.FlushSplitBytes, vs.opts.Experimental.ReadCompactionRate,
517 2 : vs.backingState.fileBackingMap, vs.addFileBacking, vs.removeFileBacking,
518 2 : )
519 2 : if err != nil {
520 0 : return errors.Wrap(err, "MANIFEST apply failed")
521 0 : }
522 :
523 2 : if newManifestFileNum != 0 {
524 2 : if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum); err != nil {
525 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
526 1 : JobID: jobID,
527 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
528 1 : FileNum: newManifestFileNum,
529 1 : Err: err,
530 1 : })
531 1 : return errors.Wrap(err, "MANIFEST create failed")
532 1 : }
533 : }
534 :
535 2 : w, err := vs.manifest.Next()
536 2 : if err != nil {
537 0 : return errors.Wrap(err, "MANIFEST next record write failed")
538 0 : }
539 :
540 : // NB: Any error from this point on is considered fatal as we don't know if
541 : // the MANIFEST write occurred or not. Trying to determine that is
542 : // fraught. Instead we rely on the standard recovery mechanism run when a
543 : // database is open. In particular, that mechanism generates a new MANIFEST
544 : // and ensures it is synced.
545 2 : if err := ve.Encode(w); err != nil {
546 0 : return errors.Wrap(err, "MANIFEST write failed")
547 0 : }
548 2 : if err := vs.manifest.Flush(); err != nil {
549 1 : return errors.Wrap(err, "MANIFEST flush failed")
550 1 : }
551 2 : if err := vs.manifestFile.Sync(); err != nil {
552 1 : return errors.Wrap(err, "MANIFEST sync failed")
553 1 : }
554 2 : if newManifestFileNum != 0 {
555 2 : // NB: Move() is responsible for syncing the data directory.
556 2 : if err := vs.manifestMarker.Move(base.MakeFilename(fileTypeManifest, newManifestFileNum)); err != nil {
557 0 : return errors.Wrap(err, "MANIFEST set current failed")
558 0 : }
559 2 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
560 2 : JobID: jobID,
561 2 : Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
562 2 : FileNum: newManifestFileNum,
563 2 : })
564 : }
565 2 : return nil
566 1 : }(); err != nil {
567 1 : // Any error encountered during any of the operations in the previous
568 1 : // closure is considered fatal. Treating such errors as fatal is preferred
569 1 : // to attempting to unwind various file and b-tree reference counts, and
570 1 : // re-generating L0 sublevel metadata. This may change in the future, if
571 1 : // certain manifest / WAL operations become retryable. For more context, see
572 1 : // #1159 and #1792.
573 1 : vs.opts.Logger.Fatalf("%s", err)
574 1 : return err
575 1 : }
576 :
577 2 : if requireRotation {
578 2 : // Successfully rotated.
579 2 : vs.rotationHelper.Rotate(nextSnapshotFilecount)
580 2 : }
581 : // Now that DB.mu is held again, initialize compacting file info in
582 : // L0Sublevels.
583 2 : inProgress := inProgressCompactions()
584 2 :
585 2 : newVersion.L0Sublevels.InitCompactingFileInfo(inProgressL0Compactions(inProgress))
586 2 :
587 2 : // Update the zombie tables set first, as installation of the new version
588 2 : // will unref the previous version which could result in addObsoleteLocked
589 2 : // being called.
590 2 : for fileNum, size := range zombies {
591 2 : vs.zombieTables[fileNum] = size
592 2 : }
593 :
594 : // Install the new version.
595 2 : vs.append(newVersion)
596 2 : if ve.MinUnflushedLogNum != 0 {
597 2 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
598 2 : }
599 2 : if newManifestFileNum != 0 {
600 2 : if vs.manifestFileNum != 0 {
601 2 : vs.obsoleteManifests = append(vs.obsoleteManifests, fileInfo{
602 2 : FileNum: vs.manifestFileNum,
603 2 : FileSize: prevManifestFileSize,
604 2 : })
605 2 : }
606 2 : vs.manifestFileNum = newManifestFileNum
607 : }
608 :
609 2 : for level, update := range metrics {
610 2 : vs.metrics.Levels[level].Add(update)
611 2 : }
612 2 : for i := range vs.metrics.Levels {
613 2 : l := &vs.metrics.Levels[i]
614 2 : l.NumFiles = int64(newVersion.Levels[i].Len())
615 2 : l.NumVirtualFiles = newVersion.Levels[i].NumVirtual
616 2 : l.VirtualSize = newVersion.Levels[i].VirtualSize
617 2 : l.Size = int64(newVersion.Levels[i].Size())
618 2 :
619 2 : l.Sublevels = 0
620 2 : if l.NumFiles > 0 {
621 2 : l.Sublevels = 1
622 2 : }
623 2 : if invariants.Enabled {
624 2 : levelFiles := newVersion.Levels[i].Slice()
625 2 : if size := int64(levelFiles.SizeSum()); l.Size != size {
626 0 : vs.opts.Logger.Fatalf("versionSet metrics L%d Size = %d, actual size = %d", i, l.Size, size)
627 0 : }
628 2 : if nVirtual := levelFiles.NumVirtual(); nVirtual != l.NumVirtualFiles {
629 0 : vs.opts.Logger.Fatalf(
630 0 : "versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
631 0 : i, l.NumVirtualFiles, nVirtual,
632 0 : )
633 0 : }
634 2 : if vSize := levelFiles.VirtualSizeSum(); vSize != l.VirtualSize {
635 0 : vs.opts.Logger.Fatalf(
636 0 : "versionSet metrics L%d Virtual size = %d, actual size = %d",
637 0 : i, l.VirtualSize, vSize,
638 0 : )
639 0 : }
640 : }
641 : }
642 2 : vs.metrics.Levels[0].Sublevels = int32(len(newVersion.L0SublevelFiles))
643 2 :
644 2 : vs.picker = newCompactionPicker(newVersion, vs.opts, inProgress)
645 2 : if !vs.dynamicBaseLevel {
646 1 : vs.picker.forceBaseLevel1()
647 1 : }
648 2 : return nil
649 : }
650 :
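// incrementCompactions bumps the per-kind compaction (or flush) counters in
// vs.metrics; compactions with extra levels also bump MultiLevelCount.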
651 : func (vs *versionSet) incrementCompactions(
652 : kind compactionKind, extraLevels []*compactionLevel, pickerMetrics compactionPickerMetrics,
653 2 : ) {
654 2 : switch kind {
655 2 : case compactionKindDefault:
656 2 : vs.metrics.Compact.Count++
657 2 : vs.metrics.Compact.DefaultCount++
658 :
659 2 : case compactionKindFlush, compactionKindIngestedFlushable:
660 2 : vs.metrics.Flush.Count++
661 :
662 2 : case compactionKindMove:
663 2 : vs.metrics.Compact.Count++
664 2 : vs.metrics.Compact.MoveCount++
665 :
666 2 : case compactionKindDeleteOnly:
667 2 : vs.metrics.Compact.Count++
668 2 : vs.metrics.Compact.DeleteOnlyCount++
669 :
670 2 : case compactionKindElisionOnly:
671 2 : vs.metrics.Compact.Count++
672 2 : vs.metrics.Compact.ElisionOnlyCount++
673 :
674 1 : case compactionKindRead:
675 1 : vs.metrics.Compact.Count++
676 1 : vs.metrics.Compact.ReadCount++
677 :
678 1 : case compactionKindRewrite:
679 1 : vs.metrics.Compact.Count++
680 1 : vs.metrics.Compact.RewriteCount++
681 : }
682 2 : if len(extraLevels) > 0 {
683 2 : vs.metrics.Compact.MultiLevelCount++
684 2 : }
685 : }
686 :
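// incrementCompactionBytes adjusts atomicInProgressBytes, the number of bytes
// present in sstables being written by in-progress compactions, by numBytes.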
687 2 : func (vs *versionSet) incrementCompactionBytes(numBytes int64) {
688 2 : vs.atomicInProgressBytes.Add(numBytes)
689 2 : }
690 :
691 : // createManifest creates a manifest file that contains a snapshot of vs.
692 : func (vs *versionSet) createManifest(
693 : dirname string, fileNum, minUnflushedLogNum base.DiskFileNum, nextFileNum uint64,
694 2 : ) (err error) {
695 2 : var (
696 2 : filename = base.MakeFilepath(vs.fs, dirname, fileTypeManifest, fileNum)
697 2 : manifestFile vfs.File
698 2 : manifest *record.Writer
699 2 : )
700 2 : defer func() {
701 2 : if manifest != nil {
702 0 : manifest.Close()
703 0 : }
704 2 : if manifestFile != nil {
705 0 : manifestFile.Close()
706 0 : }
707 2 : if err != nil {
708 1 : vs.fs.Remove(filename)
709 1 : }
710 : }()
711 2 : manifestFile, err = vs.fs.Create(filename)
712 2 : if err != nil {
713 1 : return err
714 1 : }
715 2 : manifest = record.NewWriter(manifestFile)
716 2 :
717 2 : snapshot := versionEdit{
718 2 : ComparerName: vs.cmpName,
719 2 : }
720 2 : dedup := make(map[base.DiskFileNum]struct{})
721 2 : for level, levelMetadata := range vs.currentVersion().Levels {
722 2 : iter := levelMetadata.Iter()
723 2 : for meta := iter.First(); meta != nil; meta = iter.Next() {
724 2 : snapshot.NewFiles = append(snapshot.NewFiles, newFileEntry{
725 2 : Level: level,
726 2 : Meta: meta,
727 2 : })
728 2 : if _, ok := dedup[meta.FileBacking.DiskFileNum]; meta.Virtual && !ok {
729 2 : dedup[meta.FileBacking.DiskFileNum] = struct{}{}
730 2 : snapshot.CreatedBackingTables = append(
731 2 : snapshot.CreatedBackingTables,
732 2 : meta.FileBacking,
733 2 : )
734 2 : }
735 : }
736 : }
737 :
738 : // When creating a version snapshot for an existing DB, this snapshot VersionEdit will be
739 : // immediately followed by another VersionEdit (being written in logAndApply()). That
740 : // VersionEdit always contains a LastSeqNum, so we don't need to include that in the snapshot.
741 : // But it does not necessarily include MinUnflushedLogNum or NextFileNum, so we initialize those
742 : // using the corresponding fields in the versionSet (which came from the latest preceding
743 : // VersionEdit that had those fields).
744 2 : snapshot.MinUnflushedLogNum = minUnflushedLogNum
745 2 : snapshot.NextFileNum = nextFileNum
746 2 :
747 2 : w, err1 := manifest.Next()
748 2 : if err1 != nil {
749 0 : return err1
750 0 : }
751 2 : if err := snapshot.Encode(w); err != nil {
752 0 : return err
753 0 : }
754 :
755 2 : if vs.manifest != nil {
756 2 : vs.manifest.Close()
757 2 : vs.manifest = nil
758 2 : }
759 2 : if vs.manifestFile != nil {
760 2 : if err := vs.manifestFile.Close(); err != nil {
761 0 : return err
762 0 : }
763 2 : vs.manifestFile = nil
764 : }
765 :
766 2 : vs.manifest, manifest = manifest, nil
767 2 : vs.manifestFile, manifestFile = manifestFile, nil
768 2 : return nil
769 : }
770 :
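// markFileNumUsed advances nextFileNum past fileNum so that the file number
// is never handed out again.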
771 2 : func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
772 2 : if vs.nextFileNum <= uint64(fileNum) {
773 2 : vs.nextFileNum = uint64(fileNum + 1)
774 2 : }
775 : }
776 :
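// getNextFileNum allocates and returns the next file number from the single
// counter shared by WAL, MANIFEST, sstable, and OPTIONS files.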
777 2 : func (vs *versionSet) getNextFileNum() base.FileNum {
778 2 : x := vs.nextFileNum
779 2 : vs.nextFileNum++
780 2 : return base.FileNum(x)
781 2 : }
782 :
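// getNextDiskFileNum is like getNextFileNum, but returns the allocated number
// as a base.DiskFileNum.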
783 2 : func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
784 2 : x := vs.nextFileNum
785 2 : vs.nextFileNum++
786 2 : return base.DiskFileNum(x)
787 2 : }
788 :
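// append installs v as the current version: the previous current version (if
// any) is unreffed, v's Deleted callback is set to vs.obsoleteFn, and v is
// ref'd and pushed onto the back of the version list. v must be unreferenced
// when passed in.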
789 2 : func (vs *versionSet) append(v *version) {
790 2 : if v.Refs() != 0 {
791 0 : panic("pebble: version should be unreferenced")
792 : }
793 2 : if !vs.versions.Empty() {
794 2 : vs.versions.Back().UnrefLocked()
795 2 : }
796 2 : v.Deleted = vs.obsoleteFn
797 2 : v.Ref()
798 2 : vs.versions.PushBack(v)
799 : }
800 :
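// currentVersion returns the most recently installed version, i.e. the back
// of the version list.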
801 2 : func (vs *versionSet) currentVersion() *version {
802 2 : return vs.versions.Back()
803 2 : }
804 :
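// addLiveFileNums adds the DiskFileNum of every file backing referenced by
// any version in the version list, up to and including the current version,
// to m.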
805 2 : func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
806 2 : current := vs.currentVersion()
807 2 : for v := vs.versions.Front(); true; v = v.Next() {
808 2 : for _, lm := range v.Levels {
809 2 : iter := lm.Iter()
810 2 : for f := iter.First(); f != nil; f = iter.Next() {
811 2 : m[f.FileBacking.DiskFileNum] = struct{}{}
812 2 : }
813 : }
814 2 : if v == current {
815 2 : break
816 : }
817 : }
818 : }
819 :
820 : // addObsoleteLocked will add the fileInfo associated with obsolete backing
821 : // sstables to the obsolete tables list.
822 : //
823 : // The file backings in the obsolete list must not appear more than once.
824 : //
825 : // DB.mu must be held when addObsoleteLocked is called.
826 2 : func (vs *versionSet) addObsoleteLocked(obsolete []*fileBacking) {
827 2 : if len(obsolete) == 0 {
828 2 : return
829 2 : }
830 :
831 2 : obsoleteFileInfo := make([]fileInfo, len(obsolete))
832 2 : for i, bs := range obsolete {
833 2 : obsoleteFileInfo[i].FileNum = bs.DiskFileNum
834 2 : obsoleteFileInfo[i].FileSize = bs.Size
835 2 : }
836 :
837 2 : if invariants.Enabled {
838 2 : dedup := make(map[base.DiskFileNum]struct{})
839 2 : for _, fi := range obsoleteFileInfo {
840 2 : dedup[fi.FileNum] = struct{}{}
841 2 : }
842 2 : if len(dedup) != len(obsoleteFileInfo) {
843 0 : panic("pebble: duplicate FileBacking present in obsolete list")
844 : }
845 : }
846 :
847 2 : for _, fi := range obsoleteFileInfo {
848 2 : // Note that the obsolete tables are no longer zombie by the definition of
849 2 : // zombie, but we leave them in the zombie tables map until they are
850 2 : // deleted from disk.
851 2 : if _, ok := vs.zombieTables[fi.FileNum]; !ok {
852 0 : vs.opts.Logger.Fatalf("MANIFEST obsolete table %s not marked as zombie", fi.FileNum)
853 0 : }
854 : }
855 :
856 2 : vs.obsoleteTables = append(vs.obsoleteTables, obsoleteFileInfo...)
857 2 : vs.updateObsoleteTableMetricsLocked()
858 : }
859 :
860 : // addObsolete will acquire DB.mu, so DB.mu must not be held when this is
861 : // called.
862 2 : func (vs *versionSet) addObsolete(obsolete []*fileBacking) {
863 2 : vs.mu.Lock()
864 2 : defer vs.mu.Unlock()
865 2 : vs.addObsoleteLocked(obsolete)
866 2 : }
867 :
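// updateObsoleteTableMetricsLocked recomputes the obsolete table count and
// cumulative size metrics from vs.obsoleteTables.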
868 2 : func (vs *versionSet) updateObsoleteTableMetricsLocked() {
869 2 : vs.metrics.Table.ObsoleteCount = int64(len(vs.obsoleteTables))
870 2 : vs.metrics.Table.ObsoleteSize = 0
871 2 : for _, fi := range vs.obsoleteTables {
872 2 : vs.metrics.Table.ObsoleteSize += fi.FileSize
873 2 : }
874 : }
875 :
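// findCurrentManifest locates the MANIFEST marker in dirname and, if the
// marker has ever been placed, parses the current manifest file number from
// the marked filename.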
876 : func findCurrentManifest(
877 : fs vfs.FS, dirname string,
878 2 : ) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
879 2 : // Locating a marker should succeed even if the marker has never been placed.
880 2 : var filename string
881 2 : marker, filename, err = atomicfs.LocateMarker(fs, dirname, manifestMarkerName)
882 2 : if err != nil {
883 1 : return nil, 0, false, err
884 1 : }
885 :
886 2 : if filename == "" {
887 2 : // The marker hasn't been set yet. This database doesn't exist.
888 2 : return marker, 0, false, nil
889 2 : }
890 :
891 2 : var ok bool
892 2 : _, manifestNum, ok = base.ParseFilename(fs, filename)
893 2 : if !ok {
894 0 : return marker, 0, false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
895 0 : }
896 2 : return marker, manifestNum, true, nil
897 : }
898 :
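// newFileMetrics returns a map from level to the LevelMetrics delta (file
// count and size) contributed by the provided new file entries.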
899 2 : func newFileMetrics(newFiles []manifest.NewFileEntry) map[int]*LevelMetrics {
900 2 : m := map[int]*LevelMetrics{}
901 2 : for _, nf := range newFiles {
902 2 : lm := m[nf.Level]
903 2 : if lm == nil {
904 2 : lm = &LevelMetrics{}
905 2 : m[nf.Level] = lm
906 2 : }
907 2 : lm.NumFiles++
908 2 : lm.Size += int64(nf.Meta.Size)
909 : }
910 2 : return m
911 : }