Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "sync"
11 : "sync/atomic"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/record"
19 : "github.com/cockroachdb/pebble/vfs"
20 : "github.com/cockroachdb/pebble/vfs/atomicfs"
21 : )
22 :
23 : const numLevels = manifest.NumLevels
24 :
25 : const manifestMarkerName = `manifest`
26 :
27 : // Provide type aliases for the various manifest structs.
28 : type bulkVersionEdit = manifest.BulkVersionEdit
29 : type deletedFileEntry = manifest.DeletedTableEntry
30 : type tableMetadata = manifest.TableMetadata
31 : type physicalMeta = manifest.PhysicalTableMeta
32 : type virtualMeta = manifest.VirtualTableMeta
33 : type fileBacking = manifest.FileBacking
34 : type newTableEntry = manifest.NewTableEntry
35 : type version = manifest.Version
36 : type versionEdit = manifest.VersionEdit
37 : type versionList = manifest.VersionList
38 :
39 : // versionSet manages a collection of immutable versions, and manages the
40 : // creation of a new version from the most recent version. A new version is
41 : // created from an existing version by applying a version edit which is just
42 : // like it sounds: a delta from the previous version. Version edits are logged
43 : // to the MANIFEST file, which is replayed at startup.
44 : type versionSet struct {
45 : // Next seqNum to use for WAL writes.
46 : logSeqNum base.AtomicSeqNum
47 :
48 : // The upper bound on sequence numbers that have been assigned so far. A
49 : // suffix of these sequence numbers may not have been written to a WAL. Both
50 : // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
51 : // visibleSeqNum is <= logSeqNum.
52 : visibleSeqNum base.AtomicSeqNum
53 :
54 : // Number of bytes present in sstables being written by in-progress
55 : // compactions. This value will be zero if there are no in-progress
56 : // compactions. Updated and read atomically.
57 : atomicInProgressBytes atomic.Int64
58 :
59 : // Immutable fields.
60 : dirname string
61 : provider objstorage.Provider
62 : // Set to DB.mu.
63 : mu *sync.Mutex
64 : opts *Options
65 : fs vfs.FS
66 : cmp *base.Comparer
67 : // dynamicBaseLevel, when false, disables the dynamic base level computation.
68 : // Used by tests which want to create specific LSM structures.
69 : dynamicBaseLevel bool
70 :
71 : // Mutable fields.
72 : versions versionList
73 : picker compactionPicker
74 : // curCompactionConcurrency is updated whenever picker is updated.
75 : // INVARIANT: >= 1.
76 : curCompactionConcurrency atomic.Int32
77 :
78 : // Not all metrics are kept here. See DB.Metrics().
79 : metrics Metrics
80 :
81 : // A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
82 : // on the creation of every version.
83 : obsoleteFn func(manifest.ObsoleteFiles)
84 : obsoleteTables []objectInfo
85 : obsoleteBlobs []objectInfo
86 : obsoleteManifests []fileInfo
87 : obsoleteOptions []fileInfo
88 :
89 : // Zombie tables which have been removed from the current version but are
90 : // still referenced by an inuse iterator.
91 : zombieTables map[base.DiskFileNum]objectInfo
92 :
93 : // virtualBackings contains information about the FileBackings which support
94 : // virtual sstables in the latest version. It is mainly used to determine when
95 : // a backing is no longer in use by the tables in the latest version; this is
96 : // not a trivial problem because compactions and other operations can happen
97 : // in parallel (and they can finish in unpredictable order).
98 : //
99 : // This mechanism is complementary to the backing Ref/Unref mechanism, which
100 : // determines when a backing is no longer used by *any* live version and can
101 : // be removed.
102 : //
103 : // In principle this should have been a copy-on-write structure, with each
104 : // Version having its own record of its virtual backings (similar to the
105 : // B-tree that holds the tables). However, in practice we only need it for the
106 : // latest version, so we use a simpler structure and store it in the
107 : // versionSet instead.
108 : //
109 : // virtualBackings is modified under DB.mu and the log lock. If it is accessed
110 : // under DB.mu and a version update is in progress, it reflects the state of
111 : // the next version.
112 : virtualBackings manifest.VirtualBackings
113 :
114 : // minUnflushedLogNum is the smallest WAL log file number corresponding to
115 : // mutations that have not been flushed to an sstable.
116 : minUnflushedLogNum base.DiskFileNum
117 :
118 : // The next file number. A single counter is used to assign file
119 : // numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
120 : nextFileNum atomic.Uint64
121 :
122 : // The current manifest file number.
123 : manifestFileNum base.DiskFileNum
124 : manifestMarker *atomicfs.Marker
125 :
126 : manifestFile vfs.File
127 : manifest *record.Writer
128 : getFormatMajorVersion func() FormatMajorVersion
129 :
130 : writing bool
131 : writerCond sync.Cond
132 : // State for deciding when to write a snapshot. Protected by mu.
133 : rotationHelper record.RotationHelper
134 :
135 : pickedCompactionCache pickedCompactionCache
136 : }
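
// An illustrative sketch, not part of Pebble's source: the doc comment above
// describes a new version as the result of applying a version edit (a delta)
// to the current version. This minimal sketch mirrors the Accumulate/Apply
// calls used by create(), load(), and logAndApply() below; the helper name
// applyEditSketch is hypothetical.
func applyEditSketch(curr *version, ve *versionEdit, opts *Options) (*version, error) {
	var bve bulkVersionEdit
	if err := bve.Accumulate(ve); err != nil {
		return nil, err
	}
	// Apply produces a new immutable version; curr may be nil for a fresh DB.
	return bve.Apply(curr, opts.Comparer, opts.FlushSplitBytes, opts.Experimental.ReadCompactionRate)
}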
137 :
138 : // objectInfo describes an object in object storage (either an sstable or a blob
139 : // file).
140 : type objectInfo struct {
141 : fileInfo
142 : isLocal bool
143 : }
144 :
145 : func (vs *versionSet) init(
146 : dirname string,
147 : provider objstorage.Provider,
148 : opts *Options,
149 : marker *atomicfs.Marker,
150 : getFMV func() FormatMajorVersion,
151 : mu *sync.Mutex,
152 1 : ) {
153 1 : vs.dirname = dirname
154 1 : vs.provider = provider
155 1 : vs.mu = mu
156 1 : vs.writerCond.L = mu
157 1 : vs.opts = opts
158 1 : vs.fs = opts.FS
159 1 : vs.cmp = opts.Comparer
160 1 : vs.dynamicBaseLevel = true
161 1 : vs.versions.Init(mu)
162 1 : vs.obsoleteFn = vs.addObsoleteLocked
163 1 : vs.zombieTables = make(map[base.DiskFileNum]objectInfo)
164 1 : vs.virtualBackings = manifest.MakeVirtualBackings()
165 1 : vs.nextFileNum.Store(1)
166 1 : vs.manifestMarker = marker
167 1 : vs.getFormatMajorVersion = getFMV
168 1 : }
169 :
170 : // create creates a version set for a fresh DB.
171 : func (vs *versionSet) create(
172 : jobID JobID,
173 : dirname string,
174 : provider objstorage.Provider,
175 : opts *Options,
176 : marker *atomicfs.Marker,
177 : getFormatMajorVersion func() FormatMajorVersion,
178 : mu *sync.Mutex,
179 1 : ) error {
180 1 : vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
181 1 : var bve bulkVersionEdit
182 1 : newVersion, err := bve.Apply(nil /* curr */, opts.Comparer, opts.FlushSplitBytes, opts.Experimental.ReadCompactionRate)
183 1 : if err != nil {
184 0 : return err
185 0 : }
186 1 : vs.append(newVersion)
187 1 :
188 1 : vs.setCompactionPicker(
189 1 : newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, nil))
190 1 : // Note that a "snapshot" version edit is written to the manifest when it is
191 1 : // created.
192 1 : vs.manifestFileNum = vs.getNextDiskFileNum()
193 1 : err = vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum.Load(), nil /* virtualBackings */)
194 1 : if err == nil {
195 1 : if err = vs.manifest.Flush(); err != nil {
196 1 : vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
197 1 : }
198 : }
199 1 : if err == nil {
200 1 : if err = vs.manifestFile.Sync(); err != nil {
201 1 : vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
202 1 : }
203 : }
204 1 : if err == nil {
205 1 : // NB: Move() is responsible for syncing the data directory.
206 1 : if err = vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, vs.manifestFileNum)); err != nil {
207 1 : vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
208 1 : }
209 : }
210 :
211 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
212 1 : JobID: int(jobID),
213 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
214 1 : FileNum: vs.manifestFileNum,
215 1 : Err: err,
216 1 : })
217 1 : if err != nil {
218 1 : return err
219 1 : }
220 1 : return nil
221 : }
222 :
223 : // load loads the version set from the manifest file.
224 : func (vs *versionSet) load(
225 : dirname string,
226 : provider objstorage.Provider,
227 : opts *Options,
228 : manifestFileNum base.DiskFileNum,
229 : marker *atomicfs.Marker,
230 : getFormatMajorVersion func() FormatMajorVersion,
231 : mu *sync.Mutex,
232 1 : ) error {
233 1 : vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
234 1 :
235 1 : vs.manifestFileNum = manifestFileNum
236 1 : manifestPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeManifest, vs.manifestFileNum)
237 1 : manifestFilename := opts.FS.PathBase(manifestPath)
238 1 :
239 1 : // Read the versionEdits in the manifest file.
240 1 : var bve bulkVersionEdit
241 1 : bve.AddedTablesByFileNum = make(map[base.FileNum]*tableMetadata)
242 1 : manifest, err := vs.fs.Open(manifestPath)
243 1 : if err != nil {
244 0 : return errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
245 0 : errors.Safe(manifestFilename), dirname)
246 0 : }
247 1 : defer manifest.Close()
248 1 : rr := record.NewReader(manifest, 0 /* logNum */)
249 1 : for {
250 1 : r, err := rr.Next()
251 1 : if err == io.EOF || record.IsInvalidRecord(err) {
252 1 : break
253 : }
254 1 : if err != nil {
255 0 : return errors.Wrapf(err, "pebble: error when loading manifest file %q",
256 0 : errors.Safe(manifestFilename))
257 0 : }
258 1 : var ve versionEdit
259 1 : err = ve.Decode(r)
260 1 : if err != nil {
261 0 : // Break instead of returning an error if the record is corrupted
262 0 : // or invalid.
263 0 : if err == io.EOF || record.IsInvalidRecord(err) {
264 0 : break
265 : }
266 0 : return err
267 : }
268 1 : if ve.ComparerName != "" {
269 1 : if ve.ComparerName != vs.cmp.Name {
270 1 : return errors.Errorf("pebble: manifest file %q for DB %q: "+
271 1 : "comparer name from file %q != comparer name from Options %q",
272 1 : errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(vs.cmp.Name))
273 1 : }
274 : }
275 1 : if err := bve.Accumulate(&ve); err != nil {
276 0 : return err
277 0 : }
278 1 : if ve.MinUnflushedLogNum != 0 {
279 1 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
280 1 : }
281 1 : if ve.NextFileNum != 0 {
282 1 : vs.nextFileNum.Store(ve.NextFileNum)
283 1 : }
284 1 : if ve.LastSeqNum != 0 {
285 1 : // logSeqNum is the _next_ sequence number that will be assigned,
286 1 : // while LastSeqNum is the last assigned sequence number. Note that
287 1 : // this behaviour mimics that in RocksDB; the first sequence number
288 1 : // assigned is one greater than the one present in the manifest
289 1 : // (assuming no WALs contain higher sequence numbers than the
290 1 : // manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
291 1 : // next sequence number that will be assigned.
292 1 : //
293 1 : // If LastSeqNum is less than SeqNumStart, increase it to at least
294 1 : // SeqNumStart to leave ample room for reserved sequence numbers.
295 1 : if ve.LastSeqNum+1 < base.SeqNumStart {
296 0 : vs.logSeqNum.Store(base.SeqNumStart)
297 1 : } else {
298 1 : vs.logSeqNum.Store(ve.LastSeqNum + 1)
299 1 : }
300 : }
301 : }
302 : // We have already initialized vs.nextFileNum to a non-zero value (1) in
303 : // init and could have only updated it to some other non-zero value, so it
304 : // cannot be 0 here.
305 1 : if vs.minUnflushedLogNum == 0 {
306 1 : if vs.nextFileNum.Load() >= 2 {
307 1 : // We either have a freshly created DB, or a DB created by RocksDB
308 1 : // that has not had a single flushed SSTable yet. This is because
309 1 : // RocksDB bumps up nextFileNum in this case without bumping up
310 1 : // minUnflushedLogNum, even if WALs with non-zero file numbers are
311 1 : // present in the directory.
312 1 : } else {
313 0 : return base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
314 0 : errors.Safe(manifestFilename), dirname)
315 0 : }
316 : }
317 1 : vs.markFileNumUsed(vs.minUnflushedLogNum)
318 1 :
319 1 : // Populate vs.virtualBackings with the FileBackings for virtual sstables now
320 1 : // that we have finished version edit accumulation.
321 1 : for _, b := range bve.AddedFileBacking {
322 1 : vs.virtualBackings.AddAndRef(b)
323 1 : }
324 :
325 1 : for _, addedLevel := range bve.AddedTables {
326 1 : for _, m := range addedLevel {
327 1 : if m.Virtual {
328 1 : vs.virtualBackings.AddTable(m)
329 1 : }
330 : }
331 : }
332 :
333 1 : if invariants.Enabled {
334 1 : // There should be no deleted tables or backings, since we're starting from
335 1 : // an empty state.
336 1 : for _, deletedLevel := range bve.DeletedTables {
337 1 : if len(deletedLevel) != 0 {
338 0 : panic("deleted files after manifest replay")
339 : }
340 : }
341 1 : if len(bve.RemovedFileBacking) > 0 {
342 0 : panic("deleted backings after manifest replay")
343 : }
344 : }
345 :
346 1 : newVersion, err := bve.Apply(nil, opts.Comparer, opts.FlushSplitBytes, opts.Experimental.ReadCompactionRate)
347 1 : if err != nil {
348 0 : return err
349 0 : }
350 1 : newVersion.L0Sublevels.InitCompactingFileInfo(nil /* in-progress compactions */)
351 1 : vs.append(newVersion)
352 1 :
353 1 : for i := range vs.metrics.Levels {
354 1 : l := &vs.metrics.Levels[i]
355 1 : l.NumFiles = int64(newVersion.Levels[i].Len())
356 1 : files := newVersion.Levels[i].Slice()
357 1 : l.Size = int64(files.SizeSum())
358 1 : }
359 1 : for _, l := range newVersion.Levels {
360 1 : iter := l.Iter()
361 1 : for f := iter.First(); f != nil; f = iter.Next() {
362 1 : if !f.Virtual {
363 1 : _, localSize := sizeIfLocal(f.FileBacking, vs.provider)
364 1 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
365 1 : }
366 : }
367 : }
368 1 : vs.virtualBackings.ForEach(func(backing *fileBacking) {
369 1 : _, localSize := sizeIfLocal(backing, vs.provider)
370 1 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
371 1 : })
372 :
373 1 : vs.setCompactionPicker(
374 1 : newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, nil))
375 1 : return nil
376 : }
377 :
378 1 : func (vs *versionSet) close() error {
379 1 : if vs.manifestFile != nil {
380 1 : if err := vs.manifestFile.Close(); err != nil {
381 0 : return err
382 0 : }
383 : }
384 1 : if vs.manifestMarker != nil {
385 1 : if err := vs.manifestMarker.Close(); err != nil {
386 0 : return err
387 0 : }
388 : }
389 1 : return nil
390 : }
391 :
392 : // logLock locks the manifest for writing. The lock must be released by either
393 : // a call to logUnlock or logAndApply.
394 : //
395 : // DB.mu must be held when calling this method, but the mutex may be dropped and
396 : // re-acquired during the course of this method.
397 1 : func (vs *versionSet) logLock() {
398 1 : // Wait for any existing writing to the manifest to complete, then mark the
399 1 : // manifest as busy.
400 1 : for vs.writing {
401 1 : // Note: writerCond.L is DB.mu, so we unlock it while we wait.
402 1 : vs.writerCond.Wait()
403 1 : }
404 1 : vs.writing = true
405 : }
406 :
407 : // logUnlock releases the lock for manifest writing.
408 : //
409 : // DB.mu must be held when calling this method.
410 1 : func (vs *versionSet) logUnlock() {
411 1 : if !vs.writing {
412 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
413 0 : }
414 1 : vs.writing = false
415 1 : vs.writerCond.Signal()
416 : }
417 :
418 1 : func (vs *versionSet) logUnlockAndInvalidatePickedCompactionCache() {
419 1 : vs.pickedCompactionCache.invalidate()
420 1 : vs.logUnlock()
421 1 : }
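
// An illustrative sketch, not part of Pebble's source: the locking protocol
// described in the logLock/logUnlock comments above. The caller holds DB.mu
// (vs.mu), takes the manifest lock with logLock, and then either calls
// logAndApply (which always releases the manifest lock, even on error) or
// logUnlock. The helper name and its parameters are hypothetical.
func manifestWriteSketch(
	vs *versionSet,
	jobID JobID,
	ve *versionEdit,
	metrics map[int]*LevelMetrics,
	inProgressCompactions func() []compactionInfo,
) error {
	// DB.mu (vs.mu) must already be held by the caller.
	vs.logLock()
	// No explicit logUnlock on this path: logAndApply releases the manifest
	// lock unconditionally via logUnlockAndInvalidatePickedCompactionCache.
	return vs.logAndApply(jobID, ve, metrics, false /* forceRotation */, inProgressCompactions)
}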
422 :
423 : // logAndApply logs the version edit to the manifest, applies the version edit
424 : // to the current version, and installs the new version.
425 : //
426 : // logAndApply fills in the following fields of the VersionEdit: NextFileNum,
427 : // LastSeqNum, RemovedBackingTables. The removed backing tables are those
428 : // backings that are no longer used (in the new version) after applying the edit
429 : // (as per vs.virtualBackings). Other than these fields, the VersionEdit must be
430 : // complete.
431 : //
432 : // New table backing references (FileBacking.Ref) are taken as part of applying
433 : // the version edit. The state of the virtual backings (vs.virtualBackings) is
434 : // updated before logging to the manifest and installing the latest version;
435 : // this is ok because any failure in those steps is fatal.
436 : // TODO(radu): remove the error return.
437 : //
438 : // DB.mu must be held when calling this method and will be released temporarily
439 : // while performing file I/O. Requires that the manifest is locked for writing
440 : // (see logLock). Will unconditionally release the manifest lock (via
441 : // logUnlock) even if an error occurs.
442 : //
443 : // inProgressCompactions is called while DB.mu is held, to get the list of
444 : // in-progress compactions.
445 : func (vs *versionSet) logAndApply(
446 : jobID JobID,
447 : ve *versionEdit,
448 : metrics map[int]*LevelMetrics,
449 : forceRotation bool,
450 : inProgressCompactions func() []compactionInfo,
451 1 : ) error {
452 1 : if !vs.writing {
453 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
454 0 : }
455 1 : defer vs.logUnlockAndInvalidatePickedCompactionCache()
456 1 :
457 1 : if ve.MinUnflushedLogNum != 0 {
458 1 : if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
459 1 : vs.nextFileNum.Load() <= uint64(ve.MinUnflushedLogNum) {
460 0 : panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
461 0 : ve.MinUnflushedLogNum))
462 : }
463 : }
464 :
465 : // This is the next manifest filenum, but if the current file is too big we
466 : // will write this ve to the next file which means what ve encodes is the
467 : // current filenum and not the next one.
468 : //
469 : // TODO(sbhola): figure out why this is correct and update comment.
470 1 : ve.NextFileNum = vs.nextFileNum.Load()
471 1 :
472 1 : // LastSeqNum is set to the current upper bound on the assigned sequence
473 1 : // numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
474 1 : // used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
475 1 : // replay. It must be higher than or equal to any sequence number
476 1 : // written to an sstable, including sequence numbers in ingested files.
477 1 : // Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
478 1 : // number. This is fallout from ingestion which allows a sequence number X to
479 1 : // be assigned to an ingested sstable even though sequence number X-1 resides
480 1 : // in an unflushed memtable. logSeqNum is the _next_ sequence number that
481 1 : // will be assigned, so subtract 1 from it to get the upper bound on the
482 1 : // last assigned sequence number.
483 1 : logSeqNum := vs.logSeqNum.Load()
484 1 : ve.LastSeqNum = logSeqNum - 1
485 1 : if logSeqNum == 0 {
486 0 : // logSeqNum is initialized to 1 in Open() if there are no previous WAL
487 0 : // or manifest records, so this case should never happen.
488 0 : vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
489 0 : }
490 :
491 1 : currentVersion := vs.currentVersion()
492 1 : var newVersion *version
493 1 :
494 1 : // Generate a new manifest if we don't currently have one, or forceRotation
495 1 : // is true, or the current one is too large.
496 1 : //
497 1 : // For largeness, we do not exclusively use MaxManifestFileSize size
498 1 : // threshold since we have had incidents where due to either large keys or
499 1 : // large numbers of files, each edit results in a snapshot + write of the
500 1 : // edit. This slows the system down since each flush or compaction is
501 1 : // writing a new manifest snapshot. The primary goal of the size-based
502 1 : // rollover logic is to ensure that when reopening a DB, the number of edits
503 1 : // that need to be replayed on top of the snapshot is "sane". Rolling over
504 1 : // to a new manifest after each edit is not relevant to that goal.
505 1 : //
506 1 : // Consider the following cases:
507 1 : // - The number of live files F in the DB is roughly stable: after writing
508 1 : // the snapshot (with F files), say we require that there be enough edits
509 1 : // such that the cumulative number of files in those edits, E, be greater
510 1 : // than F. This will ensure that the total amount of time in logAndApply
511 1 : // that is spent in snapshot writing is ~50%.
512 1 : //
513 1 : // - The number of live files F in the DB is shrinking drastically, say from
514 1 : // F to F/10: This can happen for various reasons, like wide range
515 1 : // tombstones, or large numbers of smaller than usual files that are being
516 1 : // merged together into larger files. And say the new files generated
517 1 : // during this shrinkage is insignificant compared to F/10, and so for
518 1 : // this example we will assume it is effectively 0. After this shrinking,
519 1 : // E = 0.9F, and so if we used the previous snapshot file count, F, as the
520 1 : // threshold that needs to be exceeded, we will further delay the snapshot
521 1 : // writing. Which means on DB reopen we will need to replay 0.9F edits to
522 1 : // get to a version with 0.1F files. It would be better to create a new
523 1 : // snapshot when E exceeds the number of files in the current version.
524 1 : //
525 1 : // - The number of live files F in the DB is growing via perfect ingests
526 1 : // into L6: Say we wrote the snapshot when there were F files and now we
527 1 : // have 10F files, so E = 9F. We will further delay writing a new
528 1 : // snapshot. This case can be critiqued as contrived, but we consider it
529 1 : // nonetheless.
530 1 : //
531 1 : // The logic below uses the min of the last snapshot file count and the file
532 1 : // count in the current version.
533 1 : vs.rotationHelper.AddRecord(int64(len(ve.DeletedTables) + len(ve.NewTables)))
534 1 : sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
535 1 : requireRotation := forceRotation || vs.manifest == nil
536 1 :
537 1 : var nextSnapshotFilecount int64
538 1 : for i := range vs.metrics.Levels {
539 1 : nextSnapshotFilecount += vs.metrics.Levels[i].NumFiles
540 1 : }
541 1 : if sizeExceeded && !requireRotation {
542 1 : requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
543 1 : }
544 1 : var newManifestFileNum base.DiskFileNum
545 1 : var prevManifestFileSize uint64
546 1 : var newManifestVirtualBackings []*fileBacking
547 1 : if requireRotation {
548 1 : newManifestFileNum = vs.getNextDiskFileNum()
549 1 : prevManifestFileSize = uint64(vs.manifest.Size())
550 1 :
551 1 : // We want the virtual backings *before* applying the version edit, because
552 1 : // the new manifest will contain the pre-apply version plus the last version
553 1 : // edit.
554 1 : newManifestVirtualBackings = vs.virtualBackings.Backings()
555 1 : }
556 :
557 : // Grab certain values before releasing vs.mu, in case createManifest() needs
558 : // to be called.
559 1 : minUnflushedLogNum := vs.minUnflushedLogNum
560 1 : nextFileNum := vs.nextFileNum.Load()
561 1 :
562 1 : // Note: this call populates ve.RemovedBackingTables.
563 1 : zombieBackings, removedVirtualBackings, localLiveSizeDelta :=
564 1 : getZombiesAndUpdateVirtualBackings(ve, &vs.virtualBackings, vs.provider)
565 1 :
566 1 : if err := func() error {
567 1 : vs.mu.Unlock()
568 1 : defer vs.mu.Lock()
569 1 :
570 1 : if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
571 0 : return base.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
572 0 : }
573 1 : var b bulkVersionEdit
574 1 : err := b.Accumulate(ve)
575 1 : if err != nil {
576 0 : return errors.Wrap(err, "MANIFEST accumulate failed")
577 0 : }
578 1 : newVersion, err = b.Apply(
579 1 : currentVersion, vs.cmp, vs.opts.FlushSplitBytes, vs.opts.Experimental.ReadCompactionRate,
580 1 : )
581 1 : if err != nil {
582 0 : return errors.Wrap(err, "MANIFEST apply failed")
583 0 : }
584 :
585 1 : if newManifestFileNum != 0 {
586 1 : if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum, newManifestVirtualBackings); err != nil {
587 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
588 1 : JobID: int(jobID),
589 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
590 1 : FileNum: newManifestFileNum,
591 1 : Err: err,
592 1 : })
593 1 : return errors.Wrap(err, "MANIFEST create failed")
594 1 : }
595 : }
596 :
597 1 : w, err := vs.manifest.Next()
598 1 : if err != nil {
599 0 : return errors.Wrap(err, "MANIFEST next record write failed")
600 0 : }
601 :
602 : // NB: Any error from this point on is considered fatal as we don't know if
603 : // the MANIFEST write occurred or not. Trying to determine that is
604 : // fraught. Instead we rely on the standard recovery mechanism run when a
605 : // database is open. In particular, that mechanism generates a new MANIFEST
606 : // and ensures it is synced.
607 1 : if err := ve.Encode(w); err != nil {
608 0 : return errors.Wrap(err, "MANIFEST write failed")
609 0 : }
610 1 : if err := vs.manifest.Flush(); err != nil {
611 1 : return errors.Wrap(err, "MANIFEST flush failed")
612 1 : }
613 1 : if err := vs.manifestFile.Sync(); err != nil {
614 1 : return errors.Wrap(err, "MANIFEST sync failed")
615 1 : }
616 1 : if newManifestFileNum != 0 {
617 1 : // NB: Move() is responsible for syncing the data directory.
618 1 : if err := vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, newManifestFileNum)); err != nil {
619 0 : return errors.Wrap(err, "MANIFEST set current failed")
620 0 : }
621 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
622 1 : JobID: int(jobID),
623 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
624 1 : FileNum: newManifestFileNum,
625 1 : })
626 : }
627 1 : return nil
628 1 : }(); err != nil {
629 1 : // Any error encountered during any of the operations in the previous
630 1 : // closure are considered fatal. Treating such errors as fatal is preferred
631 1 : // to attempting to unwind various file and b-tree reference counts, and
632 1 : // re-generating L0 sublevel metadata. This may change in the future, if
633 1 : // certain manifest / WAL operations become retryable. For more context, see
634 1 : // #1159 and #1792.
635 1 : vs.opts.Logger.Fatalf("%s", err)
636 1 : return err
637 1 : }
638 :
639 1 : if requireRotation {
640 1 : // Successfully rotated.
641 1 : vs.rotationHelper.Rotate(nextSnapshotFilecount)
642 1 : }
643 : // Now that DB.mu is held again, initialize compacting file info in
644 : // L0Sublevels.
645 1 : inProgress := inProgressCompactions()
646 1 :
647 1 : newVersion.L0Sublevels.InitCompactingFileInfo(inProgressL0Compactions(inProgress))
648 1 :
649 1 : // Update the zombie tables set first, as installation of the new version
650 1 : // will unref the previous version which could result in addObsoleteLocked
651 1 : // being called.
652 1 : for _, b := range zombieBackings {
653 1 : vs.zombieTables[b.backing.DiskFileNum] = objectInfo{
654 1 : fileInfo: fileInfo{
655 1 : FileNum: b.backing.DiskFileNum,
656 1 : FileSize: b.backing.Size,
657 1 : },
658 1 : isLocal: b.isLocal,
659 1 : }
660 1 : }
661 :
662 : // Unref the removed backings and report those that already became obsolete.
663 : // Note that the only case where we report obsolete tables here is when
664 : // VirtualBackings.Protect/Unprotect was used to keep a backing alive without
665 : // it being used in the current version.
666 1 : var obsoleteVirtualBackings manifest.ObsoleteFiles
667 1 : for _, b := range removedVirtualBackings {
668 1 : if b.backing.Unref() == 0 {
669 1 : obsoleteVirtualBackings.FileBackings = append(obsoleteVirtualBackings.FileBackings, b.backing)
670 1 : }
671 : }
672 1 : vs.addObsoleteLocked(obsoleteVirtualBackings)
673 1 :
674 1 : // Install the new version.
675 1 : vs.append(newVersion)
676 1 :
677 1 : if ve.MinUnflushedLogNum != 0 {
678 1 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
679 1 : }
680 1 : if newManifestFileNum != 0 {
681 1 : if vs.manifestFileNum != 0 {
682 1 : vs.obsoleteManifests = append(vs.obsoleteManifests, fileInfo{
683 1 : FileNum: vs.manifestFileNum,
684 1 : FileSize: prevManifestFileSize,
685 1 : })
686 1 : }
687 1 : vs.manifestFileNum = newManifestFileNum
688 : }
689 :
690 1 : for level, update := range metrics {
691 1 : vs.metrics.Levels[level].Add(update)
692 1 : }
693 1 : for i := range vs.metrics.Levels {
694 1 : l := &vs.metrics.Levels[i]
695 1 : l.NumFiles = int64(newVersion.Levels[i].Len())
696 1 : l.NumVirtualFiles = newVersion.Levels[i].NumVirtual
697 1 : l.VirtualSize = newVersion.Levels[i].VirtualSize
698 1 : l.Size = int64(newVersion.Levels[i].Size())
699 1 :
700 1 : l.Sublevels = 0
701 1 : if l.NumFiles > 0 {
702 1 : l.Sublevels = 1
703 1 : }
704 1 : if invariants.Enabled {
705 1 : levelFiles := newVersion.Levels[i].Slice()
706 1 : if size := int64(levelFiles.SizeSum()); l.Size != size {
707 0 : vs.opts.Logger.Fatalf("versionSet metrics L%d Size = %d, actual size = %d", i, l.Size, size)
708 0 : }
709 1 : if nVirtual := levelFiles.NumVirtual(); nVirtual != l.NumVirtualFiles {
710 0 : vs.opts.Logger.Fatalf(
711 0 : "versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
712 0 : i, l.NumVirtualFiles, nVirtual,
713 0 : )
714 0 : }
715 1 : if vSize := levelFiles.VirtualSizeSum(); vSize != l.VirtualSize {
716 0 : vs.opts.Logger.Fatalf(
717 0 : "versionSet metrics L%d Virtual size = %d, actual size = %d",
718 0 : i, l.VirtualSize, vSize,
719 0 : )
720 0 : }
721 : }
722 : }
723 1 : vs.metrics.Levels[0].Sublevels = int32(len(newVersion.L0SublevelFiles))
724 1 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localLiveSizeDelta)
725 1 :
726 1 : vs.setCompactionPicker(
727 1 : newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, inProgress))
728 1 : if !vs.dynamicBaseLevel {
729 1 : vs.picker.forceBaseLevel1()
730 1 : }
731 1 : return nil
732 : }
733 :
734 1 : func (vs *versionSet) setCompactionPicker(picker *compactionPickerByScore) {
735 1 : vs.picker = picker
736 1 : vs.curCompactionConcurrency.Store(int32(picker.getCompactionConcurrency()))
737 1 : }
738 :
739 : type fileBackingInfo struct {
740 : backing *fileBacking
741 : isLocal bool
742 : }
743 :
744 : // getZombiesAndUpdateVirtualBackings updates the virtual backings with the
745 : // changes in the versionEdit and populates ve.RemovedBackingTables.
746 : // Returns:
747 : // - zombieBackings: all backings (physical and virtual) that will no longer be
748 : // needed when we apply ve.
749 : // - removedVirtualBackings: the virtual backings that will be removed by the
750 : // VersionEdit and which must be Unref()ed by the caller. These backings
751 : // match ve.RemovedBackingTables.
752 : // - localLiveSizeDelta: the delta in local live bytes.
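//
// Illustrative example (added for clarity, not part of the original comment):
// if ve deletes a physical table whose backing also appears in ve.NewTables
// (a move between levels), that backing remains "still used" and is neither a
// zombie nor removed; if the backing appears only in ve.DeletedTables, it is
// returned in zombieBackings.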
753 : func getZombiesAndUpdateVirtualBackings(
754 : ve *versionEdit, virtualBackings *manifest.VirtualBackings, provider objstorage.Provider,
755 1 : ) (zombieBackings, removedVirtualBackings []fileBackingInfo, localLiveSizeDelta int64) {
756 1 : // First, deal with the physical tables.
757 1 : //
758 1 : // A physical backing has become unused if it is in DeletedTables but not in
759 1 : // NewTables or CreatedBackingTables.
760 1 : //
761 1 : // Note that for the common case where there are very few elements, the map
762 1 : // will stay on the stack.
763 1 : stillUsed := make(map[base.DiskFileNum]struct{})
764 1 : for _, nf := range ve.NewTables {
765 1 : if !nf.Meta.Virtual {
766 1 : stillUsed[nf.Meta.FileBacking.DiskFileNum] = struct{}{}
767 1 : _, localFileDelta := sizeIfLocal(nf.Meta.FileBacking, provider)
768 1 : localLiveSizeDelta += localFileDelta
769 1 : }
770 : }
771 1 : for _, b := range ve.CreatedBackingTables {
772 1 : stillUsed[b.DiskFileNum] = struct{}{}
773 1 : }
774 1 : for _, m := range ve.DeletedTables {
775 1 : if !m.Virtual {
776 1 : // NB: this deleted file may also be in NewTables or
777 1 : // CreatedBackingTables, due to a file moving between levels, or
778 1 : // becoming virtualized. In which case there is no change due to this
779 1 : // file in the localLiveSizeDelta -- the subtraction below compensates
780 1 : // for the addition.
781 1 : isLocal, localFileDelta := sizeIfLocal(m.FileBacking, provider)
782 1 : localLiveSizeDelta -= localFileDelta
783 1 : if _, ok := stillUsed[m.FileBacking.DiskFileNum]; !ok {
784 1 : zombieBackings = append(zombieBackings, fileBackingInfo{
785 1 : backing: m.FileBacking,
786 1 : isLocal: isLocal,
787 1 : })
788 1 : }
789 : }
790 : }
791 :
792 : // Now deal with virtual tables.
793 : //
794 : // When a virtual table moves between levels we AddTable() then RemoveTable(),
795 : // which works out.
796 1 : for _, b := range ve.CreatedBackingTables {
797 1 : virtualBackings.AddAndRef(b)
798 1 : _, localFileDelta := sizeIfLocal(b, provider)
799 1 : localLiveSizeDelta += localFileDelta
800 1 : }
801 1 : for _, nf := range ve.NewTables {
802 1 : if nf.Meta.Virtual {
803 1 : virtualBackings.AddTable(nf.Meta)
804 1 : }
805 : }
806 1 : for _, m := range ve.DeletedTables {
807 1 : if m.Virtual {
808 1 : virtualBackings.RemoveTable(m)
809 1 : }
810 : }
811 :
812 1 : if unused := virtualBackings.Unused(); len(unused) > 0 {
813 1 : // Virtual backings that are no longer used are zombies and are also added
814 1 : // to RemovedBackingTables (before the version edit is written to disk).
815 1 : ve.RemovedBackingTables = make([]base.DiskFileNum, len(unused))
816 1 : for i, b := range unused {
817 1 : isLocal, localFileDelta := sizeIfLocal(b, provider)
818 1 : localLiveSizeDelta -= localFileDelta
819 1 : ve.RemovedBackingTables[i] = b.DiskFileNum
820 1 : zombieBackings = append(zombieBackings, fileBackingInfo{
821 1 : backing: b,
822 1 : isLocal: isLocal,
823 1 : })
824 1 : virtualBackings.Remove(b.DiskFileNum)
825 1 : }
826 1 : removedVirtualBackings = zombieBackings[len(zombieBackings)-len(unused):]
827 : }
828 1 : return zombieBackings, removedVirtualBackings, localLiveSizeDelta
829 : }
830 :
831 : // sizeIfLocal returns (isLocal, backing.Size) for a local backing, else (false, 0).
832 : func sizeIfLocal(
833 : backing *fileBacking, provider objstorage.Provider,
834 1 : ) (isLocal bool, localSize int64) {
835 1 : isLocal = objstorage.IsLocalTable(provider, backing.DiskFileNum)
836 1 : if isLocal {
837 1 : return true, int64(backing.Size)
838 1 : }
839 1 : return false, 0
840 : }
841 :
842 : func (vs *versionSet) incrementCompactions(
843 : kind compactionKind, extraLevels []*compactionLevel, pickerMetrics compactionPickerMetrics,
844 1 : ) {
845 1 : switch kind {
846 1 : case compactionKindDefault:
847 1 : vs.metrics.Compact.Count++
848 1 : vs.metrics.Compact.DefaultCount++
849 :
850 1 : case compactionKindFlush, compactionKindIngestedFlushable:
851 1 : vs.metrics.Flush.Count++
852 :
853 1 : case compactionKindMove:
854 1 : vs.metrics.Compact.Count++
855 1 : vs.metrics.Compact.MoveCount++
856 :
857 1 : case compactionKindDeleteOnly:
858 1 : vs.metrics.Compact.Count++
859 1 : vs.metrics.Compact.DeleteOnlyCount++
860 :
861 1 : case compactionKindElisionOnly:
862 1 : vs.metrics.Compact.Count++
863 1 : vs.metrics.Compact.ElisionOnlyCount++
864 :
865 1 : case compactionKindRead:
866 1 : vs.metrics.Compact.Count++
867 1 : vs.metrics.Compact.ReadCount++
868 :
869 0 : case compactionKindTombstoneDensity:
870 0 : vs.metrics.Compact.Count++
871 0 : vs.metrics.Compact.TombstoneDensityCount++
872 :
873 1 : case compactionKindRewrite:
874 1 : vs.metrics.Compact.Count++
875 1 : vs.metrics.Compact.RewriteCount++
876 :
877 1 : case compactionKindCopy:
878 1 : vs.metrics.Compact.Count++
879 1 : vs.metrics.Compact.CopyCount++
880 :
881 0 : default:
882 0 : if invariants.Enabled {
883 0 : panic("unhandled compaction kind")
884 : }
885 : }
886 1 : if len(extraLevels) > 0 {
887 1 : vs.metrics.Compact.MultiLevelCount++
888 1 : }
889 : }
890 :
891 1 : func (vs *versionSet) incrementCompactionBytes(numBytes int64) {
892 1 : vs.atomicInProgressBytes.Add(numBytes)
893 1 : }
894 :
895 : // createManifest creates a manifest file that contains a snapshot of vs.
896 : func (vs *versionSet) createManifest(
897 : dirname string,
898 : fileNum, minUnflushedLogNum base.DiskFileNum,
899 : nextFileNum uint64,
900 : virtualBackings []*fileBacking,
901 1 : ) (err error) {
902 1 : var (
903 1 : filename = base.MakeFilepath(vs.fs, dirname, base.FileTypeManifest, fileNum)
904 1 : manifestFile vfs.File
905 1 : manifest *record.Writer
906 1 : )
907 1 : defer func() {
908 1 : if manifest != nil {
909 0 : manifest.Close()
910 0 : }
911 1 : if manifestFile != nil {
912 0 : manifestFile.Close()
913 0 : }
914 1 : if err != nil {
915 1 : vs.fs.Remove(filename)
916 1 : }
917 : }()
918 1 : manifestFile, err = vs.fs.Create(filename, "pebble-manifest")
919 1 : if err != nil {
920 1 : return err
921 1 : }
922 1 : manifest = record.NewWriter(manifestFile)
923 1 :
924 1 : snapshot := versionEdit{
925 1 : ComparerName: vs.cmp.Name,
926 1 : }
927 1 :
928 1 : for level, levelMetadata := range vs.currentVersion().Levels {
929 1 : iter := levelMetadata.Iter()
930 1 : for meta := iter.First(); meta != nil; meta = iter.Next() {
931 1 : snapshot.NewTables = append(snapshot.NewTables, newTableEntry{
932 1 : Level: level,
933 1 : Meta: meta,
934 1 : })
935 1 : }
936 : }
937 :
938 1 : snapshot.CreatedBackingTables = virtualBackings
939 1 :
940 1 : // When creating a version snapshot for an existing DB, this snapshot VersionEdit will be
941 1 : // immediately followed by another VersionEdit (being written in logAndApply()). That
942 1 : // VersionEdit always contains a LastSeqNum, so we don't need to include that in the snapshot.
943 1 : // But it does not necessarily include MinUnflushedLogNum, NextFileNum, so we initialize those
944 1 : // using the corresponding fields in the versionSet (which came from the latest preceding
945 1 : // VersionEdit that had those fields).
946 1 : snapshot.MinUnflushedLogNum = minUnflushedLogNum
947 1 : snapshot.NextFileNum = nextFileNum
948 1 :
949 1 : w, err1 := manifest.Next()
950 1 : if err1 != nil {
951 0 : return err1
952 0 : }
953 1 : if err := snapshot.Encode(w); err != nil {
954 0 : return err
955 0 : }
956 :
957 1 : if vs.manifest != nil {
958 1 : vs.manifest.Close()
959 1 : vs.manifest = nil
960 1 : }
961 1 : if vs.manifestFile != nil {
962 1 : if err := vs.manifestFile.Close(); err != nil {
963 0 : return err
964 0 : }
965 1 : vs.manifestFile = nil
966 : }
967 :
968 1 : vs.manifest, manifest = manifest, nil
969 1 : vs.manifestFile, manifestFile = manifestFile, nil
970 1 : return nil
971 : }
972 :
973 : // NB: This method is not safe for concurrent use. It is only safe
974 : // to be called when concurrent changes to nextFileNum are not expected.
975 1 : func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
976 1 : if vs.nextFileNum.Load() <= uint64(fileNum) {
977 1 : vs.nextFileNum.Store(uint64(fileNum + 1))
978 1 : }
979 : }
980 :
981 : // getNextFileNum returns the next file number to be used.
982 : //
983 : // Can be called without the versionSet's mutex being held.
984 1 : func (vs *versionSet) getNextFileNum() base.FileNum {
985 1 : x := vs.nextFileNum.Add(1) - 1
986 1 : return base.FileNum(x)
987 1 : }
988 :
989 : // Can be called without the versionSet's mutex being held.
990 1 : func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
991 1 : x := vs.nextFileNum.Add(1) - 1
992 1 : return base.DiskFileNum(x)
993 1 : }
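
// An illustrative note, not part of Pebble's source: the Add(1)-1 pattern
// above atomically reserves the counter's current value and advances it, so
// concurrent callers never observe the same file number. A standalone
// analogue using sync/atomic:
//
//	var next atomic.Uint64
//	next.Store(1)               // mirrors vs.nextFileNum.Store(1) in init
//	first := next.Add(1) - 1    // first == 1; the counter now holds 2
//	second := next.Add(1) - 1   // second == 2; the counter now holds 3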
994 :
995 1 : func (vs *versionSet) append(v *version) {
996 1 : if v.Refs() != 0 {
997 0 : panic("pebble: version should be unreferenced")
998 : }
999 1 : if !vs.versions.Empty() {
1000 1 : vs.versions.Back().UnrefLocked()
1001 1 : }
1002 1 : v.Deleted = vs.obsoleteFn
1003 1 : v.Ref()
1004 1 : vs.versions.PushBack(v)
1005 1 : if invariants.Enabled {
1006 1 : // Verify that the virtualBackings contains all the backings referenced by
1007 1 : // the version.
1008 1 : for _, l := range v.Levels {
1009 1 : iter := l.Iter()
1010 1 : for f := iter.First(); f != nil; f = iter.Next() {
1011 1 : if f.Virtual {
1012 1 : if _, ok := vs.virtualBackings.Get(f.FileBacking.DiskFileNum); !ok {
1013 0 : panic(fmt.Sprintf("%s is not in virtualBackings", f.FileBacking.DiskFileNum))
1014 : }
1015 : }
1016 : }
1017 : }
1018 : }
1019 : }
1020 :
1021 1 : func (vs *versionSet) currentVersion() *version {
1022 1 : return vs.versions.Back()
1023 1 : }
1024 :
1025 1 : func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
1026 1 : current := vs.currentVersion()
1027 1 : for v := vs.versions.Front(); true; v = v.Next() {
1028 1 : for _, lm := range v.Levels {
1029 1 : iter := lm.Iter()
1030 1 : for f := iter.First(); f != nil; f = iter.Next() {
1031 1 : m[f.FileBacking.DiskFileNum] = struct{}{}
1032 1 : }
1033 : }
1034 1 : if v == current {
1035 1 : break
1036 : }
1037 : }
1038 : // virtualBackings contains backings that are referenced by some virtual
1039 : // tables in the latest version (which are handled above), and backings that
1040 : // are not but are still alive because of the protection mechanism (see
1041 : // manifest.VirtualBackings). This loop ensures the latter get added to the
1042 : // map.
1043 1 : vs.virtualBackings.ForEach(func(b *fileBacking) {
1044 1 : m[b.DiskFileNum] = struct{}{}
1045 1 : })
1046 : }
1047 :
1048 : // addObsoleteLocked will add the fileInfo associated with obsolete backing
1049 : // sstables to the obsolete tables list.
1050 : //
1051 : // The file backings in the obsolete list must not appear more than once.
1052 : //
1053 : // DB.mu must be held when addObsoleteLocked is called.
1054 1 : func (vs *versionSet) addObsoleteLocked(obsolete manifest.ObsoleteFiles) {
1055 1 : if obsolete.Count() == 0 {
1056 1 : return
1057 1 : }
1058 :
1059 1 : obsoleteFileInfo := make([]objectInfo, len(obsolete.FileBackings))
1060 1 : for i, bs := range obsolete.FileBackings {
1061 1 : obsoleteFileInfo[i].FileNum = bs.DiskFileNum
1062 1 : obsoleteFileInfo[i].FileSize = bs.Size
1063 1 : }
1064 :
1065 1 : if invariants.Enabled {
1066 1 : dedup := make(map[base.DiskFileNum]struct{})
1067 1 : for _, fi := range obsoleteFileInfo {
1068 1 : dedup[fi.FileNum] = struct{}{}
1069 1 : }
1070 1 : if len(dedup) != len(obsoleteFileInfo) {
1071 0 : panic("pebble: duplicate FileBacking present in obsolete list")
1072 : }
1073 : }
1074 :
1075 1 : for i, fi := range obsoleteFileInfo {
1076 1 : // Note that the obsolete tables are no longer zombie by the definition of
1077 1 : // zombie, but we leave them in the zombie tables map until they are
1078 1 : // deleted from disk.
1079 1 : //
1080 1 : // TODO(sumeer): this means that the zombie metrics, like ZombieSize,
1081 1 : // computed in DB.Metrics are also being counted in the obsolete metrics.
1082 1 : // Was this intentional?
1083 1 : info, ok := vs.zombieTables[fi.FileNum]
1084 1 : if !ok {
1085 0 : vs.opts.Logger.Fatalf("MANIFEST obsolete table %s not marked as zombie", fi.FileNum)
1086 0 : }
1087 1 : obsoleteFileInfo[i].isLocal = info.isLocal
1088 : }
1089 :
1090 1 : vs.obsoleteTables = append(vs.obsoleteTables, obsoleteFileInfo...)
1091 1 : vs.updateObsoleteTableMetricsLocked()
1092 : }
1093 :
1094 : // addObsolete will acquire DB.mu, so DB.mu must not be held when this is
1095 : // called.
1096 1 : func (vs *versionSet) addObsolete(obsolete manifest.ObsoleteFiles) {
1097 1 : vs.mu.Lock()
1098 1 : defer vs.mu.Unlock()
1099 1 : vs.addObsoleteLocked(obsolete)
1100 1 : }
1101 :
1102 1 : func (vs *versionSet) updateObsoleteTableMetricsLocked() {
1103 1 : vs.metrics.Table.ObsoleteCount = int64(len(vs.obsoleteTables))
1104 1 : vs.metrics.Table.ObsoleteSize = 0
1105 1 : vs.metrics.Table.Local.ObsoleteSize = 0
1106 1 : for _, fi := range vs.obsoleteTables {
1107 1 : vs.metrics.Table.ObsoleteSize += fi.FileSize
1108 1 : if fi.isLocal {
1109 1 : vs.metrics.Table.Local.ObsoleteSize += fi.FileSize
1110 1 : }
1111 : }
1112 : }
1113 :
1114 : func findCurrentManifest(
1115 : fs vfs.FS, dirname string, ls []string,
1116 1 : ) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
1117 1 : // Locating a marker should succeed even if the marker has never been placed.
1118 1 : var filename string
1119 1 : marker, filename, err = atomicfs.LocateMarkerInListing(fs, dirname, manifestMarkerName, ls)
1120 1 : if err != nil {
1121 1 : return nil, 0, false, err
1122 1 : }
1123 :
1124 1 : if filename == "" {
1125 1 : // The marker hasn't been set yet. This database doesn't exist.
1126 1 : return marker, 0, false, nil
1127 1 : }
1128 :
1129 1 : var ok bool
1130 1 : _, manifestNum, ok = base.ParseFilename(fs, filename)
1131 1 : if !ok {
1132 0 : return marker, 0, false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
1133 0 : }
1134 1 : return marker, manifestNum, true, nil
1135 : }
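
// An illustrative sketch, not part of Pebble's source: how findCurrentManifest
// pairs with versionSet.create/load during DB open. The helper name and the
// JobID value are hypothetical; the directory listing ls and the remaining
// parameters are assumed to be supplied by the caller.
func openVersionSetSketch(
	fs vfs.FS,
	dirname string,
	ls []string,
	provider objstorage.Provider,
	opts *Options,
	getFMV func() FormatMajorVersion,
	mu *sync.Mutex,
) (*versionSet, error) {
	marker, manifestNum, exists, err := findCurrentManifest(fs, dirname, ls)
	if err != nil {
		return nil, err
	}
	vs := &versionSet{}
	if !exists {
		// No MANIFEST marker yet: create a version set for a fresh DB.
		err = vs.create(JobID(1), dirname, provider, opts, marker, getFMV, mu)
	} else {
		// Replay the existing MANIFEST identified by the marker.
		err = vs.load(dirname, provider, opts, manifestNum, marker, getFMV, mu)
	}
	if err != nil {
		return nil, err
	}
	return vs, nil
}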
1136 :
1137 1 : func newFileMetrics(newFiles []manifest.NewTableEntry) map[int]*LevelMetrics {
1138 1 : m := map[int]*LevelMetrics{}
1139 1 : for _, nf := range newFiles {
1140 1 : lm := m[nf.Level]
1141 1 : if lm == nil {
1142 1 : lm = &LevelMetrics{}
1143 1 : m[nf.Level] = lm
1144 1 : }
1145 1 : lm.NumFiles++
1146 1 : lm.Size += int64(nf.Meta.Size)
1147 : }
1148 1 : return m
1149 : }