Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "sync"
11 : "sync/atomic"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/record"
19 : "github.com/cockroachdb/pebble/vfs"
20 : "github.com/cockroachdb/pebble/vfs/atomicfs"
21 : )
22 :
23 : const numLevels = manifest.NumLevels
24 :
25 : const manifestMarkerName = `manifest`
26 :
27 : // Provide type aliases for the various manifest structs.
28 : type bulkVersionEdit = manifest.BulkVersionEdit
29 : type deletedFileEntry = manifest.DeletedTableEntry
30 : type tableMetadata = manifest.TableMetadata
31 : type fileBacking = manifest.FileBacking
32 : type newTableEntry = manifest.NewTableEntry
33 : type version = manifest.Version
34 : type versionEdit = manifest.VersionEdit
35 : type versionList = manifest.VersionList
36 :
37 : // versionSet manages a collection of immutable versions, and manages the
38 : // creation of a new version from the most recent version. A new version is
39 : // created from an existing version by applying a version edit which is just
40 : // like it sounds: a delta from the previous version. Version edits are logged
41 : // to the MANIFEST file, which is replayed at startup.
42 : type versionSet struct {
43 : // Next seqNum to use for WAL writes.
44 : logSeqNum base.AtomicSeqNum
45 :
46 : // The upper bound on sequence numbers that have been assigned so far. A
47 : // suffix of these sequence numbers may not have been written to a WAL. Both
48 : // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
49 : // visibleSeqNum is <= logSeqNum.
50 : visibleSeqNum base.AtomicSeqNum
51 :
52 : // Number of bytes present in sstables being written by in-progress
53 : // compactions. This value will be zero if there are no in-progress
54 : // compactions. Updated and read atomically.
55 : atomicInProgressBytes atomic.Int64
56 :
57 : // Immutable fields.
58 : dirname string
59 : provider objstorage.Provider
60 : // Set to DB.mu.
61 : mu *sync.Mutex
62 : opts *Options
63 : fs vfs.FS
64 : cmp *base.Comparer
65 : // dynamicBaseLevel allows the dynamic base level computation to be
66 : // disabled. Used by tests which want to create specific LSM structures.
67 : dynamicBaseLevel bool
68 :
69 : // Mutable fields.
70 : versions versionList
71 : l0Organizer *manifest.L0Organizer
72 : // blobFiles is the set of blob files referenced by the current version.
73 : // blobFiles is protected by the manifest logLock (not vs.mu).
74 : blobFiles manifest.CurrentBlobFileSet
75 : picker compactionPicker
76 : // curCompactionConcurrency is updated whenever picker is updated.
77 : // INVARIANT: >= 1.
78 : curCompactionConcurrency atomic.Int32
79 :
80 : // Not all metrics are kept here. See DB.Metrics().
81 : metrics Metrics
82 :
83 : // A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
84 : // on the creation of every version.
85 : obsoleteFn func(manifest.ObsoleteFiles)
86 : // obsolete{Tables,Blobs,Manifests,Options} are sorted by file number ascending.
87 : obsoleteTables []objectInfo
88 : obsoleteBlobs []objectInfo
89 : obsoleteManifests []obsoleteFile
90 : obsoleteOptions []obsoleteFile
91 :
92 : // Zombie tables which have been removed from the current version but are
93 : // still referenced by an in-use iterator.
94 : zombieTables zombieObjects
95 : // Zombie blobs which have been removed from the current version but are
96 : // still referenced by an in-use iterator.
97 : zombieBlobs zombieObjects
98 : // virtualBackings contains information about the FileBackings which support
99 : // virtual sstables in the latest version. It is mainly used to determine when
100 : // a backing is no longer in use by the tables in the latest version; this is
101 : // not a trivial problem because compactions and other operations can happen
102 : // in parallel (and they can finish in unpredictable order).
103 : //
104 : // This mechanism is complementary to the backing Ref/Unref mechanism, which
105 : // determines when a backing is no longer used by *any* live version and can
106 : // be removed.
107 : //
108 : // In principle this should have been a copy-on-write structure, with each
109 : // Version having its own record of its virtual backings (similar to the
110 : // B-tree that holds the tables). However, in practice we only need it for the
111 : // latest version, so we use a simpler structure and store it in the
112 : // versionSet instead.
113 : //
114 : // virtualBackings is modified under DB.mu and the log lock. If it is accessed
115 : // under DB.mu and a version update is in progress, it reflects the state of
116 : // the next version.
117 : virtualBackings manifest.VirtualBackings
118 :
119 : // minUnflushedLogNum is the smallest WAL log file number corresponding to
120 : // mutations that have not been flushed to an sstable.
121 : minUnflushedLogNum base.DiskFileNum
122 :
123 : // The next file number. A single counter is used to assign file
124 : // numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
125 : nextFileNum atomic.Uint64
126 :
127 : // The current manifest file number.
128 : manifestFileNum base.DiskFileNum
129 : manifestMarker *atomicfs.Marker
130 :
131 : manifestFile vfs.File
132 : manifest *record.Writer
133 : getFormatMajorVersion func() FormatMajorVersion
134 :
135 : writing bool
136 : writerCond sync.Cond
137 : // State for deciding when to write a snapshot. Protected by mu.
138 : rotationHelper record.RotationHelper
139 :
140 : pickedCompactionCache pickedCompactionCache
141 : }
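// The doc comment above describes a new version as the result of applying a
// version edit (a delta) to the previous version. As a rough illustrative
// sketch of that flow, mirroring what load and UpdateVersionLocked below do
// (error handling elided; the names are the package's own):
//
//	var bve bulkVersionEdit
//	_ = bve.Accumulate(ve) // ve is a *versionEdit produced by a flush or compaction
//	newVersion, _ := bve.Apply(currentVersion, opts.Experimental.ReadCompactionRate)
//	// newVersion is then installed via versionSet.append.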
142 :
143 : func (vs *versionSet) init(
144 : dirname string,
145 : provider objstorage.Provider,
146 : opts *Options,
147 : marker *atomicfs.Marker,
148 : getFMV func() FormatMajorVersion,
149 : mu *sync.Mutex,
150 1 : ) {
151 1 : vs.dirname = dirname
152 1 : vs.provider = provider
153 1 : vs.mu = mu
154 1 : vs.writerCond.L = mu
155 1 : vs.opts = opts
156 1 : vs.fs = opts.FS
157 1 : vs.cmp = opts.Comparer
158 1 : vs.dynamicBaseLevel = true
159 1 : vs.versions.Init(mu)
160 1 : vs.l0Organizer = manifest.NewL0Organizer(opts.Comparer, opts.FlushSplitBytes)
161 1 : vs.obsoleteFn = vs.addObsoleteLocked
162 1 : vs.zombieTables = makeZombieObjects()
163 1 : vs.zombieBlobs = makeZombieObjects()
164 1 : vs.virtualBackings = manifest.MakeVirtualBackings()
165 1 : vs.nextFileNum.Store(1)
166 1 : vs.manifestMarker = marker
167 1 : vs.getFormatMajorVersion = getFMV
168 1 : }
169 :
170 : // create creates a version set for a fresh DB.
171 : func (vs *versionSet) create(
172 : jobID JobID,
173 : dirname string,
174 : provider objstorage.Provider,
175 : opts *Options,
176 : marker *atomicfs.Marker,
177 : getFormatMajorVersion func() FormatMajorVersion,
178 : mu *sync.Mutex,
179 1 : ) error {
180 1 : vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
181 1 : emptyVersion := manifest.NewInitialVersion(opts.Comparer)
182 1 : vs.append(emptyVersion)
183 1 : vs.blobFiles.Init(nil)
184 1 :
185 1 : vs.setCompactionPicker(
186 1 : newCompactionPickerByScore(emptyVersion, vs.l0Organizer, &vs.virtualBackings, vs.opts, nil))
187 1 : // Note that a "snapshot" version edit is written to the manifest when it is
188 1 : // created.
189 1 : vs.manifestFileNum = vs.getNextDiskFileNum()
190 1 : err := vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum.Load(), nil /* virtualBackings */)
191 1 : if err == nil {
192 1 : if err = vs.manifest.Flush(); err != nil {
193 0 : vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
194 0 : }
195 : }
196 1 : if err == nil {
197 1 : if err = vs.manifestFile.Sync(); err != nil {
198 0 : vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
199 0 : }
200 : }
201 1 : if err == nil {
202 1 : // NB: Move() is responsible for syncing the data directory.
203 1 : if err = vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, vs.manifestFileNum)); err != nil {
204 0 : vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
205 0 : }
206 : }
207 :
208 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
209 1 : JobID: int(jobID),
210 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
211 1 : FileNum: vs.manifestFileNum,
212 1 : Err: err,
213 1 : })
214 1 : if err != nil {
215 0 : return err
216 0 : }
217 1 : return nil
218 : }
219 :
220 : // load loads the version set from the manifest file.
221 : func (vs *versionSet) load(
222 : dirname string,
223 : provider objstorage.Provider,
224 : opts *Options,
225 : manifestFileNum base.DiskFileNum,
226 : marker *atomicfs.Marker,
227 : getFormatMajorVersion func() FormatMajorVersion,
228 : mu *sync.Mutex,
229 1 : ) error {
230 1 : vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
231 1 :
232 1 : vs.manifestFileNum = manifestFileNum
233 1 : manifestPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeManifest, vs.manifestFileNum)
234 1 : manifestFilename := opts.FS.PathBase(manifestPath)
235 1 :
236 1 : // Read the versionEdits in the manifest file.
237 1 : var bve bulkVersionEdit
238 1 : bve.AddedTablesByFileNum = make(map[base.FileNum]*tableMetadata)
239 1 : manifestFile, err := vs.fs.Open(manifestPath)
240 1 : if err != nil {
241 0 : return errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
242 0 : errors.Safe(manifestFilename), dirname)
243 0 : }
244 1 : defer manifestFile.Close()
245 1 : rr := record.NewReader(manifestFile, 0 /* logNum */)
246 1 : for {
247 1 : r, err := rr.Next()
248 1 : if err == io.EOF || record.IsInvalidRecord(err) {
249 1 : break
250 : }
251 1 : if err != nil {
252 0 : return errors.Wrapf(err, "pebble: error when loading manifest file %q",
253 0 : errors.Safe(manifestFilename))
254 0 : }
255 1 : var ve versionEdit
256 1 : err = ve.Decode(r)
257 1 : if err != nil {
258 0 : // Break instead of returning an error if the record is corrupted
259 0 : // or invalid.
260 0 : if err == io.EOF || record.IsInvalidRecord(err) {
261 0 : break
262 : }
263 0 : return err
264 : }
265 1 : if ve.ComparerName != "" {
266 1 : if ve.ComparerName != vs.cmp.Name {
267 0 : return errors.Errorf("pebble: manifest file %q for DB %q: "+
268 0 : "comparer name from file %q != comparer name from Options %q",
269 0 : errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(vs.cmp.Name))
270 0 : }
271 : }
272 1 : if err := bve.Accumulate(&ve); err != nil {
273 0 : return err
274 0 : }
275 1 : if ve.MinUnflushedLogNum != 0 {
276 1 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
277 1 : }
278 1 : if ve.NextFileNum != 0 {
279 1 : vs.nextFileNum.Store(ve.NextFileNum)
280 1 : }
281 1 : if ve.LastSeqNum != 0 {
282 1 : // logSeqNum is the _next_ sequence number that will be assigned,
283 1 : // while LastSeqNum is the last assigned sequence number. Note that
284 1 : // this behaviour mimics that in RocksDB; the first sequence number
285 1 : // assigned is one greater than the one present in the manifest
286 1 : // (assuming no WALs contain higher sequence numbers than the
287 1 : // manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
288 1 : // next sequence number that will be assigned.
289 1 : //
290 1 : // If LastSeqNum is less than SeqNumStart, increase it to at least
291 1 : // SeqNumStart to leave ample room for reserved sequence numbers.
292 1 : if ve.LastSeqNum+1 < base.SeqNumStart {
293 0 : vs.logSeqNum.Store(base.SeqNumStart)
294 1 : } else {
295 1 : vs.logSeqNum.Store(ve.LastSeqNum + 1)
296 1 : }
297 : }
298 : }
299 : // We have already initialized vs.nextFileNum to 1 in init (called at the
300 : // beginning of this function) and could only have updated it to some other
301 : // non-zero value, so it cannot be 0 here.
302 1 : if vs.minUnflushedLogNum == 0 {
303 1 : if vs.nextFileNum.Load() >= 2 {
304 1 : // We either have a freshly created DB, or a DB created by RocksDB
305 1 : // that has not had a single flushed SSTable yet. This is because
306 1 : // RocksDB bumps up nextFileNum in this case without bumping up
307 1 : // minUnflushedLogNum, even if WALs with non-zero file numbers are
308 1 : // present in the directory.
309 1 : } else {
310 0 : return base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
311 0 : errors.Safe(manifestFilename), dirname)
312 0 : }
313 : }
314 1 : vs.markFileNumUsed(vs.minUnflushedLogNum)
315 1 :
316 1 : // Populate vs.virtualBackings with the backings for virtual sstables, now
317 1 : // that version edit accumulation has finished.
318 1 : for _, b := range bve.AddedFileBacking {
319 1 : vs.virtualBackings.AddAndRef(b)
320 1 : }
321 :
322 1 : for _, addedLevel := range bve.AddedTables {
323 1 : for _, m := range addedLevel {
324 1 : if m.Virtual {
325 1 : vs.virtualBackings.AddTable(m)
326 1 : }
327 : }
328 : }
329 :
330 1 : if invariants.Enabled {
331 1 : // There should be no deleted tables or backings, since we're starting from
332 1 : // an empty state.
333 1 : for _, deletedLevel := range bve.DeletedTables {
334 1 : if len(deletedLevel) != 0 {
335 0 : panic("deleted files after manifest replay")
336 : }
337 : }
338 1 : if len(bve.RemovedFileBacking) > 0 {
339 0 : panic("deleted backings after manifest replay")
340 : }
341 : }
342 :
343 1 : emptyVersion := manifest.NewInitialVersion(opts.Comparer)
344 1 : newVersion, err := bve.Apply(emptyVersion, opts.Experimental.ReadCompactionRate)
345 1 : if err != nil {
346 0 : return err
347 0 : }
348 1 : vs.l0Organizer.PerformUpdate(vs.l0Organizer.PrepareUpdate(&bve, newVersion), newVersion)
349 1 : vs.l0Organizer.InitCompactingFileInfo(nil /* in-progress compactions */)
350 1 : vs.blobFiles.Init(&bve)
351 1 : vs.append(newVersion)
352 1 :
353 1 : for i := range vs.metrics.Levels {
354 1 : l := &vs.metrics.Levels[i]
355 1 : l.TablesCount = int64(newVersion.Levels[i].Len())
356 1 : files := newVersion.Levels[i].Slice()
357 1 : l.TablesSize = int64(files.SizeSum())
358 1 : }
359 1 : for _, l := range newVersion.Levels {
360 1 : for f := range l.All() {
361 1 : if !f.Virtual {
362 1 : isLocal, localSize := sizeIfLocal(f.FileBacking, vs.provider)
363 1 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
364 1 : if isLocal {
365 1 : vs.metrics.Table.Local.LiveCount++
366 1 : }
367 : }
368 : }
369 : }
370 1 : vs.virtualBackings.ForEach(func(backing *fileBacking) {
371 1 : isLocal, localSize := sizeIfLocal(backing, vs.provider)
372 1 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
373 1 : if isLocal {
374 1 : vs.metrics.Table.Local.LiveCount++
375 1 : }
376 : })
377 :
378 1 : vs.setCompactionPicker(
379 1 : newCompactionPickerByScore(newVersion, vs.l0Organizer, &vs.virtualBackings, vs.opts, nil))
380 1 : return nil
381 : }
382 :
383 1 : func (vs *versionSet) close() error {
384 1 : if vs.manifestFile != nil {
385 1 : if err := vs.manifestFile.Close(); err != nil {
386 0 : return err
387 0 : }
388 : }
389 1 : if vs.manifestMarker != nil {
390 1 : if err := vs.manifestMarker.Close(); err != nil {
391 0 : return err
392 0 : }
393 : }
394 1 : return nil
395 : }
396 :
397 : // logLock locks the manifest for writing. The lock must be released by
398 : // a call to logUnlock.
399 : //
400 : // DB.mu must be held when calling this method, but the mutex may be dropped and
401 : // re-acquired during the course of this method.
402 1 : func (vs *versionSet) logLock() {
403 1 : // Wait for any existing writing to the manifest to complete, then mark the
404 1 : // manifest as busy.
405 1 : for vs.writing {
406 1 : // Note: writerCond.L is DB.mu, so we unlock it while we wait.
407 1 : vs.writerCond.Wait()
408 1 : }
409 1 : vs.writing = true
410 : }
411 :
412 : // logUnlock releases the lock for manifest writing.
413 : //
414 : // DB.mu must be held when calling this method.
415 1 : func (vs *versionSet) logUnlock() {
416 1 : if !vs.writing {
417 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
418 0 : }
419 1 : vs.writing = false
420 1 : vs.writerCond.Signal()
421 : }
422 :
423 1 : func (vs *versionSet) logUnlockAndInvalidatePickedCompactionCache() {
424 1 : vs.pickedCompactionCache.invalidate()
425 1 : vs.logUnlock()
426 1 : }
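// Illustrative sketch of the manifest locking protocol described above; this
// is essentially the shape used by UpdateVersionLocked below (error handling
// elided). DB.mu must be held when logLock is called:
//
//	vs.logLock()         // waits for any concurrent MANIFEST writer
//	defer vs.logUnlock()
//	// ... DB.mu may be temporarily dropped while performing MANIFEST I/O ...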
427 :
428 : // versionUpdate is returned by the function passed to UpdateVersionLocked.
429 : //
430 : // If VE is nil, there is no update to apply (but it is not an error).
431 : type versionUpdate struct {
432 : VE *manifest.VersionEdit
433 : JobID JobID
434 : Metrics levelMetricsDelta
435 : // InProgressCompactionsFn is called while DB.mu is held, after the I/O part
436 : // of the update has been performed. It should return any compactions that
437 : // are in-progress (excluding the one that is being applied).
438 : InProgressCompactionsFn func() []compactionInfo
439 : ForceManifestRotation bool
440 : }
441 :
442 : // UpdateVersionLocked is used to update the current version.
443 : //
444 : // DB.mu must be held. UpdateVersionLocked first waits for any other version
445 : // update to complete, releasing and reacquiring DB.mu.
446 : //
447 : // UpdateVersionLocked then calls updateFn which builds a versionUpdate, while
448 : // holding DB.mu. The updateFn can release and reacquire DB.mu (it should
449 : // attempt to do as much work as possible outside of the lock).
450 : //
451 : // UpdateVersionLocked fills in the following fields of the VersionEdit:
452 : // NextFileNum, LastSeqNum, RemovedBackingTables. The removed backing tables are
453 : // those backings that are no longer used (in the new version) after applying
454 : // the edit (as per vs.virtualBackings). Other than these fields, the
455 : // VersionEdit must be complete.
456 : //
457 : // New table backing references (FileBacking.Ref) are taken as part of applying
458 : // the version edit. The state of the virtual backings (vs.virtualBackings) is
459 : // updated before logging to the manifest and installing the latest version;
460 : // this is ok because any failure in those steps is fatal.
461 : //
462 : // If updateFn returns an error, no update is applied and that same error is returned.
463 : // If versionUpdate.VE is nil, no update is applied (and no error is returned).
464 1 : func (vs *versionSet) UpdateVersionLocked(updateFn func() (versionUpdate, error)) error {
465 1 : vs.logLock()
466 1 : defer vs.logUnlockAndInvalidatePickedCompactionCache()
467 1 :
468 1 : vu, err := updateFn()
469 1 : if err != nil || vu.VE == nil {
470 1 : return err
471 1 : }
472 :
473 1 : if !vs.writing {
474 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
475 0 : }
476 :
477 1 : ve := vu.VE
478 1 : if ve.MinUnflushedLogNum != 0 {
479 1 : if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
480 1 : vs.nextFileNum.Load() <= uint64(ve.MinUnflushedLogNum) {
481 0 : panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
482 0 : ve.MinUnflushedLogNum))
483 : }
484 : }
485 :
486 : // This is the next manifest filenum, but if the current file is too big we
487 : // will write this ve to the next file which means what ve encodes is the
488 : // current filenum and not the next one.
489 : //
490 : // TODO(sbhola): figure out why this is correct and update comment.
491 1 : ve.NextFileNum = vs.nextFileNum.Load()
492 1 :
493 1 : // LastSeqNum is set to the current upper bound on the assigned sequence
494 1 : // numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
495 1 : // used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
496 1 : // replay. It must be higher than or equal to any sequence number
497 1 : // written to an sstable, including sequence numbers in ingested files.
498 1 : // Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
499 1 : // number. This is fallout from ingestion which allows a sequence number X to
500 1 : // be assigned to an ingested sstable even though sequence number X-1 resides
501 1 : // in an unflushed memtable. logSeqNum is the _next_ sequence number that
502 1 : // will be assigned, so subtract 1 from it to get the upper bound on the
503 1 : // last assigned sequence number.
504 1 : logSeqNum := vs.logSeqNum.Load()
505 1 : ve.LastSeqNum = logSeqNum - 1
506 1 : if logSeqNum == 0 {
507 0 : // logSeqNum is initialized to 1 in Open() if there are no previous WAL
508 0 : // or manifest records, so this case should never happen.
509 0 : vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
510 0 : }
511 :
512 1 : currentVersion := vs.currentVersion()
513 1 : var newVersion *version
514 1 :
515 1 : // Generate a new manifest if we don't currently have one, or forceRotation
516 1 : // is true, or the current one is too large.
517 1 : //
518 1 : // For largeness, we do not rely exclusively on the MaxManifestFileSize
519 1 : // threshold, since we have had incidents where, due to either large keys or
520 1 : // large numbers of files, each edit results in a snapshot + write of the
521 1 : // edit. This slows the system down since each flush or compaction is
522 1 : // writing a new manifest snapshot. The primary goal of the size-based
523 1 : // rollover logic is to ensure that when reopening a DB, the number of edits
524 1 : // that need to be replayed on top of the snapshot is "sane". Rolling over
525 1 : // to a new manifest after each edit is not relevant to that goal.
526 1 : //
527 1 : // Consider the following cases:
528 1 : // - The number of live files F in the DB is roughly stable: after writing
529 1 : // the snapshot (with F files), say we require that there be enough edits
530 1 : // such that the cumulative number of files in those edits, E, be greater
531 1 : // than F. This will ensure that the total amount of time in
532 1 : // UpdateVersionLocked that is spent in snapshot writing is ~50%.
533 1 : //
534 1 : // - The number of live files F in the DB is shrinking drastically, say from
535 1 : // F to F/10: This can happen for various reasons, like wide range
536 1 : // tombstones, or large numbers of smaller than usual files that are being
537 1 : // merged together into larger files. And say the new files generated
538 1 : // during this shrinkage is insignificant compared to F/10, and so for
539 1 : // this example we will assume it is effectively 0. After this shrinking,
540 1 : // E = 0.9F, and so if we used the previous snapshot file count, F, as the
541 1 : // threshold that needs to be exceeded, we will further delay the snapshot
542 1 : // writing. Which means on DB reopen we will need to replay 0.9F edits to
543 1 : // get to a version with 0.1F files. It would be better to create a new
544 1 : // snapshot when E exceeds the number of files in the current version.
545 1 : //
546 1 : // - The number of live files F in the DB is growing via perfect ingests
547 1 : // into L6: Say we wrote the snapshot when there were F files and now we
548 1 : // have 10F files, so E = 9F. We will further delay writing a new
549 1 : // snapshot. This case can be critiqued as contrived, but we consider it
550 1 : // nonetheless.
551 1 : //
552 1 : // The logic below uses the min of the last snapshot file count and the file
553 1 : // count in the current version.
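//
// As a concrete illustration of the shrinking case above (numbers are made
// up): with a last snapshot of F = 1000 files and a current version of 100
// files, the effective threshold is min(1000, 100) = 100, so a new snapshot
// is written after roughly 100 accumulated edits rather than 1000.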
554 1 : vs.rotationHelper.AddRecord(int64(len(ve.DeletedTables) + len(ve.NewTables)))
555 1 : sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
556 1 : requireRotation := vu.ForceManifestRotation || vs.manifest == nil
557 1 :
558 1 : var nextSnapshotFilecount int64
559 1 : for i := range vs.metrics.Levels {
560 1 : nextSnapshotFilecount += vs.metrics.Levels[i].TablesCount
561 1 : }
562 1 : if sizeExceeded && !requireRotation {
563 1 : requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
564 1 : }
565 1 : var newManifestFileNum base.DiskFileNum
566 1 : var prevManifestFileSize uint64
567 1 : var newManifestVirtualBackings []*fileBacking
568 1 : if requireRotation {
569 1 : newManifestFileNum = vs.getNextDiskFileNum()
570 1 : prevManifestFileSize = uint64(vs.manifest.Size())
571 1 :
572 1 : // We want the virtual backings *before* applying the version edit, because
573 1 : // the new manifest will contain the pre-apply version plus the last version
574 1 : // edit.
575 1 : newManifestVirtualBackings = vs.virtualBackings.Backings()
576 1 : }
577 :
578 : // Grab certain values before releasing vs.mu, in case createManifest() needs
579 : // to be called.
580 1 : minUnflushedLogNum := vs.minUnflushedLogNum
581 1 : nextFileNum := vs.nextFileNum.Load()
582 1 :
583 1 : // Note: this call populates ve.RemovedBackingTables.
584 1 : zombieBackings, removedVirtualBackings, localTablesLiveDelta :=
585 1 : getZombieTablesAndUpdateVirtualBackings(ve, &vs.virtualBackings, vs.provider)
586 1 :
587 1 : var l0Update manifest.L0PreparedUpdate
588 1 : if err := func() error {
589 1 : vs.mu.Unlock()
590 1 : defer vs.mu.Lock()
591 1 :
592 1 : if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
593 0 : return base.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
594 0 : }
595 :
596 : // Rotate the manifest if necessary. Rotating the manifest involves
597 : // creating a new file and writing an initial version edit containing a
598 : // snapshot of the current version. This initial version edit will
599 : // reflect the Version prior to the pending version edit (`ve`). Once
600 : // we've created the new manifest with the previous version state, we'll
601 : // append the version edit `ve` to the tail of the new manifest.
602 1 : if newManifestFileNum != 0 {
603 1 : if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum, newManifestVirtualBackings); err != nil {
604 0 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
605 0 : JobID: int(vu.JobID),
606 0 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
607 0 : FileNum: newManifestFileNum,
608 0 : Err: err,
609 0 : })
610 0 : return errors.Wrap(err, "MANIFEST create failed")
611 0 : }
612 : }
613 :
614 : // Call ApplyAndUpdateVersionEdit before accumulating the version edit.
615 : // If any blob files are no longer referenced, the version edit will be
616 : // updated to explicitly record the deletion of the blob files. This can
617 : // happen here because vs.blobFiles is protected by the manifest logLock
618 : // (NOT vs.mu). We only read or write vs.blobFiles while holding the
619 : // manifest lock.
620 1 : if err := vs.blobFiles.ApplyAndUpdateVersionEdit(ve); err != nil {
621 0 : return errors.Wrap(err, "MANIFEST blob files apply and update failed")
622 0 : }
623 :
624 1 : var bulkEdit bulkVersionEdit
625 1 : err := bulkEdit.Accumulate(ve)
626 1 : if err != nil {
627 0 : return errors.Wrap(err, "MANIFEST accumulate failed")
628 0 : }
629 1 : newVersion, err = bulkEdit.Apply(currentVersion, vs.opts.Experimental.ReadCompactionRate)
630 1 : if err != nil {
631 0 : return errors.Wrap(err, "MANIFEST apply failed")
632 0 : }
633 1 : l0Update = vs.l0Organizer.PrepareUpdate(&bulkEdit, newVersion)
634 1 :
635 1 : w, err := vs.manifest.Next()
636 1 : if err != nil {
637 0 : return errors.Wrap(err, "MANIFEST next record write failed")
638 0 : }
639 :
640 : // NB: Any error from this point on is considered fatal as we don't know if
641 : // the MANIFEST write occurred or not. Trying to determine that is
642 : // fraught. Instead we rely on the standard recovery mechanism run when a
643 : // database is open. In particular, that mechanism generates a new MANIFEST
644 : // and ensures it is synced.
645 1 : if err := ve.Encode(w); err != nil {
646 0 : return errors.Wrap(err, "MANIFEST write failed")
647 0 : }
648 1 : if err := vs.manifest.Flush(); err != nil {
649 0 : return errors.Wrap(err, "MANIFEST flush failed")
650 0 : }
651 1 : if err := vs.manifestFile.Sync(); err != nil {
652 0 : return errors.Wrap(err, "MANIFEST sync failed")
653 0 : }
654 1 : if newManifestFileNum != 0 {
655 1 : // NB: Move() is responsible for syncing the data directory.
656 1 : if err := vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, newManifestFileNum)); err != nil {
657 0 : return errors.Wrap(err, "MANIFEST set current failed")
658 0 : }
659 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
660 1 : JobID: int(vu.JobID),
661 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
662 1 : FileNum: newManifestFileNum,
663 1 : })
664 : }
665 1 : return nil
666 0 : }(); err != nil {
667 0 : // Any error encountered during any of the operations in the previous
668 0 : // closure is considered fatal. Treating such errors as fatal is preferred
669 0 : // to attempting to unwind various file and b-tree reference counts, and
670 0 : // re-generating L0 sublevel metadata. This may change in the future, if
671 0 : // certain manifest / WAL operations become retryable. For more context, see
672 0 : // #1159 and #1792.
673 0 : vs.opts.Logger.Fatalf("%s", err)
674 0 : return err
675 0 : }
676 :
677 1 : if requireRotation {
678 1 : // Successfully rotated.
679 1 : vs.rotationHelper.Rotate(nextSnapshotFilecount)
680 1 : }
681 : // Now that DB.mu is held again, initialize compacting file info in
682 : // L0Sublevels.
683 1 : inProgress := vu.InProgressCompactionsFn()
684 1 :
685 1 : zombieBlobs, localBlobLiveDelta := getZombieBlobFilesAndComputeLocalMetrics(ve, vs.provider)
686 1 : vs.l0Organizer.PerformUpdate(l0Update, newVersion)
687 1 : vs.l0Organizer.InitCompactingFileInfo(inProgressL0Compactions(inProgress))
688 1 :
689 1 : // Update the zombie objects sets first, as installation of the new version
690 1 : // will unref the previous version which could result in addObsoleteLocked
691 1 : // being called.
692 1 : for _, b := range zombieBackings {
693 1 : vs.zombieTables.Add(objectInfo{
694 1 : fileInfo: fileInfo{
695 1 : FileNum: b.backing.DiskFileNum,
696 1 : FileSize: b.backing.Size,
697 1 : },
698 1 : isLocal: b.isLocal,
699 1 : })
700 1 : }
701 1 : for _, zb := range zombieBlobs {
702 0 : vs.zombieBlobs.Add(zb)
703 0 : }
704 : // Unref the removed backings and report those that already became obsolete.
705 : // Note that the only case where we report obsolete tables here is when
706 : // VirtualBackings.Protect/Unprotect was used to keep a backing alive without
707 : // it being used in the current version.
708 1 : var obsoleteVirtualBackings manifest.ObsoleteFiles
709 1 : for _, b := range removedVirtualBackings {
710 1 : if b.backing.Unref() == 0 {
711 1 : obsoleteVirtualBackings.FileBackings = append(obsoleteVirtualBackings.FileBackings, b.backing)
712 1 : }
713 : }
714 1 : vs.addObsoleteLocked(obsoleteVirtualBackings)
715 1 :
716 1 : // Install the new version.
717 1 : vs.append(newVersion)
718 1 :
719 1 : if ve.MinUnflushedLogNum != 0 {
720 1 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
721 1 : }
722 1 : if newManifestFileNum != 0 {
723 1 : if vs.manifestFileNum != 0 {
724 1 : vs.obsoleteManifests = append(vs.obsoleteManifests, obsoleteFile{
725 1 : fileType: base.FileTypeManifest,
726 1 : fs: vs.fs,
727 1 : path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
728 1 : fileNum: vs.manifestFileNum,
729 1 : fileSize: prevManifestFileSize,
730 1 : isLocal: true,
731 1 : })
732 1 : }
733 1 : vs.manifestFileNum = newManifestFileNum
734 : }
735 :
736 1 : vs.metrics.updateLevelMetrics(vu.Metrics)
737 1 : for i := range vs.metrics.Levels {
738 1 : l := &vs.metrics.Levels[i]
739 1 : l.TablesCount = int64(newVersion.Levels[i].Len())
740 1 : l.VirtualTablesCount = newVersion.Levels[i].NumVirtual
741 1 : l.VirtualTablesSize = newVersion.Levels[i].VirtualSize
742 1 : l.TablesSize = int64(newVersion.Levels[i].Size())
743 1 :
744 1 : l.Sublevels = 0
745 1 : if l.TablesCount > 0 {
746 1 : l.Sublevels = 1
747 1 : }
748 1 : if invariants.Enabled {
749 1 : levelFiles := newVersion.Levels[i].Slice()
750 1 : if size := int64(levelFiles.SizeSum()); l.TablesSize != size {
751 0 : vs.opts.Logger.Fatalf("versionSet metrics L%d Size = %d, actual size = %d", i, l.TablesSize, size)
752 0 : }
753 1 : if nVirtual := levelFiles.NumVirtual(); nVirtual != l.VirtualTablesCount {
754 0 : vs.opts.Logger.Fatalf(
755 0 : "versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
756 0 : i, l.VirtualTablesCount, nVirtual,
757 0 : )
758 0 : }
759 1 : if vSize := levelFiles.VirtualSizeSum(); vSize != l.VirtualTablesSize {
760 0 : vs.opts.Logger.Fatalf(
761 0 : "versionSet metrics L%d Virtual size = %d, actual size = %d",
762 0 : i, l.VirtualTablesSize, vSize,
763 0 : )
764 0 : }
765 : }
766 : }
767 1 : vs.metrics.Levels[0].Sublevels = int32(len(newVersion.L0SublevelFiles))
768 1 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localTablesLiveDelta.size)
769 1 : vs.metrics.Table.Local.LiveCount = uint64(int64(vs.metrics.Table.Local.LiveCount) + localTablesLiveDelta.count)
770 1 : vs.metrics.BlobFiles.Local.LiveSize = uint64(int64(vs.metrics.BlobFiles.Local.LiveSize) + localBlobLiveDelta.size)
771 1 : vs.metrics.BlobFiles.Local.LiveCount = uint64(int64(vs.metrics.BlobFiles.Local.LiveCount) + localBlobLiveDelta.count)
772 1 :
773 1 : vs.setCompactionPicker(
774 1 : newCompactionPickerByScore(newVersion, vs.l0Organizer, &vs.virtualBackings, vs.opts, inProgress))
775 1 : if !vs.dynamicBaseLevel {
776 0 : vs.picker.forceBaseLevel1()
777 0 : }
778 1 : return nil
779 : }
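// Illustrative sketch of a caller of UpdateVersionLocked (for example, a
// flush or compaction installing its results). The edit contents and jobID
// are hypothetical; UpdateVersionLocked fills in NextFileNum, LastSeqNum and
// RemovedBackingTables as described in its doc comment:
//
//	err := vs.UpdateVersionLocked(func() (versionUpdate, error) {
//		ve := &versionEdit{ /* new/deleted tables computed by the caller */ }
//		return versionUpdate{
//			VE:                      ve,
//			JobID:                   jobID,
//			InProgressCompactionsFn: func() []compactionInfo { return nil },
//		}, nil
//	})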
780 :
781 1 : func (vs *versionSet) setCompactionPicker(picker *compactionPickerByScore) {
782 1 : vs.picker = picker
783 1 : vs.curCompactionConcurrency.Store(int32(picker.getCompactionConcurrency()))
784 1 : }
785 :
786 : type fileBackingInfo struct {
787 : backing *fileBacking
788 : isLocal bool
789 : }
790 :
791 : type fileMetricDelta struct {
792 : count int64
793 : size int64
794 : }
795 :
796 : // getZombieTablesAndUpdateVirtualBackings updates the virtual backings with the
797 : // changes in the versionEdit and populates ve.RemovedBackingTables.
798 : // Returns:
799 : // - zombieBackings: all backings (physical and virtual) that will no longer be
800 : // needed when we apply ve.
801 : // - removedVirtualBackings: the virtual backings that will be removed by the
802 : // VersionEdit and which must be Unref()ed by the caller. These backings
803 : // match ve.RemovedBackingTables.
804 : // - localLiveDelta: the delta in local live bytes.
805 : func getZombieTablesAndUpdateVirtualBackings(
806 : ve *versionEdit, virtualBackings *manifest.VirtualBackings, provider objstorage.Provider,
807 1 : ) (zombieBackings, removedVirtualBackings []fileBackingInfo, localLiveDelta fileMetricDelta) {
808 1 : // First, deal with the physical tables.
809 1 : //
810 1 : // A physical backing has become unused if it is in DeletedTables but not in
811 1 : // NewTables or CreatedBackingTables.
812 1 : //
813 1 : // Note that for the common case where there are very few elements, the map
814 1 : // will stay on the stack.
815 1 : stillUsed := make(map[base.DiskFileNum]struct{})
816 1 : for _, nf := range ve.NewTables {
817 1 : if !nf.Meta.Virtual {
818 1 : stillUsed[nf.Meta.FileBacking.DiskFileNum] = struct{}{}
819 1 : isLocal, localFileDelta := sizeIfLocal(nf.Meta.FileBacking, provider)
820 1 : localLiveDelta.size += localFileDelta
821 1 : if isLocal {
822 1 : localLiveDelta.count++
823 1 : }
824 : }
825 : }
826 1 : for _, b := range ve.CreatedBackingTables {
827 1 : stillUsed[b.DiskFileNum] = struct{}{}
828 1 : }
829 1 : for _, m := range ve.DeletedTables {
830 1 : if !m.Virtual {
831 1 : // NB: this deleted file may also be in NewTables or
832 1 : // CreatedBackingTables, due to a file moving between levels, or
833 1 : // becoming virtualized, in which case there is no change due to this
834 1 : // file in localLiveDelta -- the subtraction below compensates
835 1 : // for the addition.
836 1 : isLocal, localFileDelta := sizeIfLocal(m.FileBacking, provider)
837 1 : localLiveDelta.size -= localFileDelta
838 1 : if isLocal {
839 1 : localLiveDelta.count--
840 1 : }
841 1 : if _, ok := stillUsed[m.FileBacking.DiskFileNum]; !ok {
842 1 : zombieBackings = append(zombieBackings, fileBackingInfo{
843 1 : backing: m.FileBacking,
844 1 : isLocal: isLocal,
845 1 : })
846 1 : }
847 : }
848 : }
849 :
850 : // Now deal with virtual tables.
851 : //
852 : // When a virtual table moves between levels we AddTable() then RemoveTable(),
853 : // which works out.
854 1 : for _, b := range ve.CreatedBackingTables {
855 1 : virtualBackings.AddAndRef(b)
856 1 : isLocal, localFileDelta := sizeIfLocal(b, provider)
857 1 : localLiveDelta.size += localFileDelta
858 1 : if isLocal {
859 1 : localLiveDelta.count++
860 1 : }
861 : }
862 1 : for _, nf := range ve.NewTables {
863 1 : if nf.Meta.Virtual {
864 1 : virtualBackings.AddTable(nf.Meta)
865 1 : }
866 : }
867 1 : for _, m := range ve.DeletedTables {
868 1 : if m.Virtual {
869 1 : virtualBackings.RemoveTable(m)
870 1 : }
871 : }
872 :
873 1 : if unused := virtualBackings.Unused(); len(unused) > 0 {
874 1 : // Virtual backings that are no longer used are zombies and are also added
875 1 : // to RemovedBackingTables (before the version edit is written to disk).
876 1 : ve.RemovedBackingTables = make([]base.DiskFileNum, len(unused))
877 1 : for i, b := range unused {
878 1 : isLocal, localFileDelta := sizeIfLocal(b, provider)
879 1 : localLiveDelta.size -= localFileDelta
880 1 : if isLocal {
881 1 : localLiveDelta.count--
882 1 : }
883 1 : ve.RemovedBackingTables[i] = b.DiskFileNum
884 1 : zombieBackings = append(zombieBackings, fileBackingInfo{
885 1 : backing: b,
886 1 : isLocal: isLocal,
887 1 : })
888 1 : virtualBackings.Remove(b.DiskFileNum)
889 : }
890 1 : removedVirtualBackings = zombieBackings[len(zombieBackings)-len(unused):]
891 : }
892 1 : return zombieBackings, removedVirtualBackings, localLiveDelta
893 : }
894 :
895 : // getZombieBlobFilesAndComputeLocalMetrics constructs objectInfos for all
896 : // zombie blob files, and computes the delta in the local live blob file
897 : // metrics.
898 : func getZombieBlobFilesAndComputeLocalMetrics(
899 : ve *versionEdit, provider objstorage.Provider,
900 1 : ) (zombieBlobFiles []objectInfo, localLiveDelta fileMetricDelta) {
901 1 : for _, b := range ve.NewBlobFiles {
902 0 : if objstorage.IsLocalBlobFile(provider, b.FileNum) {
903 0 : localLiveDelta.count++
904 0 : localLiveDelta.size += int64(b.Size)
905 0 : }
906 : }
907 1 : zombieBlobFiles = make([]objectInfo, 0, len(ve.DeletedBlobFiles))
908 1 : for _, b := range ve.DeletedBlobFiles {
909 0 : isLocal := objstorage.IsLocalBlobFile(provider, b.FileNum)
910 0 : if isLocal {
911 0 : localLiveDelta.count--
912 0 : localLiveDelta.size -= int64(b.Size)
913 0 : }
914 0 : zombieBlobFiles = append(zombieBlobFiles, objectInfo{
915 0 : fileInfo: fileInfo{
916 0 : FileNum: b.FileNum,
917 0 : FileSize: b.Size,
918 0 : },
919 0 : isLocal: isLocal,
920 0 : })
921 : }
922 1 : return zombieBlobFiles, localLiveDelta
923 : }
924 :
925 : // sizeIfLocal returns whether the backing is a local file and, if so, its size (else 0).
926 : func sizeIfLocal(
927 : backing *fileBacking, provider objstorage.Provider,
928 1 : ) (isLocal bool, localSize int64) {
929 1 : isLocal = objstorage.IsLocalTable(provider, backing.DiskFileNum)
930 1 : if isLocal {
931 1 : return true, int64(backing.Size)
932 1 : }
933 1 : return false, 0
934 : }
935 :
936 : func (vs *versionSet) incrementCompactions(
937 : kind compactionKind, extraLevels []*compactionLevel, pickerMetrics compactionPickerMetrics,
938 1 : ) {
939 1 : switch kind {
940 1 : case compactionKindDefault:
941 1 : vs.metrics.Compact.Count++
942 1 : vs.metrics.Compact.DefaultCount++
943 :
944 1 : case compactionKindFlush, compactionKindIngestedFlushable:
945 1 : vs.metrics.Flush.Count++
946 :
947 1 : case compactionKindMove:
948 1 : vs.metrics.Compact.Count++
949 1 : vs.metrics.Compact.MoveCount++
950 :
951 1 : case compactionKindDeleteOnly:
952 1 : vs.metrics.Compact.Count++
953 1 : vs.metrics.Compact.DeleteOnlyCount++
954 :
955 1 : case compactionKindElisionOnly:
956 1 : vs.metrics.Compact.Count++
957 1 : vs.metrics.Compact.ElisionOnlyCount++
958 :
959 0 : case compactionKindRead:
960 0 : vs.metrics.Compact.Count++
961 0 : vs.metrics.Compact.ReadCount++
962 :
963 1 : case compactionKindTombstoneDensity:
964 1 : vs.metrics.Compact.Count++
965 1 : vs.metrics.Compact.TombstoneDensityCount++
966 :
967 1 : case compactionKindRewrite:
968 1 : vs.metrics.Compact.Count++
969 1 : vs.metrics.Compact.RewriteCount++
970 :
971 1 : case compactionKindCopy:
972 1 : vs.metrics.Compact.Count++
973 1 : vs.metrics.Compact.CopyCount++
974 :
975 0 : default:
976 0 : if invariants.Enabled {
977 0 : panic("unhandled compaction kind")
978 : }
979 : }
980 1 : if len(extraLevels) > 0 {
981 1 : vs.metrics.Compact.MultiLevelCount++
982 1 : }
983 : }
984 :
985 1 : func (vs *versionSet) incrementCompactionBytes(numBytes int64) {
986 1 : vs.atomicInProgressBytes.Add(numBytes)
987 1 : }
988 :
989 : // createManifest creates a manifest file that contains a snapshot of vs.
990 : func (vs *versionSet) createManifest(
991 : dirname string,
992 : fileNum, minUnflushedLogNum base.DiskFileNum,
993 : nextFileNum uint64,
994 : virtualBackings []*fileBacking,
995 1 : ) (err error) {
996 1 : var (
997 1 : filename = base.MakeFilepath(vs.fs, dirname, base.FileTypeManifest, fileNum)
998 1 : manifestFile vfs.File
999 1 : manifestWriter *record.Writer
1000 1 : )
1001 1 : defer func() {
1002 1 : if manifestWriter != nil {
1003 0 : _ = manifestWriter.Close()
1004 0 : }
1005 1 : if manifestFile != nil {
1006 0 : _ = manifestFile.Close()
1007 0 : }
1008 1 : if err != nil {
1009 0 : _ = vs.fs.Remove(filename)
1010 0 : }
1011 : }()
1012 1 : manifestFile, err = vs.fs.Create(filename, "pebble-manifest")
1013 1 : if err != nil {
1014 0 : return err
1015 0 : }
1016 1 : manifestWriter = record.NewWriter(manifestFile)
1017 1 :
1018 1 : snapshot := manifest.VersionEdit{
1019 1 : ComparerName: vs.cmp.Name,
1020 1 : // When creating a version snapshot for an existing DB, this snapshot
1021 1 : // VersionEdit will be immediately followed by another VersionEdit
1022 1 : // (being written in UpdateVersionLocked()). That VersionEdit always
1023 1 : // contains a LastSeqNum, so we don't need to include that in the
1024 1 : // snapshot. But it does not necessarily include MinUnflushedLogNum,
1025 1 : // NextFileNum, so we initialize those using the corresponding fields in
1026 1 : // the versionSet (which came from the latest preceding VersionEdit that
1027 1 : // had those fields).
1028 1 : MinUnflushedLogNum: minUnflushedLogNum,
1029 1 : NextFileNum: nextFileNum,
1030 1 : CreatedBackingTables: virtualBackings,
1031 1 : NewBlobFiles: vs.blobFiles.Metadatas(),
1032 1 : }
1033 1 : // Add all extant sstables in the current version.
1034 1 : for level, levelMetadata := range vs.currentVersion().Levels {
1035 1 : for meta := range levelMetadata.All() {
1036 1 : snapshot.NewTables = append(snapshot.NewTables, newTableEntry{
1037 1 : Level: level,
1038 1 : Meta: meta,
1039 1 : })
1040 1 : }
1041 : }
1042 :
1043 1 : w, err1 := manifestWriter.Next()
1044 1 : if err1 != nil {
1045 0 : return err1
1046 0 : }
1047 1 : if err := snapshot.Encode(w); err != nil {
1048 0 : return err
1049 0 : }
1050 :
1051 1 : if vs.manifest != nil {
1052 1 : if err := vs.manifest.Close(); err != nil {
1053 0 : return err
1054 0 : }
1055 1 : vs.manifest = nil
1056 : }
1057 1 : if vs.manifestFile != nil {
1058 1 : if err := vs.manifestFile.Close(); err != nil {
1059 0 : return err
1060 0 : }
1061 1 : vs.manifestFile = nil
1062 : }
1063 :
1064 1 : vs.manifest, manifestWriter = manifestWriter, nil
1065 1 : vs.manifestFile, manifestFile = manifestFile, nil
1066 1 : return nil
1067 : }
1068 :
1069 : // NB: This method is not safe for concurrent use. It is only safe
1070 : // to be called when concurrent changes to nextFileNum are not expected.
1071 1 : func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
1072 1 : if vs.nextFileNum.Load() <= uint64(fileNum) {
1073 1 : vs.nextFileNum.Store(uint64(fileNum + 1))
1074 1 : }
1075 : }
1076 :
1077 : // getNextFileNum returns the next file number to be used.
1078 : //
1079 : // Can be called without the versionSet's mutex being held.
1080 1 : func (vs *versionSet) getNextFileNum() base.FileNum {
1081 1 : x := vs.nextFileNum.Add(1) - 1
1082 1 : return base.FileNum(x)
1083 1 : }
1084 :
1085 : // Can be called without the versionSet's mutex being held.
1086 1 : func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
1087 1 : x := vs.nextFileNum.Add(1) - 1
1088 1 : return base.DiskFileNum(x)
1089 1 : }
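// Illustrative sketch of the single-counter file numbering (file numbers are
// made up): after replay, markFileNumUsed pushes the counter past any file
// number observed on disk, so later allocations never collide across file
// types (WAL, MANIFEST, sstable, OPTIONS):
//
//	vs.markFileNumUsed(base.DiskFileNum(7)) // nextFileNum becomes at least 8
//	walNum := vs.getNextDiskFileNum()       // e.g. 8
//	tableNum := vs.getNextFileNum()         // e.g. 9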
1090 :
1091 1 : func (vs *versionSet) append(v *version) {
1092 1 : if v.Refs() != 0 {
1093 0 : panic("pebble: version should be unreferenced")
1094 : }
1095 1 : if !vs.versions.Empty() {
1096 1 : vs.versions.Back().UnrefLocked()
1097 1 : }
1098 1 : v.Deleted = vs.obsoleteFn
1099 1 : v.Ref()
1100 1 : vs.versions.PushBack(v)
1101 1 : if invariants.Enabled {
1102 1 : // Verify that the virtualBackings contains all the backings referenced by
1103 1 : // the version.
1104 1 : for _, l := range v.Levels {
1105 1 : for f := range l.All() {
1106 1 : if f.Virtual {
1107 1 : if _, ok := vs.virtualBackings.Get(f.FileBacking.DiskFileNum); !ok {
1108 0 : panic(fmt.Sprintf("%s is not in virtualBackings", f.FileBacking.DiskFileNum))
1109 : }
1110 : }
1111 : }
1112 : }
1113 : }
1114 : }
1115 :
1116 1 : func (vs *versionSet) currentVersion() *version {
1117 1 : return vs.versions.Back()
1118 1 : }
1119 :
1120 1 : func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
1121 1 : current := vs.currentVersion()
1122 1 : for v := vs.versions.Front(); true; v = v.Next() {
1123 1 : for _, lm := range v.Levels {
1124 1 : for f := range lm.All() {
1125 1 : m[f.FileBacking.DiskFileNum] = struct{}{}
1126 1 : for _, ref := range f.BlobReferences {
1127 0 : m[ref.FileNum] = struct{}{}
1128 0 : }
1129 : }
1130 : }
1131 1 : if v == current {
1132 1 : break
1133 : }
1134 : }
1135 : // virtualBackings contains backings that are referenced by some virtual
1136 : // tables in the latest version (which are handled above), and backings that
1137 : // are not but are still alive because of the protection mechanism (see
1138 : // manifest.VirtualBackings). This loop ensures the latter get added to the
1139 : // map.
1140 1 : vs.virtualBackings.ForEach(func(b *fileBacking) {
1141 1 : m[b.DiskFileNum] = struct{}{}
1142 1 : })
1143 : }
1144 :
1145 : // addObsoleteLocked adds the object info associated with obsolete backing
1146 : // sstables and blob files to the obsolete tables and blobs lists.
1147 : //
1148 : // The file backings in the obsolete list must not appear more than once.
1149 : //
1150 : // DB.mu must be held when addObsoleteLocked is called.
1151 1 : func (vs *versionSet) addObsoleteLocked(obsolete manifest.ObsoleteFiles) {
1152 1 : if obsolete.Count() == 0 {
1153 1 : return
1154 1 : }
1155 :
1156 : // Note that the zombie objects transition from zombie *to* obsolete, and
1157 : // will no longer be considered zombie.
1158 :
1159 1 : newlyObsoleteTables := make([]objectInfo, len(obsolete.FileBackings))
1160 1 : for i, bs := range obsolete.FileBackings {
1161 1 : newlyObsoleteTables[i] = vs.zombieTables.Extract(bs.DiskFileNum)
1162 1 : }
1163 1 : vs.obsoleteTables = mergeObjectInfos(vs.obsoleteTables, newlyObsoleteTables)
1164 1 :
1165 1 : newlyObsoleteBlobFiles := make([]objectInfo, len(obsolete.BlobFiles))
1166 1 : for i, bf := range obsolete.BlobFiles {
1167 0 : newlyObsoleteBlobFiles[i] = vs.zombieBlobs.Extract(bf.FileNum)
1168 0 : }
1169 1 : vs.obsoleteBlobs = mergeObjectInfos(vs.obsoleteBlobs, newlyObsoleteBlobFiles)
1170 1 : vs.updateObsoleteObjectMetricsLocked()
1171 : }
1172 :
1173 : // addObsolete will acquire DB.mu, so DB.mu must not be held when this is
1174 : // called.
1175 1 : func (vs *versionSet) addObsolete(obsolete manifest.ObsoleteFiles) {
1176 1 : vs.mu.Lock()
1177 1 : defer vs.mu.Unlock()
1178 1 : vs.addObsoleteLocked(obsolete)
1179 1 : }
1180 :
1181 1 : func (vs *versionSet) updateObsoleteObjectMetricsLocked() {
1182 1 : vs.metrics.Table.ObsoleteCount = int64(len(vs.obsoleteTables))
1183 1 : vs.metrics.Table.ObsoleteSize = 0
1184 1 : vs.metrics.Table.Local.ObsoleteSize = 0
1185 1 : vs.metrics.Table.Local.ObsoleteCount = 0
1186 1 : for _, fi := range vs.obsoleteTables {
1187 1 : vs.metrics.Table.ObsoleteSize += fi.FileSize
1188 1 : if fi.isLocal {
1189 1 : vs.metrics.Table.Local.ObsoleteSize += fi.FileSize
1190 1 : vs.metrics.Table.Local.ObsoleteCount++
1191 1 : }
1192 : }
1193 1 : vs.metrics.BlobFiles.ObsoleteCount = uint64(len(vs.obsoleteBlobs))
1194 1 : vs.metrics.BlobFiles.ObsoleteSize = 0
1195 1 : vs.metrics.BlobFiles.Local.ObsoleteSize = 0
1196 1 : vs.metrics.BlobFiles.Local.ObsoleteCount = 0
1197 1 : for _, fi := range vs.obsoleteBlobs {
1198 0 : vs.metrics.BlobFiles.ObsoleteSize += fi.FileSize
1199 0 : if fi.isLocal {
1200 0 : vs.metrics.BlobFiles.Local.ObsoleteSize += fi.FileSize
1201 0 : vs.metrics.BlobFiles.Local.ObsoleteCount++
1202 0 : }
1203 : }
1204 : }
1205 :
1206 : func findCurrentManifest(
1207 : fs vfs.FS, dirname string, ls []string,
1208 1 : ) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
1209 1 : // Locating a marker should succeed even if the marker has never been placed.
1210 1 : var filename string
1211 1 : marker, filename, err = atomicfs.LocateMarkerInListing(fs, dirname, manifestMarkerName, ls)
1212 1 : if err != nil {
1213 0 : return nil, 0, false, err
1214 0 : }
1215 :
1216 1 : if filename == "" {
1217 1 : // The marker hasn't been set yet. This database doesn't exist.
1218 1 : return marker, 0, false, nil
1219 1 : }
1220 :
1221 1 : var ok bool
1222 1 : _, manifestNum, ok = base.ParseFilename(fs, filename)
1223 1 : if !ok {
1224 0 : return marker, 0, false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
1225 0 : }
1226 1 : return marker, manifestNum, true, nil
1227 : }
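// Illustrative sketch of how Open-time code might tie findCurrentManifest to
// load/create (caller-side names such as jobID, ls and getFMV are assumed;
// error handling elided):
//
//	marker, manifestNum, exists, _ := findCurrentManifest(fs, dirname, ls)
//	if exists {
//		_ = vs.load(dirname, provider, opts, manifestNum, marker, getFMV, mu)
//	} else {
//		_ = vs.create(jobID, dirname, provider, opts, marker, getFMV, mu)
//	}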
1228 :
1229 0 : func newFileMetrics(newFiles []manifest.NewTableEntry) levelMetricsDelta {
1230 0 : var m levelMetricsDelta
1231 0 : for _, nf := range newFiles {
1232 0 : lm := m[nf.Level]
1233 0 : if lm == nil {
1234 0 : lm = &LevelMetrics{}
1235 0 : m[nf.Level] = lm
1236 0 : }
1237 0 : lm.TablesCount++
1238 0 : lm.TablesSize += int64(nf.Meta.Size)
1239 : }
1240 0 : return m
1241 : }
|