Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "sync"
11 : "sync/atomic"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/record"
19 : "github.com/cockroachdb/pebble/sstable/blob"
20 : "github.com/cockroachdb/pebble/vfs"
21 : "github.com/cockroachdb/pebble/vfs/atomicfs"
22 : )
23 :
24 : const numLevels = manifest.NumLevels
25 :
26 : const manifestMarkerName = `manifest`
27 :
28 : // versionSet manages a collection of immutable versions and the creation of
29 : // a new version from the most recent version. A new version is created from
30 : // an existing version by applying a version edit, which is just what it
31 : // sounds like: a delta from the previous version. Version edits are logged
32 : // to the MANIFEST file, which is replayed at startup.
33 : type versionSet struct {
34 : // Next seqNum to use for WAL writes.
35 : logSeqNum base.AtomicSeqNum
36 :
37 : // The upper bound on sequence numbers that have been assigned so far. A
38 : // suffix of these sequence numbers may not have been written to a WAL. Both
39 : // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
40 : // visibleSeqNum is <= logSeqNum.
41 : visibleSeqNum base.AtomicSeqNum
42 :
43 : // Number of bytes present in sstables being written by in-progress
44 : // compactions. This value will be zero if there are no in-progress
45 : // compactions. Updated and read atomically.
46 : atomicInProgressBytes atomic.Int64
47 :
48 : // Immutable fields.
49 : dirname string
50 : provider objstorage.Provider
51 : // Set to DB.mu.
52 : mu *sync.Mutex
53 : opts *Options
54 : fs vfs.FS
55 : cmp *base.Comparer
56 : // dynamicBaseLevel allows the dynamic base level computation to be
57 : // disabled. It is used by tests which want to create specific LSM structures.
58 : dynamicBaseLevel bool
59 :
60 : // Mutable fields.
61 : versions manifest.VersionList
62 : l0Organizer *manifest.L0Organizer
63 : // blobFiles is the set of blob files referenced by the current version.
64 : // blobFiles is protected by the manifest logLock (not vs.mu).
65 : blobFiles manifest.CurrentBlobFileSet
66 : picker compactionPicker
67 : // curCompactionConcurrency is updated whenever picker is updated.
68 : // INVARIANT: >= 1.
69 : curCompactionConcurrency atomic.Int32
70 :
71 : // Not all metrics are kept here. See DB.Metrics().
72 : metrics Metrics
73 :
74 : // A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
75 : // on the creation of every version.
76 : obsoleteFn func(manifest.ObsoleteFiles)
77 : // obsolete{Tables,Blobs,Manifests,Options} are sorted by file number ascending.
78 : obsoleteTables []obsoleteFile
79 : obsoleteBlobs []obsoleteFile
80 : obsoleteManifests []obsoleteFile
81 : obsoleteOptions []obsoleteFile
82 :
83 : // Zombie tables which have been removed from the current version but are
84 : // still referenced by an in-use iterator.
85 : zombieTables zombieObjects
86 : // Zombie blobs which have been removed from the current version but are
87 : // still referenced by an inuse iterator.
88 : zombieBlobs zombieObjects
89 : // virtualBackings contains information about the TableBackings which support
90 : // virtual sstables in the latest version. It is mainly used to determine when
91 : // a backing is no longer in use by the tables in the latest version; this is
92 : // not a trivial problem because compactions and other operations can happen
93 : // in parallel (and they can finish in unpredictable order).
94 : //
95 : // This mechanism is complementary to the backing Ref/Unref mechanism, which
96 : // determines when a backing is no longer used by *any* live version and can
97 : // be removed.
98 : //
99 : // In principle this should have been a copy-on-write structure, with each
100 : // Version having its own record of its virtual backings (similar to the
101 : // B-tree that holds the tables). However, in practice we only need it for the
102 : // latest version, so we use a simpler structure and store it in the
103 : // versionSet instead.
104 : //
105 : // virtualBackings is modified under DB.mu and the log lock. If it is accessed
106 : // under DB.mu and a version update is in progress, it reflects the state of
107 : // the next version.
108 : virtualBackings manifest.VirtualBackings
109 :
110 : // minUnflushedLogNum is the smallest WAL log file number corresponding to
111 : // mutations that have not been flushed to an sstable.
112 : minUnflushedLogNum base.DiskFileNum
113 :
114 : // The next file number. A single counter is used to assign file
115 : // numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
116 : nextFileNum atomic.Uint64
117 :
118 : // The current manifest file number.
119 : manifestFileNum base.DiskFileNum
120 : manifestMarker *atomicfs.Marker
121 :
122 : manifestFile vfs.File
123 : manifest *record.Writer
124 : getFormatMajorVersion func() FormatMajorVersion
125 :
126 : writing bool
127 : writerCond sync.Cond
128 : // State for deciding when to write a snapshot. Protected by mu.
129 : rotationHelper record.RotationHelper
130 :
131 : pickedCompactionCache pickedCompactionCache
132 : }
133 :
134 : func (vs *versionSet) init(
135 : dirname string,
136 : provider objstorage.Provider,
137 : opts *Options,
138 : marker *atomicfs.Marker,
139 : getFMV func() FormatMajorVersion,
140 : mu *sync.Mutex,
141 2 : ) {
142 2 : vs.dirname = dirname
143 2 : vs.provider = provider
144 2 : vs.mu = mu
145 2 : vs.writerCond.L = mu
146 2 : vs.opts = opts
147 2 : vs.fs = opts.FS
148 2 : vs.cmp = opts.Comparer
149 2 : vs.dynamicBaseLevel = true
150 2 : vs.versions.Init(mu)
151 2 : vs.l0Organizer = manifest.NewL0Organizer(opts.Comparer, opts.FlushSplitBytes)
152 2 : vs.obsoleteFn = vs.addObsoleteLocked
153 2 : vs.zombieTables = makeZombieObjects()
154 2 : vs.zombieBlobs = makeZombieObjects()
155 2 : vs.virtualBackings = manifest.MakeVirtualBackings()
156 2 : vs.nextFileNum.Store(1)
157 2 : vs.manifestMarker = marker
158 2 : vs.getFormatMajorVersion = getFMV
159 2 : }
160 :
161 : // create creates a version set for a fresh DB.
162 : func (vs *versionSet) create(
163 : jobID JobID,
164 : dirname string,
165 : provider objstorage.Provider,
166 : opts *Options,
167 : marker *atomicfs.Marker,
168 : getFormatMajorVersion func() FormatMajorVersion,
169 : mu *sync.Mutex,
170 2 : ) error {
171 2 : vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
172 2 : emptyVersion := manifest.NewInitialVersion(opts.Comparer)
173 2 : vs.append(emptyVersion)
174 2 : vs.blobFiles.Init(nil)
175 2 :
176 2 : vs.setCompactionPicker(
177 2 : newCompactionPickerByScore(emptyVersion, vs.l0Organizer, &vs.virtualBackings, vs.opts, nil))
178 2 : // Note that a "snapshot" version edit is written to the manifest when it is
179 2 : // created.
180 2 : vs.manifestFileNum = vs.getNextDiskFileNum()
181 2 : err := vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum.Load(), nil /* virtualBackings */)
182 2 : if err == nil {
183 2 : if err = vs.manifest.Flush(); err != nil {
184 1 : vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
185 1 : }
186 : }
187 2 : if err == nil {
188 2 : if err = vs.manifestFile.Sync(); err != nil {
189 1 : vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
190 1 : }
191 : }
192 2 : if err == nil {
193 2 : // NB: Move() is responsible for syncing the data directory.
194 2 : if err = vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, vs.manifestFileNum)); err != nil {
195 1 : vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
196 1 : }
197 : }
198 :
199 2 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
200 2 : JobID: int(jobID),
201 2 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
202 2 : FileNum: vs.manifestFileNum,
203 2 : Err: err,
204 2 : })
205 2 : if err != nil {
206 1 : return err
207 1 : }
208 2 : return nil
209 : }
210 :
211 : // load loads the version set from the manifest file.
212 : func (vs *versionSet) load(
213 : dirname string,
214 : provider objstorage.Provider,
215 : opts *Options,
216 : manifestFileNum base.DiskFileNum,
217 : marker *atomicfs.Marker,
218 : getFormatMajorVersion func() FormatMajorVersion,
219 : mu *sync.Mutex,
220 2 : ) error {
221 2 : vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
222 2 :
223 2 : vs.manifestFileNum = manifestFileNum
224 2 : manifestPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeManifest, vs.manifestFileNum)
225 2 : manifestFilename := opts.FS.PathBase(manifestPath)
226 2 :
227 2 : // Read the versionEdits in the manifest file.
228 2 : var bve manifest.BulkVersionEdit
229 2 : bve.AllAddedTables = make(map[base.TableNum]*manifest.TableMetadata)
230 2 : manifestFile, err := vs.fs.Open(manifestPath)
231 2 : if err != nil {
232 0 : return errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
233 0 : errors.Safe(manifestFilename), dirname)
234 0 : }
235 2 : defer manifestFile.Close()
236 2 : rr := record.NewReader(manifestFile, 0 /* logNum */)
237 2 : for {
238 2 : r, err := rr.Next()
239 2 : if err == io.EOF || record.IsInvalidRecord(err) {
240 2 : break
241 : }
242 2 : if err != nil {
243 0 : return errors.Wrapf(err, "pebble: error when loading manifest file %q",
244 0 : errors.Safe(manifestFilename))
245 0 : }
246 2 : var ve manifest.VersionEdit
247 2 : err = ve.Decode(r)
248 2 : if err != nil {
249 1 : // Break instead of returning an error if the record is corrupted
250 1 : // or invalid.
251 1 : if err == io.EOF || record.IsInvalidRecord(err) {
252 1 : break
253 : }
254 0 : return err
255 : }
256 2 : if ve.ComparerName != "" {
257 2 : if ve.ComparerName != vs.cmp.Name {
258 1 : return errors.Errorf("pebble: manifest file %q for DB %q: "+
259 1 : "comparer name from file %q != comparer name from Options %q",
260 1 : errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(vs.cmp.Name))
261 1 : }
262 : }
263 2 : if err := bve.Accumulate(&ve); err != nil {
264 0 : return err
265 0 : }
266 2 : if ve.MinUnflushedLogNum != 0 {
267 2 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
268 2 : }
269 2 : if ve.NextFileNum != 0 {
270 2 : vs.nextFileNum.Store(ve.NextFileNum)
271 2 : }
272 2 : if ve.LastSeqNum != 0 {
273 2 : // logSeqNum is the _next_ sequence number that will be assigned,
274 2 : // while LastSeqNum is the last assigned sequence number. Note that
275 2 : // this behaviour mimics that in RocksDB; the first sequence number
276 2 : // assigned is one greater than the one present in the manifest
277 2 : // (assuming no WALs contain higher sequence numbers than the
278 2 : // manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
279 2 : // next sequence number that will be assigned.
280 2 : //
281 2 : // If LastSeqNum is less than SeqNumStart, increase it to at least
282 2 : // SeqNumStart to leave ample room for reserved sequence numbers.
283 2 : if ve.LastSeqNum+1 < base.SeqNumStart {
284 0 : vs.logSeqNum.Store(base.SeqNumStart)
285 2 : } else {
286 2 : vs.logSeqNum.Store(ve.LastSeqNum + 1)
287 2 : }
288 : }
289 : }
290 : // vs.nextFileNum was initialized to a non-zero value in init (called at the
291 : // beginning of this function) and could have only been updated to some other
292 : // non-zero value, so it cannot be 0 here.
293 2 : if vs.minUnflushedLogNum == 0 {
294 2 : if vs.nextFileNum.Load() >= 2 {
295 2 : // We either have a freshly created DB, or a DB created by RocksDB
296 2 : // that has not had a single flushed SSTable yet. This is because
297 2 : // RocksDB bumps up nextFileNum in this case without bumping up
298 2 : // minUnflushedLogNum, even if WALs with non-zero file numbers are
299 2 : // present in the directory.
300 2 : } else {
301 0 : return base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
302 0 : errors.Safe(manifestFilename), dirname)
303 0 : }
304 : }
305 2 : vs.markFileNumUsed(vs.minUnflushedLogNum)
306 2 :
307 2 : // Populate the virtual backings for virtual sstables since we have finished
308 2 : // version edit accumulation.
309 2 : for _, b := range bve.AddedFileBacking {
310 2 : vs.virtualBackings.AddAndRef(b)
311 2 : }
312 :
313 2 : for _, addedLevel := range bve.AddedTables {
314 2 : for _, m := range addedLevel {
315 2 : if m.Virtual {
316 2 : vs.virtualBackings.AddTable(m)
317 2 : }
318 : }
319 : }
320 :
321 2 : if invariants.Enabled {
322 2 : // There should be no deleted tables or backings, since we're starting from
323 2 : // an empty state.
324 2 : for _, deletedLevel := range bve.DeletedTables {
325 2 : if len(deletedLevel) != 0 {
326 0 : panic("deleted files after manifest replay")
327 : }
328 : }
329 2 : if len(bve.RemovedFileBacking) > 0 {
330 0 : panic("deleted backings after manifest replay")
331 : }
332 : }
333 :
334 2 : emptyVersion := manifest.NewInitialVersion(opts.Comparer)
335 2 : newVersion, err := bve.Apply(emptyVersion, opts.Experimental.ReadCompactionRate)
336 2 : if err != nil {
337 0 : return err
338 0 : }
339 2 : vs.l0Organizer.PerformUpdate(vs.l0Organizer.PrepareUpdate(&bve, newVersion), newVersion)
340 2 : vs.l0Organizer.InitCompactingFileInfo(nil /* in-progress compactions */)
341 2 : vs.blobFiles.Init(&bve)
342 2 : vs.append(newVersion)
343 2 :
344 2 : for i := range vs.metrics.Levels {
345 2 : l := &vs.metrics.Levels[i]
346 2 : l.TablesCount = int64(newVersion.Levels[i].Len())
347 2 : files := newVersion.Levels[i].Slice()
348 2 : l.TablesSize = int64(files.TableSizeSum())
349 2 : }
350 2 : for _, l := range newVersion.Levels {
351 2 : for f := range l.All() {
352 2 : if !f.Virtual {
353 2 : isLocal, localSize := sizeIfLocal(f.TableBacking, vs.provider)
354 2 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
355 2 : if isLocal {
356 2 : vs.metrics.Table.Local.LiveCount++
357 2 : }
358 : }
359 : }
360 : }
361 2 : vs.virtualBackings.ForEach(func(backing *manifest.TableBacking) {
362 2 : isLocal, localSize := sizeIfLocal(backing, vs.provider)
363 2 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
364 2 : if isLocal {
365 2 : vs.metrics.Table.Local.LiveCount++
366 2 : }
367 : })
368 :
369 2 : vs.setCompactionPicker(
370 2 : newCompactionPickerByScore(newVersion, vs.l0Organizer, &vs.virtualBackings, vs.opts, nil))
371 2 : return nil
372 : }
373 :
374 2 : func (vs *versionSet) close() error {
375 2 : if vs.manifestFile != nil {
376 2 : if err := vs.manifestFile.Close(); err != nil {
377 0 : return err
378 0 : }
379 : }
380 2 : if vs.manifestMarker != nil {
381 2 : if err := vs.manifestMarker.Close(); err != nil {
382 0 : return err
383 0 : }
384 : }
385 2 : return nil
386 : }
387 :
388 : // logLock locks the manifest for writing. The lock must be released by
389 : // a call to logUnlock.
390 : //
391 : // DB.mu must be held when calling this method, but the mutex may be dropped and
392 : // re-acquired during the course of this method.
393 2 : func (vs *versionSet) logLock() {
394 2 : // Wait for any existing writing to the manifest to complete, then mark the
395 2 : // manifest as busy.
396 2 : for vs.writing {
397 2 : // Note: writerCond.L is DB.mu, so we unlock it while we wait.
398 2 : vs.writerCond.Wait()
399 2 : }
400 2 : vs.writing = true
401 : }
402 :
403 : // logUnlock releases the lock for manifest writing.
404 : //
405 : // DB.mu must be held when calling this method.
406 2 : func (vs *versionSet) logUnlock() {
407 2 : if !vs.writing {
408 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
409 0 : }
410 2 : vs.writing = false
411 2 : vs.writerCond.Signal()
412 : }
413 :
414 2 : func (vs *versionSet) logUnlockAndInvalidatePickedCompactionCache() {
415 2 : vs.pickedCompactionCache.invalidate()
416 2 : vs.logUnlock()
417 2 : }
418 :
419 : // versionUpdate is returned by the function passed to UpdateVersionLocked.
420 : //
421 : // If VE is nil, there is no update to apply (but it is not an error).
422 : type versionUpdate struct {
423 : VE *manifest.VersionEdit
424 : JobID JobID
425 : Metrics levelMetricsDelta
426 : // InProgressCompactionsFn is called while DB.mu is held, after the I/O part
427 : // of the update has been performed. It should return any compactions that
428 : // are in-progress (excluding the one that is being applied).
429 : InProgressCompactionsFn func() []compactionInfo
430 : ForceManifestRotation bool
431 : }
432 :
433 : // UpdateVersionLocked is used to update the current version.
434 : //
435 : // DB.mu must be held. UpdateVersionLocked first waits for any other version
436 : // update to complete, releasing and reacquiring DB.mu.
437 : //
438 : // UpdateVersionLocked then calls updateFn which builds a versionUpdate, while
439 : // holding DB.mu. The updateFn can release and reacquire DB.mu (it should
440 : // attempt to do as much work as possible outside of the lock).
441 : //
442 : // UpdateVersionLocked fills in the following fields of the VersionEdit:
443 : // NextFileNum, LastSeqNum, RemovedBackingTables. The removed backing tables are
444 : // those backings that are no longer used (in the new version) after applying
445 : // the edit (as per vs.virtualBackings). Other than these fields, the
446 : // VersionEdit must be complete.
447 : //
448 : // New table backing references (TableBacking.Ref) are taken as part of applying
449 : // the version edit. The state of the virtual backings (vs.virtualBackings) is
450 : // updated before logging to the manifest and installing the latest version;
451 : // this is ok because any failure in those steps is fatal.
452 : //
453 : // If updateFn returns an error, no update is applied and that same error is returned.
454 : // If versionUpdate.VE is nil, no update is applied (and no error is returned).
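: //
: // A minimal sketch of a caller (hypothetical; jobID, metricsDelta and
: // inProgress below are placeholder values, not names from this file):
: //
: //	err := d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
: //		ve := &manifest.VersionEdit{ /* new/deleted tables, etc. */ }
: //		return versionUpdate{
: //			VE:                      ve,
: //			JobID:                   jobID,
: //			Metrics:                 metricsDelta,
: //			InProgressCompactionsFn: func() []compactionInfo { return inProgress },
: //		}, nil
: //	})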
455 2 : func (vs *versionSet) UpdateVersionLocked(updateFn func() (versionUpdate, error)) error {
456 2 : vs.logLock()
457 2 : defer vs.logUnlockAndInvalidatePickedCompactionCache()
458 2 :
459 2 : vu, err := updateFn()
460 2 : if err != nil || vu.VE == nil {
461 2 : return err
462 2 : }
463 :
464 2 : if !vs.writing {
465 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
466 0 : }
467 :
468 2 : ve := vu.VE
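: // Sanity check: the min-unflushed WAL number must never move backwards, and
: // it must refer to a file number that has already been allocated.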
469 2 : if ve.MinUnflushedLogNum != 0 {
470 2 : if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
471 2 : vs.nextFileNum.Load() <= uint64(ve.MinUnflushedLogNum) {
472 0 : panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
473 0 : ve.MinUnflushedLogNum))
474 : }
475 : }
476 :
477 : // This is the next manifest filenum, but if the current file is too big we
478 : // will write this ve to the next file which means what ve encodes is the
479 : // current filenum and not the next one.
480 : //
481 : // TODO(sbhola): figure out why this is correct and update comment.
482 2 : ve.NextFileNum = vs.nextFileNum.Load()
483 2 :
484 2 : // LastSeqNum is set to the current upper bound on the assigned sequence
485 2 : // numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
486 2 : // used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
487 2 : // replay. It must be higher than or equal to any sequence number
488 2 : // written to an sstable, including sequence numbers in ingested files.
489 2 : // Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
490 2 : // number. This is fallout from ingestion which allows a sequence number X to
491 2 : // be assigned to an ingested sstable even though sequence number X-1 resides
492 2 : // in an unflushed memtable. logSeqNum is the _next_ sequence number that
493 2 : // will be assigned, so subtract that by 1 to get the upper bound on the
494 2 : // last assigned sequence number.
495 2 : logSeqNum := vs.logSeqNum.Load()
496 2 : ve.LastSeqNum = logSeqNum - 1
497 2 : if logSeqNum == 0 {
498 0 : // logSeqNum is initialized to 1 in Open() if there are no previous WAL
499 0 : // or manifest records, so this case should never happen.
500 0 : vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
501 0 : }
502 :
503 2 : currentVersion := vs.currentVersion()
504 2 : var newVersion *manifest.Version
505 2 :
506 2 : // Generate a new manifest if we don't currently have one, or
507 2 : // ForceManifestRotation is true, or the current one is too large.
508 2 : //
509 2 : // For largeness, we do not exclusively use MaxManifestFileSize size
510 2 : // threshold since we have had incidents where due to either large keys or
511 2 : // large numbers of files, each edit results in a snapshot + write of the
512 2 : // edit. This slows the system down since each flush or compaction is
513 2 : // writing a new manifest snapshot. The primary goal of the size-based
514 2 : // rollover logic is to ensure that when reopening a DB, the number of edits
515 2 : // that need to be replayed on top of the snapshot is "sane". Rolling over
516 2 : // to a new manifest after each edit is not relevant to that goal.
517 2 : //
518 2 : // Consider the following cases:
519 2 : // - The number of live files F in the DB is roughly stable: after writing
520 2 : // the snapshot (with F files), say we require that there be enough edits
521 2 : // such that the cumulative number of files in those edits, E, be greater
522 2 : // than F. This will ensure that the total amount of time in
523 2 : // UpdateVersionLocked that is spent in snapshot writing is ~50%.
524 2 : //
525 2 : // - The number of live files F in the DB is shrinking drastically, say from
526 2 : // F to F/10: This can happen for various reasons, like wide range
527 2 : // tombstones, or large numbers of smaller than usual files that are being
528 2 : // merged together into larger files. And say the new files generated
529 2 : // during this shrinkage is insignificant compared to F/10, and so for
530 2 : // this example we will assume it is effectively 0. After this shrinking,
531 2 : // E = 0.9F, and so if we used the previous snapshot file count, F, as the
532 2 : // threshold that needs to be exceeded, we will further delay the snapshot
533 2 : // writing. Which means on DB reopen we will need to replay 0.9F edits to
534 2 : // get to a version with 0.1F files. It would be better to create a new
535 2 : // snapshot when E exceeds the number of files in the current version.
536 2 : //
537 2 : // - The number of live files F in the DB is growing via perfect ingests
538 2 : // into L6: Say we wrote the snapshot when there were F files and now we
539 2 : // have 10F files, so E = 9F. We will further delay writing a new
540 2 : // snapshot. This case can be critiqued as contrived, but we consider it
541 2 : // nonetheless.
542 2 : //
543 2 : // The logic below uses the min of the last snapshot file count and the file
544 2 : // count in the current version.
545 2 : vs.rotationHelper.AddRecord(int64(len(ve.DeletedTables) + len(ve.NewTables)))
546 2 : sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
547 2 : requireRotation := vu.ForceManifestRotation || vs.manifest == nil
548 2 :
549 2 : var nextSnapshotFilecount int64
550 2 : for i := range vs.metrics.Levels {
551 2 : nextSnapshotFilecount += vs.metrics.Levels[i].TablesCount
552 2 : }
553 2 : if sizeExceeded && !requireRotation {
554 2 : requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
555 2 : }
556 2 : var newManifestFileNum base.DiskFileNum
557 2 : var prevManifestFileSize uint64
558 2 : var newManifestVirtualBackings []*manifest.TableBacking
559 2 : if requireRotation {
560 2 : newManifestFileNum = vs.getNextDiskFileNum()
561 2 : prevManifestFileSize = uint64(vs.manifest.Size())
562 2 :
563 2 : // We want the virtual backings *before* applying the version edit, because
564 2 : // the new manifest will contain the pre-apply version plus the last version
565 2 : // edit.
566 2 : newManifestVirtualBackings = vs.virtualBackings.Backings()
567 2 : }
568 :
569 : // Grab certain values before releasing vs.mu, in case createManifest() needs
570 : // to be called.
571 2 : minUnflushedLogNum := vs.minUnflushedLogNum
572 2 : nextFileNum := vs.nextFileNum.Load()
573 2 :
574 2 : // Note: this call populates ve.RemovedBackingTables.
575 2 : zombieBackings, removedVirtualBackings, localTablesLiveDelta :=
576 2 : getZombieTablesAndUpdateVirtualBackings(ve, &vs.virtualBackings, vs.provider)
577 2 :
578 2 : var l0Update manifest.L0PreparedUpdate
579 2 : if err := func() error {
580 2 : vs.mu.Unlock()
581 2 : defer vs.mu.Lock()
582 2 :
583 2 : if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
584 0 : return base.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
585 0 : }
586 :
587 : // Rotate the manifest if necessary. Rotating the manifest involves
588 : // creating a new file and writing an initial version edit containing a
589 : // snapshot of the current version. This initial version edit will
590 : // reflect the Version prior to the pending version edit (`ve`). Once
591 : // we've created the new manifest with the previous version state, we'll
592 : // append the version edit `ve` to the tail of the new manifest.
593 2 : if newManifestFileNum != 0 {
594 2 : if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum, newManifestVirtualBackings); err != nil {
595 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
596 1 : JobID: int(vu.JobID),
597 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
598 1 : FileNum: newManifestFileNum,
599 1 : Err: err,
600 1 : })
601 1 : return errors.Wrap(err, "MANIFEST create failed")
602 1 : }
603 : }
604 :
605 : // Call ApplyAndUpdateVersionEdit before accumulating the version edit.
606 : // If any blob files are no longer referenced, the version edit will be
607 : // updated to explicitly record the deletion of the blob files. This can
608 : // happen here because vs.blobFiles is protected by the manifest logLock
609 : // (NOT vs.mu). We only read or write vs.blobFiles while holding the
610 : // manifest lock.
611 2 : if err := vs.blobFiles.ApplyAndUpdateVersionEdit(ve); err != nil {
612 0 : return errors.Wrap(err, "MANIFEST blob files apply and update failed")
613 0 : }
614 :
615 2 : var bulkEdit manifest.BulkVersionEdit
616 2 : err := bulkEdit.Accumulate(ve)
617 2 : if err != nil {
618 0 : return errors.Wrap(err, "MANIFEST accumulate failed")
619 0 : }
620 2 : newVersion, err = bulkEdit.Apply(currentVersion, vs.opts.Experimental.ReadCompactionRate)
621 2 : if err != nil {
622 0 : return errors.Wrap(err, "MANIFEST apply failed")
623 0 : }
624 2 : l0Update = vs.l0Organizer.PrepareUpdate(&bulkEdit, newVersion)
625 2 :
626 2 : w, err := vs.manifest.Next()
627 2 : if err != nil {
628 0 : return errors.Wrap(err, "MANIFEST next record write failed")
629 0 : }
630 :
631 : // NB: Any error from this point on is considered fatal as we don't know if
632 : // the MANIFEST write occurred or not. Trying to determine that is
633 : // fraught. Instead we rely on the standard recovery mechanism run when a
634 : // database is open. In particular, that mechanism generates a new MANIFEST
635 : // and ensures it is synced.
636 2 : if err := ve.Encode(w); err != nil {
637 0 : return errors.Wrap(err, "MANIFEST write failed")
638 0 : }
639 2 : if err := vs.manifest.Flush(); err != nil {
640 1 : return errors.Wrap(err, "MANIFEST flush failed")
641 1 : }
642 2 : if err := vs.manifestFile.Sync(); err != nil {
643 1 : return errors.Wrap(err, "MANIFEST sync failed")
644 1 : }
645 2 : if newManifestFileNum != 0 {
646 2 : // NB: Move() is responsible for syncing the data directory.
647 2 : if err := vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, newManifestFileNum)); err != nil {
648 0 : return errors.Wrap(err, "MANIFEST set current failed")
649 0 : }
650 2 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
651 2 : JobID: int(vu.JobID),
652 2 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
653 2 : FileNum: newManifestFileNum,
654 2 : })
655 : }
656 2 : return nil
657 1 : }(); err != nil {
658 1 : // Any error encountered during any of the operations in the previous
659 1 : // closure is considered fatal. Treating such errors as fatal is preferred
660 1 : // to attempting to unwind various file and b-tree reference counts, and
661 1 : // re-generating L0 sublevel metadata. This may change in the future, if
662 1 : // certain manifest / WAL operations become retryable. For more context, see
663 1 : // #1159 and #1792.
664 1 : vs.opts.Logger.Fatalf("%s", err)
665 1 : return err
666 1 : }
667 :
668 2 : if requireRotation {
669 2 : // Successfully rotated.
670 2 : vs.rotationHelper.Rotate(nextSnapshotFilecount)
671 2 : }
672 : // Now that DB.mu is held again, initialize compacting file info in
673 : // L0Sublevels.
674 2 : inProgress := vu.InProgressCompactionsFn()
675 2 :
676 2 : zombieBlobs, localBlobLiveDelta := getZombieBlobFilesAndComputeLocalMetrics(ve, vs.provider)
677 2 : vs.l0Organizer.PerformUpdate(l0Update, newVersion)
678 2 : vs.l0Organizer.InitCompactingFileInfo(inProgressL0Compactions(inProgress))
679 2 :
680 2 : // Update the zombie objects sets first, as installation of the new version
681 2 : // will unref the previous version which could result in addObsoleteLocked
682 2 : // being called.
683 2 : for _, b := range zombieBackings {
684 2 : vs.zombieTables.Add(objectInfo{
685 2 : fileInfo: fileInfo{
686 2 : FileNum: b.backing.DiskFileNum,
687 2 : FileSize: b.backing.Size,
688 2 : },
689 2 : isLocal: b.isLocal,
690 2 : })
691 2 : }
692 2 : for _, zb := range zombieBlobs {
693 2 : vs.zombieBlobs.Add(zb)
694 2 : }
695 : // Unref the removed backings and report those that already became obsolete.
696 : // Note that the only case where we report obsolete tables here is when
697 : // VirtualBackings.Protect/Unprotect was used to keep a backing alive without
698 : // it being used in the current version.
699 2 : var obsoleteVirtualBackings manifest.ObsoleteFiles
700 2 : for _, b := range removedVirtualBackings {
701 2 : if b.backing.Unref() == 0 {
702 2 : obsoleteVirtualBackings.TableBackings = append(obsoleteVirtualBackings.TableBackings, b.backing)
703 2 : }
704 : }
705 2 : vs.addObsoleteLocked(obsoleteVirtualBackings)
706 2 :
707 2 : // Install the new version.
708 2 : vs.append(newVersion)
709 2 :
710 2 : if ve.MinUnflushedLogNum != 0 {
711 2 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
712 2 : }
713 2 : if newManifestFileNum != 0 {
714 2 : if vs.manifestFileNum != 0 {
715 2 : vs.obsoleteManifests = append(vs.obsoleteManifests, obsoleteFile{
716 2 : fileType: base.FileTypeManifest,
717 2 : fs: vs.fs,
718 2 : path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
719 2 : fileNum: vs.manifestFileNum,
720 2 : fileSize: prevManifestFileSize,
721 2 : isLocal: true,
722 2 : })
723 2 : }
724 2 : vs.manifestFileNum = newManifestFileNum
725 : }
726 :
727 2 : vs.metrics.updateLevelMetrics(vu.Metrics)
728 2 : for i := range vs.metrics.Levels {
729 2 : l := &vs.metrics.Levels[i]
730 2 : l.TablesCount = int64(newVersion.Levels[i].Len())
731 2 : l.VirtualTablesCount = newVersion.Levels[i].NumVirtual
732 2 : l.VirtualTablesSize = newVersion.Levels[i].VirtualTableSize
733 2 : l.TablesSize = int64(newVersion.Levels[i].TableSize())
734 2 : l.EstimatedReferencesSize = newVersion.Levels[i].EstimatedReferenceSize()
735 2 : l.Sublevels = 0
736 2 : if l.TablesCount > 0 {
737 2 : l.Sublevels = 1
738 2 : }
739 2 : if invariants.Enabled {
740 2 : levelFiles := newVersion.Levels[i].Slice()
741 2 : if size := int64(levelFiles.TableSizeSum()); l.TablesSize != size {
742 0 : vs.opts.Logger.Fatalf("versionSet metrics L%d Size = %d, actual size = %d", i, l.TablesSize, size)
743 0 : }
744 2 : refSize := uint64(0)
745 2 : for f := range levelFiles.All() {
746 2 : refSize += f.EstimatedReferenceSize()
747 2 : }
748 2 : if refSize != l.EstimatedReferencesSize {
749 0 : vs.opts.Logger.Fatalf("versionSet metrics L%d EstimatedReferencesSize = %d, recomputed size = %d", i, l.EstimatedReferencesSize, refSize)
750 0 : }
751 :
752 2 : if nVirtual := levelFiles.NumVirtual(); nVirtual != l.VirtualTablesCount {
753 0 : vs.opts.Logger.Fatalf(
754 0 : "versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
755 0 : i, l.VirtualTablesCount, nVirtual,
756 0 : )
757 0 : }
758 2 : if vSize := levelFiles.VirtualTableSizeSum(); vSize != l.VirtualTablesSize {
759 0 : vs.opts.Logger.Fatalf(
760 0 : "versionSet metrics L%d Virtual size = %d, actual size = %d",
761 0 : i, l.VirtualTablesSize, vSize,
762 0 : )
763 0 : }
764 : }
765 : }
766 2 : vs.metrics.Levels[0].Sublevels = int32(len(newVersion.L0SublevelFiles))
767 2 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localTablesLiveDelta.size)
768 2 : vs.metrics.Table.Local.LiveCount = uint64(int64(vs.metrics.Table.Local.LiveCount) + localTablesLiveDelta.count)
769 2 : vs.metrics.BlobFiles.Local.LiveSize = uint64(int64(vs.metrics.BlobFiles.Local.LiveSize) + localBlobLiveDelta.size)
770 2 : vs.metrics.BlobFiles.Local.LiveCount = uint64(int64(vs.metrics.BlobFiles.Local.LiveCount) + localBlobLiveDelta.count)
771 2 :
772 2 : vs.setCompactionPicker(
773 2 : newCompactionPickerByScore(newVersion, vs.l0Organizer, &vs.virtualBackings, vs.opts, inProgress))
774 2 : if !vs.dynamicBaseLevel {
775 1 : vs.picker.forceBaseLevel1()
776 1 : }
777 2 : return nil
778 : }
779 :
780 2 : func (vs *versionSet) setCompactionPicker(picker *compactionPickerByScore) {
781 2 : vs.picker = picker
782 2 : vs.curCompactionConcurrency.Store(int32(picker.getCompactionConcurrency()))
783 2 : }
784 :
785 : type tableBackingInfo struct {
786 : backing *manifest.TableBacking
787 : isLocal bool
788 : }
789 :
790 : type fileMetricDelta struct {
791 : count int64
792 : size int64
793 : }
794 :
795 : // getZombieTablesAndUpdateVirtualBackings updates the virtual backings with the
796 : // changes in the versionEdit and populates ve.RemovedBackingTables.
797 : // Returns:
798 : // - zombieBackings: all backings (physical and virtual) that will no longer be
799 : // needed when we apply ve.
800 : // - removedVirtualBackings: the virtual backings that will be removed by the
801 : // VersionEdit and which must be Unref()ed by the caller. These backings
802 : // match ve.RemovedBackingTables.
803 : // - localLiveDelta: the delta in local live table bytes and counts.
804 : func getZombieTablesAndUpdateVirtualBackings(
805 : ve *manifest.VersionEdit, virtualBackings *manifest.VirtualBackings, provider objstorage.Provider,
806 2 : ) (zombieBackings, removedVirtualBackings []tableBackingInfo, localLiveDelta fileMetricDelta) {
807 2 : // First, deal with the physical tables.
808 2 : //
809 2 : // A physical backing has become unused if it is in DeletedTables but not in
810 2 : // NewTables or CreatedBackingTables.
811 2 : //
812 2 : // Note that for the common case where there are very few elements, the map
813 2 : // will stay on the stack.
814 2 : stillUsed := make(map[base.DiskFileNum]struct{})
815 2 : for _, nf := range ve.NewTables {
816 2 : if !nf.Meta.Virtual {
817 2 : stillUsed[nf.Meta.TableBacking.DiskFileNum] = struct{}{}
818 2 : isLocal, localFileDelta := sizeIfLocal(nf.Meta.TableBacking, provider)
819 2 : localLiveDelta.size += localFileDelta
820 2 : if isLocal {
821 2 : localLiveDelta.count++
822 2 : }
823 : }
824 : }
825 2 : for _, b := range ve.CreatedBackingTables {
826 2 : stillUsed[b.DiskFileNum] = struct{}{}
827 2 : }
828 2 : for _, m := range ve.DeletedTables {
829 2 : if !m.Virtual {
830 2 : // NB: this deleted table may also be in NewTables or
831 2 : // CreatedBackingTables, due to a file moving between levels, or
832 2 : // becoming virtualized, in which case there is no net change due to
833 2 : // this file in localLiveDelta -- the subtraction below compensates
834 2 : // for the addition.
835 2 : isLocal, localFileDelta := sizeIfLocal(m.TableBacking, provider)
836 2 : localLiveDelta.size -= localFileDelta
837 2 : if isLocal {
838 2 : localLiveDelta.count--
839 2 : }
840 2 : if _, ok := stillUsed[m.TableBacking.DiskFileNum]; !ok {
841 2 : zombieBackings = append(zombieBackings, tableBackingInfo{
842 2 : backing: m.TableBacking,
843 2 : isLocal: isLocal,
844 2 : })
845 2 : }
846 : }
847 : }
848 :
849 : // Now deal with virtual tables.
850 : //
851 : // When a virtual table moves between levels we AddTable() then RemoveTable(),
852 : // which works out.
853 2 : for _, b := range ve.CreatedBackingTables {
854 2 : virtualBackings.AddAndRef(b)
855 2 : isLocal, localFileDelta := sizeIfLocal(b, provider)
856 2 : localLiveDelta.size += localFileDelta
857 2 : if isLocal {
858 2 : localLiveDelta.count++
859 2 : }
860 : }
861 2 : for _, nf := range ve.NewTables {
862 2 : if nf.Meta.Virtual {
863 2 : virtualBackings.AddTable(nf.Meta)
864 2 : }
865 : }
866 2 : for _, m := range ve.DeletedTables {
867 2 : if m.Virtual {
868 2 : virtualBackings.RemoveTable(m)
869 2 : }
870 : }
871 :
872 2 : if unused := virtualBackings.Unused(); len(unused) > 0 {
873 2 : // Virtual backings that are no longer used are zombies and are also added
874 2 : // to RemovedBackingTables (before the version edit is written to disk).
875 2 : ve.RemovedBackingTables = make([]base.DiskFileNum, len(unused))
876 2 : for i, b := range unused {
877 2 : isLocal, localFileDelta := sizeIfLocal(b, provider)
878 2 : localLiveDelta.size -= localFileDelta
879 2 : if isLocal {
880 2 : localLiveDelta.count--
881 2 : }
882 2 : ve.RemovedBackingTables[i] = b.DiskFileNum
883 2 : zombieBackings = append(zombieBackings, tableBackingInfo{
884 2 : backing: b,
885 2 : isLocal: isLocal,
886 2 : })
887 2 : virtualBackings.Remove(b.DiskFileNum)
888 : }
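: // The unused backings were appended to zombieBackings in the loop above,
: // so the trailing len(unused) entries are exactly the backings removed by
: // this version edit.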
889 2 : removedVirtualBackings = zombieBackings[len(zombieBackings)-len(unused):]
890 : }
891 2 : return zombieBackings, removedVirtualBackings, localLiveDelta
892 : }
893 :
894 : // getZombieBlobFilesAndComputeLocalMetrics constructs objectInfos for all
895 : // zombie blob files, and computes the delta in local live blob file counts
896 : // and bytes.
897 : func getZombieBlobFilesAndComputeLocalMetrics(
898 : ve *manifest.VersionEdit, provider objstorage.Provider,
899 2 : ) (zombieBlobFiles []objectInfo, localLiveDelta fileMetricDelta) {
900 2 : for _, b := range ve.NewBlobFiles {
901 2 : if objstorage.IsLocalBlobFile(provider, b.Physical.FileNum) {
902 2 : localLiveDelta.count++
903 2 : localLiveDelta.size += int64(b.Physical.Size)
904 2 : }
905 : }
906 2 : zombieBlobFiles = make([]objectInfo, 0, len(ve.DeletedBlobFiles))
907 2 : for _, physical := range ve.DeletedBlobFiles {
908 2 : isLocal := objstorage.IsLocalBlobFile(provider, physical.FileNum)
909 2 : if isLocal {
910 2 : localLiveDelta.count--
911 2 : localLiveDelta.size -= int64(physical.Size)
912 2 : }
913 2 : zombieBlobFiles = append(zombieBlobFiles, objectInfo{
914 2 : fileInfo: fileInfo{
915 2 : FileNum: physical.FileNum,
916 2 : FileSize: physical.Size,
917 2 : },
918 2 : isLocal: isLocal,
919 2 : })
920 : }
921 2 : return zombieBlobFiles, localLiveDelta
922 : }
923 :
924 : // sizeIfLocal reports whether the backing is a local file and, if so, returns backing.Size (else 0).
925 : func sizeIfLocal(
926 : backing *manifest.TableBacking, provider objstorage.Provider,
927 2 : ) (isLocal bool, localSize int64) {
928 2 : isLocal = objstorage.IsLocalTable(provider, backing.DiskFileNum)
929 2 : if isLocal {
930 2 : return true, int64(backing.Size)
931 2 : }
932 2 : return false, 0
933 : }
934 :
935 : func (vs *versionSet) incrementCompactions(
936 : kind compactionKind,
937 : extraLevels []*compactionLevel,
938 : pickerMetrics pickedCompactionMetrics,
939 : bytesWritten int64,
940 : compactionErr error,
941 2 : ) {
942 2 : if kind == compactionKindFlush || kind == compactionKindIngestedFlushable {
943 2 : vs.metrics.Flush.Count++
944 2 : } else {
945 2 : vs.metrics.Compact.Count++
946 2 : if compactionErr != nil {
947 2 : if errors.Is(compactionErr, ErrCancelledCompaction) {
948 2 : vs.metrics.Compact.CancelledCount++
949 2 : vs.metrics.Compact.CancelledBytes += bytesWritten
950 2 : } else {
951 1 : vs.metrics.Compact.FailedCount++
952 1 : }
953 : }
954 : }
955 :
956 2 : switch kind {
957 2 : case compactionKindDefault:
958 2 : vs.metrics.Compact.DefaultCount++
959 :
960 2 : case compactionKindFlush, compactionKindIngestedFlushable:
961 :
962 2 : case compactionKindMove:
963 2 : vs.metrics.Compact.MoveCount++
964 :
965 2 : case compactionKindDeleteOnly:
966 2 : vs.metrics.Compact.DeleteOnlyCount++
967 :
968 2 : case compactionKindElisionOnly:
969 2 : vs.metrics.Compact.ElisionOnlyCount++
970 :
971 1 : case compactionKindRead:
972 1 : vs.metrics.Compact.ReadCount++
973 :
974 2 : case compactionKindTombstoneDensity:
975 2 : vs.metrics.Compact.TombstoneDensityCount++
976 :
977 2 : case compactionKindRewrite:
978 2 : vs.metrics.Compact.RewriteCount++
979 :
980 2 : case compactionKindCopy:
981 2 : vs.metrics.Compact.CopyCount++
982 :
983 0 : default:
984 0 : if invariants.Enabled {
985 0 : panic("unhandled compaction kind")
986 : }
987 : }
988 2 : if len(extraLevels) > 0 {
989 2 : vs.metrics.Compact.MultiLevelCount++
990 2 : }
991 : }
992 :
993 2 : func (vs *versionSet) incrementCompactionBytes(numBytes int64) {
994 2 : vs.atomicInProgressBytes.Add(numBytes)
995 2 : }
996 :
997 : // createManifest creates a manifest file that contains a snapshot of vs.
998 : func (vs *versionSet) createManifest(
999 : dirname string,
1000 : fileNum, minUnflushedLogNum base.DiskFileNum,
1001 : nextFileNum uint64,
1002 : virtualBackings []*manifest.TableBacking,
1003 2 : ) (err error) {
1004 2 : var (
1005 2 : filename = base.MakeFilepath(vs.fs, dirname, base.FileTypeManifest, fileNum)
1006 2 : manifestFile vfs.File
1007 2 : manifestWriter *record.Writer
1008 2 : )
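: // On any error below, close whichever of the new writer/file has been
: // created and remove the partially written manifest; on success both are
: // handed off to vs.manifest/vs.manifestFile and the locals are set to nil.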
1009 2 : defer func() {
1010 2 : if manifestWriter != nil {
1011 0 : _ = manifestWriter.Close()
1012 0 : }
1013 2 : if manifestFile != nil {
1014 0 : _ = manifestFile.Close()
1015 0 : }
1016 2 : if err != nil {
1017 1 : _ = vs.fs.Remove(filename)
1018 1 : }
1019 : }()
1020 2 : manifestFile, err = vs.fs.Create(filename, "pebble-manifest")
1021 2 : if err != nil {
1022 1 : return err
1023 1 : }
1024 2 : manifestWriter = record.NewWriter(manifestFile)
1025 2 :
1026 2 : snapshot := manifest.VersionEdit{
1027 2 : ComparerName: vs.cmp.Name,
1028 2 : // When creating a version snapshot for an existing DB, this snapshot
1029 2 : // VersionEdit will be immediately followed by another VersionEdit
1030 2 : // (being written in UpdateVersionLocked()). That VersionEdit always
1031 2 : // contains a LastSeqNum, so we don't need to include that in the
1032 2 : // snapshot. But it does not necessarily include MinUnflushedLogNum,
1033 2 : // NextFileNum, so we initialize those using the corresponding fields in
1034 2 : // the versionSet (which came from the latest preceding VersionEdit that
1035 2 : // had those fields).
1036 2 : MinUnflushedLogNum: minUnflushedLogNum,
1037 2 : NextFileNum: nextFileNum,
1038 2 : CreatedBackingTables: virtualBackings,
1039 2 : NewBlobFiles: vs.blobFiles.Metadatas(),
1040 2 : }
1041 2 :
1042 2 : // Add all extant sstables in the current version.
1043 2 : for level, levelMetadata := range vs.currentVersion().Levels {
1044 2 : for meta := range levelMetadata.All() {
1045 2 : snapshot.NewTables = append(snapshot.NewTables, manifest.NewTableEntry{
1046 2 : Level: level,
1047 2 : Meta: meta,
1048 2 : })
1049 2 : }
1050 : }
1051 :
1052 2 : w, err1 := manifestWriter.Next()
1053 2 : if err1 != nil {
1054 0 : return err1
1055 0 : }
1056 2 : if err := snapshot.Encode(w); err != nil {
1057 0 : return err
1058 0 : }
1059 :
1060 2 : if vs.manifest != nil {
1061 2 : if err := vs.manifest.Close(); err != nil {
1062 0 : return err
1063 0 : }
1064 2 : vs.manifest = nil
1065 : }
1066 2 : if vs.manifestFile != nil {
1067 2 : if err := vs.manifestFile.Close(); err != nil {
1068 0 : return err
1069 0 : }
1070 2 : vs.manifestFile = nil
1071 : }
1072 :
1073 2 : vs.manifest, manifestWriter = manifestWriter, nil
1074 2 : vs.manifestFile, manifestFile = manifestFile, nil
1075 2 : return nil
1076 : }
1077 :
1078 : // NB: This method is not safe for concurrent use. It is only safe
1079 : // to be called when concurrent changes to nextFileNum are not expected.
1080 2 : func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
1081 2 : if vs.nextFileNum.Load() <= uint64(fileNum) {
1082 2 : vs.nextFileNum.Store(uint64(fileNum + 1))
1083 2 : }
1084 : }
1085 :
1086 : // getNextTableNum returns a new table number.
1087 : //
1088 : // Can be called without the versionSet's mutex being held.
1089 2 : func (vs *versionSet) getNextTableNum() base.TableNum {
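: // nextFileNum.Add returns the post-increment value, so subtracting 1 yields
: // the number being allocated while reserving it against future callers.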
1090 2 : x := vs.nextFileNum.Add(1) - 1
1091 2 : return base.TableNum(x)
1092 2 : }
1093 :
1094 : // Can be called without the versionSet's mutex being held.
1095 2 : func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
1096 2 : x := vs.nextFileNum.Add(1) - 1
1097 2 : return base.DiskFileNum(x)
1098 2 : }
1099 :
1100 2 : func (vs *versionSet) append(v *manifest.Version) {
1101 2 : if v.Refs() != 0 {
1102 0 : panic("pebble: version should be unreferenced")
1103 : }
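: // Release the versionSet's reference on the previous latest version; it
: // remains alive only while readers (e.g. iterators) still hold references.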
1104 2 : if !vs.versions.Empty() {
1105 2 : vs.versions.Back().UnrefLocked()
1106 2 : }
1107 2 : v.Deleted = vs.obsoleteFn
1108 2 : v.Ref()
1109 2 : vs.versions.PushBack(v)
1110 2 : if invariants.Enabled {
1111 2 : // Verify that the virtualBackings contains all the backings referenced by
1112 2 : // the version.
1113 2 : for _, l := range v.Levels {
1114 2 : for f := range l.All() {
1115 2 : if f.Virtual {
1116 2 : if _, ok := vs.virtualBackings.Get(f.TableBacking.DiskFileNum); !ok {
1117 0 : panic(fmt.Sprintf("%s is not in virtualBackings", f.TableBacking.DiskFileNum))
1118 : }
1119 : }
1120 : }
1121 : }
1122 : }
1123 : }
1124 :
1125 2 : func (vs *versionSet) currentVersion() *manifest.Version {
1126 2 : return vs.versions.Back()
1127 2 : }
1128 :
1129 2 : func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
1130 2 : current := vs.currentVersion()
1131 2 : for v := vs.versions.Front(); true; v = v.Next() {
1132 2 : for _, lm := range v.Levels {
1133 2 : for f := range lm.All() {
1134 2 : m[f.TableBacking.DiskFileNum] = struct{}{}
1135 2 : for _, ref := range f.BlobReferences {
1136 2 : // TODO(jackson): Once we support blob file replacement, we
1137 2 : // need to look up the new blob file's number here.
1138 2 : diskFileNum := blob.DiskFileNumTODO(ref.FileID)
1139 2 : m[diskFileNum] = struct{}{}
1140 2 : }
1141 : }
1142 : }
1143 2 : if v == current {
1144 2 : break
1145 : }
1146 : }
1147 : // virtualBackings contains backings that are referenced by some virtual
1148 : // tables in the latest version (which are handled above), and backings that
1149 : // are not but are still alive because of the protection mechanism (see
1150 2 : // manifest.VirtualBackings). This loop ensures the latter get added to the
1151 : // map.
1152 2 : vs.virtualBackings.ForEach(func(b *manifest.TableBacking) {
1153 2 : m[b.DiskFileNum] = struct{}{}
1154 2 : })
1155 : }
1156 :
1157 : // addObsoleteLocked will add the fileInfo associated with obsolete backing
1158 : // sstables to the obsolete tables list.
1159 : //
1160 : // The file backings in the obsolete list must not appear more than once.
1161 : //
1162 : // DB.mu must be held when addObsoleteLocked is called.
1163 2 : func (vs *versionSet) addObsoleteLocked(obsolete manifest.ObsoleteFiles) {
1164 2 : if obsolete.Count() == 0 {
1165 2 : return
1166 2 : }
1167 :
1168 : // Note that the zombie objects transition from zombie *to* obsolete, and
1169 : // will no longer be considered zombie.
1170 :
1171 2 : newlyObsoleteTables := make([]obsoleteFile, len(obsolete.TableBackings))
1172 2 : for i, bs := range obsolete.TableBackings {
1173 2 : newlyObsoleteTables[i] = vs.zombieTables.Extract(bs.DiskFileNum).
1174 2 : asObsoleteFile(vs.fs, base.FileTypeTable, vs.dirname)
1175 2 : }
1176 2 : vs.obsoleteTables = mergeObsoleteFiles(vs.obsoleteTables, newlyObsoleteTables)
1177 2 :
1178 2 : newlyObsoleteBlobFiles := make([]obsoleteFile, len(obsolete.BlobFiles))
1179 2 : for i, bf := range obsolete.BlobFiles {
1180 2 : newlyObsoleteBlobFiles[i] = vs.zombieBlobs.Extract(bf.FileNum).
1181 2 : asObsoleteFile(vs.fs, base.FileTypeBlob, vs.dirname)
1182 2 : }
1183 2 : vs.obsoleteBlobs = mergeObsoleteFiles(vs.obsoleteBlobs, newlyObsoleteBlobFiles)
1184 2 : vs.updateObsoleteObjectMetricsLocked()
1185 : }
1186 :
1187 : // addObsolete will acquire DB.mu, so DB.mu must not be held when this is
1188 : // called.
1189 2 : func (vs *versionSet) addObsolete(obsolete manifest.ObsoleteFiles) {
1190 2 : vs.mu.Lock()
1191 2 : defer vs.mu.Unlock()
1192 2 : vs.addObsoleteLocked(obsolete)
1193 2 : }
1194 :
1195 2 : func (vs *versionSet) updateObsoleteObjectMetricsLocked() {
1196 2 : // TODO(jackson): Ideally we would update vs.fileDeletions.queuedStats to
1197 2 : // include the files on vs.obsolete{Tables,Blobs}, but there's subtlety in
1198 2 : // deduplicating the files before computing the stats. It might also be
1199 2 : // possible to refactor to remove the vs.obsolete{Tables,Blobs} intermediary
1200 2 : // step. Revisit this.
1201 2 : vs.metrics.Table.ObsoleteCount = int64(len(vs.obsoleteTables))
1202 2 : vs.metrics.Table.ObsoleteSize = 0
1203 2 : vs.metrics.Table.Local.ObsoleteSize = 0
1204 2 : vs.metrics.Table.Local.ObsoleteCount = 0
1205 2 : for _, fi := range vs.obsoleteTables {
1206 2 : vs.metrics.Table.ObsoleteSize += fi.fileSize
1207 2 : if fi.isLocal {
1208 2 : vs.metrics.Table.Local.ObsoleteSize += fi.fileSize
1209 2 : vs.metrics.Table.Local.ObsoleteCount++
1210 2 : }
1211 : }
1212 2 : vs.metrics.BlobFiles.ObsoleteCount = uint64(len(vs.obsoleteBlobs))
1213 2 : vs.metrics.BlobFiles.ObsoleteSize = 0
1214 2 : vs.metrics.BlobFiles.Local.ObsoleteSize = 0
1215 2 : vs.metrics.BlobFiles.Local.ObsoleteCount = 0
1216 2 : for _, fi := range vs.obsoleteBlobs {
1217 2 : vs.metrics.BlobFiles.ObsoleteSize += fi.fileSize
1218 2 : if fi.isLocal {
1219 2 : vs.metrics.BlobFiles.Local.ObsoleteSize += fi.fileSize
1220 2 : vs.metrics.BlobFiles.Local.ObsoleteCount++
1221 2 : }
1222 : }
1223 : }
1224 :
1225 : func findCurrentManifest(
1226 : fs vfs.FS, dirname string, ls []string,
1227 2 : ) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
1228 2 : // Locating a marker should succeed even if the marker has never been placed.
1229 2 : var filename string
1230 2 : marker, filename, err = atomicfs.LocateMarkerInListing(fs, dirname, manifestMarkerName, ls)
1231 2 : if err != nil {
1232 1 : return nil, 0, false, err
1233 1 : }
1234 :
1235 2 : if filename == "" {
1236 2 : // The marker hasn't been set yet. This database doesn't exist.
1237 2 : return marker, 0, false, nil
1238 2 : }
1239 :
1240 2 : var ok bool
1241 2 : _, manifestNum, ok = base.ParseFilename(fs, filename)
1242 2 : if !ok {
1243 0 : return marker, 0, false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
1244 0 : }
1245 2 : return marker, manifestNum, true, nil
1246 : }
1247 :
1248 1 : func newFileMetrics(newFiles []manifest.NewTableEntry) levelMetricsDelta {
1249 1 : var m levelMetricsDelta
1250 1 : for _, nf := range newFiles {
1251 1 : lm := m[nf.Level]
1252 1 : if lm == nil {
1253 1 : lm = &LevelMetrics{}
1254 1 : m[nf.Level] = lm
1255 1 : }
1256 1 : lm.TablesCount++
1257 1 : lm.TablesSize += int64(nf.Meta.Size)
1258 1 : lm.EstimatedReferencesSize += nf.Meta.EstimatedReferenceSize()
1259 : }
1260 1 : return m
1261 : }
|