Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "sync"
11 : "sync/atomic"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/record"
19 : "github.com/cockroachdb/pebble/vfs"
20 : "github.com/cockroachdb/pebble/vfs/atomicfs"
21 : )
22 :
23 : const numLevels = manifest.NumLevels
24 :
25 : const manifestMarkerName = `manifest`
26 :
27 : // Provide type aliases for the various manifest structs.
28 : type bulkVersionEdit = manifest.BulkVersionEdit
29 : type deletedFileEntry = manifest.DeletedFileEntry
30 : type fileMetadata = manifest.FileMetadata
31 : type physicalMeta = manifest.PhysicalFileMeta
32 : type virtualMeta = manifest.VirtualFileMeta
33 : type fileBacking = manifest.FileBacking
34 : type newFileEntry = manifest.NewFileEntry
35 : type version = manifest.Version
36 : type versionEdit = manifest.VersionEdit
37 : type versionList = manifest.VersionList
38 :
39 : // versionSet manages a collection of immutable versions and the creation of
40 : // new versions from the most recent one. A new version is created from an
41 : // existing version by applying a version edit: a delta from the previous
42 : // version. Version edits are logged to the MANIFEST file, which is replayed
43 : // at startup. An illustrative edit is sketched after this struct.
44 : type versionSet struct {
45 : // Next seqNum to use for WAL writes.
46 : logSeqNum base.AtomicSeqNum
47 :
48 : // The upper bound on sequence numbers that have been assigned so far. A
49 : // suffix of these sequence numbers may not have been written to a WAL. Both
50 : // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
51 : // visibleSeqNum is <= logSeqNum.
52 : visibleSeqNum base.AtomicSeqNum
53 :
54 : // Number of bytes present in sstables being written by in-progress
55 : // compactions. This value will be zero if there are no in-progress
56 : // compactions. Updated and read atomically.
57 : atomicInProgressBytes atomic.Int64
58 :
59 : // Immutable fields.
60 : dirname string
61 : provider objstorage.Provider
62 : // Set to DB.mu.
63 : mu *sync.Mutex
64 : opts *Options
65 : fs vfs.FS
66 : cmp *base.Comparer
67 : // Dynamic base level allows the dynamic base level computation to be
68 : // disabled. Used by tests which want to create specific LSM structures.
69 : dynamicBaseLevel bool
70 :
71 : // Mutable fields.
72 : versions versionList
73 : picker compactionPicker
74 :
75 : // Not all metrics are kept here. See DB.Metrics().
76 : metrics Metrics
77 :
78 : // A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
79 : // on the creation of every version.
80 : obsoleteFn func(obsolete []*fileBacking)
81 : obsoleteTables []tableInfo
82 : obsoleteManifests []fileInfo
83 : obsoleteOptions []fileInfo
84 :
85 : // Zombie tables which have been removed from the current version but are
86 : // still referenced by an in-use iterator.
87 : zombieTables map[base.DiskFileNum]tableInfo
88 :
89 : // virtualBackings contains information about the FileBackings which support
90 : // virtual sstables in the latest version. It is mainly used to determine when
91 : // a backing is no longer in use by the tables in the latest version; this is
92 : // not a trivial problem because compactions and other operations can happen
93 : // in parallel (and they can finish in unpredictable order).
94 : //
95 : // This mechanism is complementary to the backing Ref/Unref mechanism, which
96 : // determines when a backing is no longer used by *any* live version and can
97 : // be removed.
98 : //
99 : // In principle this should have been a copy-on-write structure, with each
100 : // Version having its own record of its virtual backings (similar to the
101 : // B-tree that holds the tables). However, in practice we only need it for the
102 : // latest version, so we use a simpler structure and store it in the
103 : // versionSet instead.
104 : //
105 : // virtualBackings is modified under DB.mu and the log lock. If it is accessed
106 : // under DB.mu and a version update is in progress, it reflects the state of
107 : // the next version.
108 : virtualBackings manifest.VirtualBackings
109 :
110 : // minUnflushedLogNum is the smallest WAL log file number corresponding to
111 : // mutations that have not been flushed to an sstable.
112 : minUnflushedLogNum base.DiskFileNum
113 :
114 : // The next file number. A single counter is used to assign file
115 : // numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
116 : nextFileNum atomic.Uint64
117 :
118 : // The current manifest file number.
119 : manifestFileNum base.DiskFileNum
120 : manifestMarker *atomicfs.Marker
121 :
122 : manifestFile vfs.File
123 : manifest *record.Writer
124 : getFormatMajorVersion func() FormatMajorVersion
125 :
126 : writing bool
127 : writerCond sync.Cond
128 : // State for deciding when to write a snapshot. Protected by mu.
129 : rotationHelper record.RotationHelper
130 : }
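// Illustrative sketch (hypothetical file and log numbers): a flush that adds
// table 000012 to L0 and retires WALs below 000011 could be described by a
// version edit such as
//
//	ve := &versionEdit{
//		MinUnflushedLogNum: 11,
//		NewFiles:           []newFileEntry{{Level: 0, Meta: meta}},
//	}
//
// where meta is the *fileMetadata for table 000012. logAndApply fills in
// NextFileNum, LastSeqNum, and RemovedBackingTables before encoding the edit
// to the MANIFEST.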
131 :
132 : type tableInfo struct {
133 : fileInfo
134 : isLocal bool
135 : }
136 :
137 : func (vs *versionSet) init(
138 : dirname string,
139 : provider objstorage.Provider,
140 : opts *Options,
141 : marker *atomicfs.Marker,
142 : getFMV func() FormatMajorVersion,
143 : mu *sync.Mutex,
144 2 : ) {
145 2 : vs.dirname = dirname
146 2 : vs.provider = provider
147 2 : vs.mu = mu
148 2 : vs.writerCond.L = mu
149 2 : vs.opts = opts
150 2 : vs.fs = opts.FS
151 2 : vs.cmp = opts.Comparer
152 2 : vs.dynamicBaseLevel = true
153 2 : vs.versions.Init(mu)
154 2 : vs.obsoleteFn = vs.addObsoleteLocked
155 2 : vs.zombieTables = make(map[base.DiskFileNum]tableInfo)
156 2 : vs.virtualBackings = manifest.MakeVirtualBackings()
157 2 : vs.nextFileNum.Store(1)
158 2 : vs.manifestMarker = marker
159 2 : vs.getFormatMajorVersion = getFMV
160 2 : }
161 :
162 : // create creates a version set for a fresh DB.
163 : func (vs *versionSet) create(
164 : jobID JobID,
165 : dirname string,
166 : provider objstorage.Provider,
167 : opts *Options,
168 : marker *atomicfs.Marker,
169 : getFormatMajorVersion func() FormatMajorVersion,
170 : mu *sync.Mutex,
171 2 : ) error {
172 2 : vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
173 2 : var bve bulkVersionEdit
174 2 : newVersion, err := bve.Apply(nil /* curr */, opts.Comparer, opts.FlushSplitBytes, opts.Experimental.ReadCompactionRate)
175 2 : if err != nil {
176 0 : return err
177 0 : }
178 2 : vs.append(newVersion)
179 2 :
180 2 : vs.picker = newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, nil)
181 2 : // Note that a "snapshot" version edit is written to the manifest when it is
182 2 : // created.
183 2 : vs.manifestFileNum = vs.getNextDiskFileNum()
184 2 : err = vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum.Load(), nil /* virtualBackings */)
185 2 : if err == nil {
186 2 : if err = vs.manifest.Flush(); err != nil {
187 1 : vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
188 1 : }
189 : }
190 2 : if err == nil {
191 2 : if err = vs.manifestFile.Sync(); err != nil {
192 1 : vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
193 1 : }
194 : }
195 2 : if err == nil {
196 2 : // NB: Move() is responsible for syncing the data directory.
197 2 : if err = vs.manifestMarker.Move(base.MakeFilename(fileTypeManifest, vs.manifestFileNum)); err != nil {
198 1 : vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
199 1 : }
200 : }
201 :
202 2 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
203 2 : JobID: int(jobID),
204 2 : Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, vs.manifestFileNum),
205 2 : FileNum: vs.manifestFileNum,
206 2 : Err: err,
207 2 : })
208 2 : if err != nil {
209 1 : return err
210 1 : }
211 2 : return nil
212 : }
213 :
214 : // load loads the version set from the manifest file.
215 : func (vs *versionSet) load(
216 : dirname string,
217 : provider objstorage.Provider,
218 : opts *Options,
219 : manifestFileNum base.DiskFileNum,
220 : marker *atomicfs.Marker,
221 : getFormatMajorVersion func() FormatMajorVersion,
222 : mu *sync.Mutex,
223 2 : ) error {
224 2 : vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
225 2 :
226 2 : vs.manifestFileNum = manifestFileNum
227 2 : manifestPath := base.MakeFilepath(opts.FS, dirname, fileTypeManifest, vs.manifestFileNum)
228 2 : manifestFilename := opts.FS.PathBase(manifestPath)
229 2 :
230 2 : // Read the versionEdits in the manifest file.
231 2 : var bve bulkVersionEdit
232 2 : bve.AddedByFileNum = make(map[base.FileNum]*fileMetadata)
233 2 : manifest, err := vs.fs.Open(manifestPath)
234 2 : if err != nil {
235 0 : return errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
236 0 : errors.Safe(manifestFilename), dirname)
237 0 : }
238 2 : defer manifest.Close()
239 2 : rr := record.NewReader(manifest, 0 /* logNum */)
240 2 : for {
241 2 : r, err := rr.Next()
242 2 : if err == io.EOF || record.IsInvalidRecord(err) {
243 2 : break
244 : }
245 2 : if err != nil {
246 0 : return errors.Wrapf(err, "pebble: error when loading manifest file %q",
247 0 : errors.Safe(manifestFilename))
248 0 : }
249 2 : var ve versionEdit
250 2 : err = ve.Decode(r)
251 2 : if err != nil {
252 0 : // Break instead of returning an error if the record is corrupted
253 0 : // or invalid.
254 0 : if err == io.EOF || record.IsInvalidRecord(err) {
255 0 : break
256 : }
257 0 : return err
258 : }
259 2 : if ve.ComparerName != "" {
260 2 : if ve.ComparerName != vs.cmp.Name {
261 1 : return errors.Errorf("pebble: manifest file %q for DB %q: "+
262 1 : "comparer name from file %q != comparer name from Options %q",
263 1 : errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(vs.cmp.Name))
264 1 : }
265 : }
266 2 : if err := bve.Accumulate(&ve); err != nil {
267 0 : return err
268 0 : }
269 2 : if ve.MinUnflushedLogNum != 0 {
270 2 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
271 2 : }
272 2 : if ve.NextFileNum != 0 {
273 2 : vs.nextFileNum.Store(ve.NextFileNum)
274 2 : }
275 2 : if ve.LastSeqNum != 0 {
276 2 : // logSeqNum is the _next_ sequence number that will be assigned,
277 2 : // while LastSeqNum is the last assigned sequence number. Note that
278 2 : // this behaviour mimics that in RocksDB; the first sequence number
279 2 : // assigned is one greater than the one present in the manifest
280 2 : // (assuming no WALs contain higher sequence numbers than the
281 2 : // manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
282 2 : // next sequence number that will be assigned.
283 2 : //
284 2 : // If LastSeqNum is less than SeqNumStart, increase it to at least
285 2 : // SeqNumStart to leave ample room for reserved sequence numbers.
286 2 : if ve.LastSeqNum+1 < base.SeqNumStart {
287 0 : vs.logSeqNum.Store(base.SeqNumStart)
288 2 : } else {
289 2 : vs.logSeqNum.Store(ve.LastSeqNum + 1)
290 2 : }
291 : }
292 : }
293 : // init already set vs.nextFileNum to 1, and replay could only have updated
294 : // it to some other non-zero value, so it cannot be 0 here. If it is still
295 : // below 2, the manifest never recorded a NextFileNum edit and is malformed.
296 2 : if vs.minUnflushedLogNum == 0 {
297 2 : if vs.nextFileNum.Load() >= 2 {
298 2 : // We either have a freshly created DB, or a DB created by RocksDB
299 2 : // that has not had a single flushed SSTable yet. This is because
300 2 : // RocksDB bumps up nextFileNum in this case without bumping up
301 2 : // minUnflushedLogNum, even if WALs with non-zero file numbers are
302 2 : // present in the directory.
303 2 : } else {
304 0 : return base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
305 0 : errors.Safe(manifestFilename), dirname)
306 0 : }
307 : }
308 2 : vs.markFileNumUsed(vs.minUnflushedLogNum)
309 2 :
310 2 : // Populate vs.virtualBackings with the replayed backings and the virtual
311 2 : // sstables that use them, now that version edit accumulation has finished.
312 2 : for _, b := range bve.AddedFileBacking {
313 2 : vs.virtualBackings.AddAndRef(b)
314 2 : }
315 :
316 2 : for _, addedLevel := range bve.Added {
317 2 : for _, m := range addedLevel {
318 2 : if m.Virtual {
319 2 : vs.virtualBackings.AddTable(m)
320 2 : }
321 : }
322 : }
323 :
324 2 : if invariants.Enabled {
325 2 : // There should be no deleted tables or backings, since we're starting from
326 2 : // an empty state.
327 2 : for _, deletedLevel := range bve.Deleted {
328 2 : if len(deletedLevel) != 0 {
329 0 : panic("deleted files after manifest replay")
330 : }
331 : }
332 2 : if len(bve.RemovedFileBacking) > 0 {
333 0 : panic("deleted backings after manifest replay")
334 : }
335 : }
336 :
337 2 : newVersion, err := bve.Apply(nil, opts.Comparer, opts.FlushSplitBytes, opts.Experimental.ReadCompactionRate)
338 2 : if err != nil {
339 0 : return err
340 0 : }
341 2 : newVersion.L0Sublevels.InitCompactingFileInfo(nil /* in-progress compactions */)
342 2 : vs.append(newVersion)
343 2 :
344 2 : for i := range vs.metrics.Levels {
345 2 : l := &vs.metrics.Levels[i]
346 2 : l.NumFiles = int64(newVersion.Levels[i].Len())
347 2 : files := newVersion.Levels[i].Slice()
348 2 : l.Size = int64(files.SizeSum())
349 2 : }
350 2 : for _, l := range newVersion.Levels {
351 2 : iter := l.Iter()
352 2 : for f := iter.First(); f != nil; f = iter.Next() {
353 2 : if !f.Virtual {
354 2 : _, localSize := sizeIfLocal(f.FileBacking, vs.provider)
355 2 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
356 2 : }
357 : }
358 : }
359 2 : vs.virtualBackings.ForEach(func(backing *fileBacking) {
360 2 : _, localSize := sizeIfLocal(backing, vs.provider)
361 2 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
362 2 : })
363 :
364 2 : vs.picker = newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, nil)
365 2 : return nil
366 : }
367 :
368 2 : func (vs *versionSet) close() error {
369 2 : if vs.manifestFile != nil {
370 2 : if err := vs.manifestFile.Close(); err != nil {
371 0 : return err
372 0 : }
373 : }
374 2 : if vs.manifestMarker != nil {
375 2 : if err := vs.manifestMarker.Close(); err != nil {
376 0 : return err
377 0 : }
378 : }
379 2 : return nil
380 : }
381 :
382 : // logLock locks the manifest for writing. The lock must be released by either
383 : // a call to logUnlock or logAndApply.
384 : //
385 : // DB.mu must be held when calling this method, but the mutex may be dropped and
386 : // re-acquired during the course of this method.
387 2 : func (vs *versionSet) logLock() {
388 2 : // Wait for any existing writing to the manifest to complete, then mark the
389 2 : // manifest as busy.
390 2 : for vs.writing {
391 2 : // Note: writerCond.L is DB.mu, so we unlock it while we wait.
392 2 : vs.writerCond.Wait()
393 2 : }
394 2 : vs.writing = true
395 : }
396 :
397 : // logUnlock releases the lock for manifest writing.
398 : //
399 : // DB.mu must be held when calling this method.
400 2 : func (vs *versionSet) logUnlock() {
401 2 : if !vs.writing {
402 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
403 0 : }
404 2 : vs.writing = false
405 2 : vs.writerCond.Signal()
406 : }
407 :
408 : // logAndApply logs the version edit to the manifest, applies the version edit
409 : // to the current version, and installs the new version.
410 : //
411 : // logAndApply fills in the following fields of the VersionEdit: NextFileNum,
412 : // LastSeqNum, RemovedBackingTables. The removed backing tables are those
413 : // backings that are no longer used (in the new version) after applying the edit
414 : // (as per vs.virtualBackings). Other than these fields, the VersionEdit must be
415 : // complete.
416 : //
417 : // New table backing references (FileBacking.Ref) are taken as part of applying
418 : // the version edit. The state of the virtual backings (vs.virtualBackings) is
419 : // updated before logging to the manifest and installing the latest version;
420 : // this is ok because any failure in those steps is fatal.
421 : // TODO(radu): remove the error return.
422 : //
423 : // DB.mu must be held when calling this method and will be released temporarily
424 : // while performing file I/O. Requires that the manifest is locked for writing
425 : // (see logLock). Will unconditionally release the manifest lock (via
426 : // logUnlock) even if an error occurs.
427 : //
428 : // inProgressCompactions is called while DB.mu is held, to get the list of
429 : // in-progress compactions.
430 : func (vs *versionSet) logAndApply(
431 : jobID JobID,
432 : ve *versionEdit,
433 : metrics map[int]*LevelMetrics,
434 : forceRotation bool,
435 : inProgressCompactions func() []compactionInfo,
436 2 : ) error {
437 2 : if !vs.writing {
438 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
439 0 : }
440 2 : defer vs.logUnlock()
441 2 :
442 2 : if ve.MinUnflushedLogNum != 0 {
443 2 : if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
444 2 : vs.nextFileNum.Load() <= uint64(ve.MinUnflushedLogNum) {
445 0 : panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
446 0 : ve.MinUnflushedLogNum))
447 : }
448 : }
449 :
450 : // This is the next manifest filenum, but if the current file is too big we
451 : // will write this ve to the next file which means what ve encodes is the
452 : // current filenum and not the next one.
453 : //
454 : // TODO(sbhola): figure out why this is correct and update comment.
455 2 : ve.NextFileNum = vs.nextFileNum.Load()
456 2 :
457 2 : // LastSeqNum is set to the current upper bound on the assigned sequence
458 2 : // numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
459 2 : // used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
460 2 : // replay. It must be higher than or equal to any sequence number
461 2 : // written to an sstable, including sequence numbers in ingested files.
462 2 : // Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
463 2 : // number. This is fallout from ingestion which allows a sequence number X to
464 2 : // be assigned to an ingested sstable even though sequence number X-1 resides
465 2 : // in an unflushed memtable. logSeqNum is the _next_ sequence number that
466 2 : // will be assigned, so subtract 1 from it to get the upper bound on the
467 2 : // last assigned sequence number.
468 2 : logSeqNum := vs.logSeqNum.Load()
469 2 : ve.LastSeqNum = logSeqNum - 1
470 2 : if logSeqNum == 0 {
471 0 : // logSeqNum is initialized to 1 in Open() if there are no previous WAL
472 0 : // or manifest records, so this case should never happen.
473 0 : vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
474 0 : }
475 :
476 2 : currentVersion := vs.currentVersion()
477 2 : var newVersion *version
478 2 :
479 2 : // Generate a new manifest if we don't currently have one, or forceRotation
480 2 : // is true, or the current one is too large.
481 2 : //
482 2 : // For largeness, we do not exclusively use MaxManifestFileSize size
483 2 : // threshold since we have had incidents where due to either large keys or
484 2 : // large numbers of files, each edit results in a snapshot + write of the
485 2 : // edit. This slows the system down since each flush or compaction is
486 2 : // writing a new manifest snapshot. The primary goal of the size-based
487 2 : // rollover logic is to ensure that when reopening a DB, the number of edits
488 2 : // that need to be replayed on top of the snapshot is "sane". Rolling over
489 2 : // to a new manifest after each edit is not relevant to that goal.
490 2 : //
491 2 : // Consider the following cases:
492 2 : // - The number of live files F in the DB is roughly stable: after writing
493 2 : // the snapshot (with F files), say we require that there be enough edits
494 2 : // such that the cumulative number of files in those edits, E, be greater
495 2 : // than F. This will ensure that the total amount of time in logAndApply
496 2 : // that is spent in snapshot writing is ~50%.
497 2 : //
498 2 : // - The number of live files F in the DB is shrinking drastically, say from
499 2 : // F to F/10: This can happen for various reasons, like wide range
500 2 : // tombstones, or large numbers of smaller than usual files that are being
501 2 : // merged together into larger files. And say the new files generated
502 2 : // during this shrinkage is insignificant compared to F/10, and so for
503 2 : // this example we will assume it is effectively 0. After this shrinking,
504 2 : // E = 0.9F, and so if we used the previous snapshot file count, F, as the
505 2 : // threshold that needs to be exceeded, we will further delay the snapshot
506 2 : // writing. Which means on DB reopen we will need to replay 0.9F edits to
507 2 : // get to a version with 0.1F files. It would be better to create a new
508 2 : // snapshot when E exceeds the number of files in the current version.
509 2 : //
510 2 : // - The number of live files F in the DB is growing via perfect ingests
511 2 : // into L6: Say we wrote the snapshot when there were F files and now we
512 2 : // have 10F files, so E = 9F. We will further delay writing a new
513 2 : // snapshot. This case can be critiqued as contrived, but we consider it
514 2 : // nonetheless.
515 2 : //
516 2 : // The logic below uses the min of the last snapshot file count and the file
517 2 : // count in the current version.
518 2 : vs.rotationHelper.AddRecord(int64(len(ve.DeletedFiles) + len(ve.NewFiles)))
519 2 : sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
520 2 : requireRotation := forceRotation || vs.manifest == nil
521 2 :
522 2 : var nextSnapshotFilecount int64
523 2 : for i := range vs.metrics.Levels {
524 2 : nextSnapshotFilecount += vs.metrics.Levels[i].NumFiles
525 2 : }
526 2 : if sizeExceeded && !requireRotation {
527 2 : requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
528 2 : }
529 2 : var newManifestFileNum base.DiskFileNum
530 2 : var prevManifestFileSize uint64
531 2 : var newManifestVirtualBackings []*fileBacking
532 2 : if requireRotation {
533 2 : newManifestFileNum = vs.getNextDiskFileNum()
534 2 : prevManifestFileSize = uint64(vs.manifest.Size())
535 2 :
536 2 : // We want the virtual backings *before* applying the version edit, because
537 2 : // the new manifest will contain the pre-apply version plus the last version
538 2 : // edit.
539 2 : newManifestVirtualBackings = vs.virtualBackings.Backings()
540 2 : }
541 :
542 : // Grab certain values before releasing vs.mu, in case createManifest() needs
543 : // to be called.
544 2 : minUnflushedLogNum := vs.minUnflushedLogNum
545 2 : nextFileNum := vs.nextFileNum.Load()
546 2 :
547 2 : // Note: this call populates ve.RemovedBackingTables.
548 2 : zombieBackings, removedVirtualBackings, localLiveSizeDelta :=
549 2 : getZombiesAndUpdateVirtualBackings(ve, &vs.virtualBackings, vs.provider)
550 2 :
551 2 : if err := func() error {
552 2 : vs.mu.Unlock()
553 2 : defer vs.mu.Lock()
554 2 :
555 2 : if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
556 0 : return base.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
557 0 : }
558 2 : var b bulkVersionEdit
559 2 : err := b.Accumulate(ve)
560 2 : if err != nil {
561 0 : return errors.Wrap(err, "MANIFEST accumulate failed")
562 0 : }
563 2 : newVersion, err = b.Apply(
564 2 : currentVersion, vs.cmp, vs.opts.FlushSplitBytes, vs.opts.Experimental.ReadCompactionRate,
565 2 : )
566 2 : if err != nil {
567 0 : return errors.Wrap(err, "MANIFEST apply failed")
568 0 : }
569 :
570 2 : if newManifestFileNum != 0 {
571 2 : if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum, newManifestVirtualBackings); err != nil {
572 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
573 1 : JobID: int(jobID),
574 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
575 1 : FileNum: newManifestFileNum,
576 1 : Err: err,
577 1 : })
578 1 : return errors.Wrap(err, "MANIFEST create failed")
579 1 : }
580 : }
581 :
582 2 : w, err := vs.manifest.Next()
583 2 : if err != nil {
584 0 : return errors.Wrap(err, "MANIFEST next record write failed")
585 0 : }
586 :
587 : // NB: Any error from this point on is considered fatal as we don't know if
588 : // the MANIFEST write occurred or not. Trying to determine that is
589 : // fraught. Instead we rely on the standard recovery mechanism run when a
590 : // database is open. In particular, that mechanism generates a new MANIFEST
591 : // and ensures it is synced.
592 2 : if err := ve.Encode(w); err != nil {
593 0 : return errors.Wrap(err, "MANIFEST write failed")
594 0 : }
595 2 : if err := vs.manifest.Flush(); err != nil {
596 1 : return errors.Wrap(err, "MANIFEST flush failed")
597 1 : }
598 2 : if err := vs.manifestFile.Sync(); err != nil {
599 1 : return errors.Wrap(err, "MANIFEST sync failed")
600 1 : }
601 2 : if newManifestFileNum != 0 {
602 2 : // NB: Move() is responsible for syncing the data directory.
603 2 : if err := vs.manifestMarker.Move(base.MakeFilename(fileTypeManifest, newManifestFileNum)); err != nil {
604 0 : return errors.Wrap(err, "MANIFEST set current failed")
605 0 : }
606 2 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
607 2 : JobID: int(jobID),
608 2 : Path: base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
609 2 : FileNum: newManifestFileNum,
610 2 : })
611 : }
612 2 : return nil
613 1 : }(); err != nil {
614 1 : // Any error encountered during any of the operations in the previous
615 1 : // closure are considered fatal. Treating such errors as fatal is preferred
616 1 : // to attempting to unwind various file and b-tree reference counts, and
617 1 : // re-generating L0 sublevel metadata. This may change in the future, if
618 1 : // certain manifest / WAL operations become retryable. For more context, see
619 1 : // #1159 and #1792.
620 1 : vs.opts.Logger.Fatalf("%s", err)
621 1 : return err
622 1 : }
623 :
624 2 : if requireRotation {
625 2 : // Successfully rotated.
626 2 : vs.rotationHelper.Rotate(nextSnapshotFilecount)
627 2 : }
628 : // Now that DB.mu is held again, initialize compacting file info in
629 : // L0Sublevels.
630 2 : inProgress := inProgressCompactions()
631 2 :
632 2 : newVersion.L0Sublevels.InitCompactingFileInfo(inProgressL0Compactions(inProgress))
633 2 :
634 2 : // Update the zombie tables set first, as installation of the new version
635 2 : // will unref the previous version which could result in addObsoleteLocked
636 2 : // being called.
637 2 : for _, b := range zombieBackings {
638 2 : vs.zombieTables[b.backing.DiskFileNum] = tableInfo{
639 2 : fileInfo: fileInfo{
640 2 : FileNum: b.backing.DiskFileNum,
641 2 : FileSize: b.backing.Size,
642 2 : },
643 2 : isLocal: b.isLocal,
644 2 : }
645 2 : }
646 :
647 : // Unref the removed backings and report those that already became obsolete.
648 : // Note that the only case where we report obsolete tables here is when
649 : // VirtualBackings.Protect/Unprotect was used to keep a backing alive without
650 : // it being used in the current version.
651 2 : var obsoleteVirtualBackings []*fileBacking
652 2 : for _, b := range removedVirtualBackings {
653 2 : if b.backing.Unref() == 0 {
654 2 : obsoleteVirtualBackings = append(obsoleteVirtualBackings, b.backing)
655 2 : }
656 : }
657 2 : vs.addObsoleteLocked(obsoleteVirtualBackings)
658 2 :
659 2 : // Install the new version.
660 2 : vs.append(newVersion)
661 2 :
662 2 : if ve.MinUnflushedLogNum != 0 {
663 2 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
664 2 : }
665 2 : if newManifestFileNum != 0 {
666 2 : if vs.manifestFileNum != 0 {
667 2 : vs.obsoleteManifests = append(vs.obsoleteManifests, fileInfo{
668 2 : FileNum: vs.manifestFileNum,
669 2 : FileSize: prevManifestFileSize,
670 2 : })
671 2 : }
672 2 : vs.manifestFileNum = newManifestFileNum
673 : }
674 :
675 2 : for level, update := range metrics {
676 2 : vs.metrics.Levels[level].Add(update)
677 2 : }
678 2 : for i := range vs.metrics.Levels {
679 2 : l := &vs.metrics.Levels[i]
680 2 : l.NumFiles = int64(newVersion.Levels[i].Len())
681 2 : l.NumVirtualFiles = newVersion.Levels[i].NumVirtual
682 2 : l.VirtualSize = newVersion.Levels[i].VirtualSize
683 2 : l.Size = int64(newVersion.Levels[i].Size())
684 2 :
685 2 : l.Sublevels = 0
686 2 : if l.NumFiles > 0 {
687 2 : l.Sublevels = 1
688 2 : }
689 2 : if invariants.Enabled {
690 2 : levelFiles := newVersion.Levels[i].Slice()
691 2 : if size := int64(levelFiles.SizeSum()); l.Size != size {
692 0 : vs.opts.Logger.Fatalf("versionSet metrics L%d Size = %d, actual size = %d", i, l.Size, size)
693 0 : }
694 2 : if nVirtual := levelFiles.NumVirtual(); nVirtual != l.NumVirtualFiles {
695 0 : vs.opts.Logger.Fatalf(
696 0 : "versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
697 0 : i, l.NumVirtualFiles, nVirtual,
698 0 : )
699 0 : }
700 2 : if vSize := levelFiles.VirtualSizeSum(); vSize != l.VirtualSize {
701 0 : vs.opts.Logger.Fatalf(
702 0 : "versionSet metrics L%d Virtual size = %d, actual size = %d",
703 0 : i, l.VirtualSize, vSize,
704 0 : )
705 0 : }
706 : }
707 : }
708 2 : vs.metrics.Levels[0].Sublevels = int32(len(newVersion.L0SublevelFiles))
709 2 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localLiveSizeDelta)
710 2 :
711 2 : vs.picker = newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, inProgress)
712 2 : if !vs.dynamicBaseLevel {
713 1 : vs.picker.forceBaseLevel1()
714 1 : }
715 2 : return nil
716 : }
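// exampleApplyEdit is an illustrative sketch (hypothetical helper) of how a
// caller in this package drives logAndApply: DB.mu is held, the manifest log
// lock is acquired via logLock, and logAndApply releases it (via logUnlock)
// whether or not it succeeds. It assumes the DB wires the version set up as
// d.mu.versions, as elsewhere in pebble.
func exampleApplyEdit(d *DB, jobID JobID, ve *versionEdit) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.mu.versions.logLock()
	return d.mu.versions.logAndApply(
		jobID, ve, newFileMetrics(ve.NewFiles), false /* forceRotation */,
		func() []compactionInfo { return nil },
	)
}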
717 :
718 : type fileBackingInfo struct {
719 : backing *fileBacking
720 : isLocal bool
721 : }
722 :
723 : // getZombiesAndUpdateVirtualBackings updates the virtual backings with the
724 : // changes in the versionEdit and populates ve.RemovedBackingTables.
725 : // Returns:
726 : // - zombieBackings: all backings (physical and virtual) that will no longer be
727 : // needed when we apply ve.
728 : // - removedVirtualBackings: the virtual backings that will be removed by the
729 : // VersionEdit and which must be Unref()ed by the caller. These backings
730 : // match ve.RemovedBackingTables.
731 : // - localLiveSizeDelta: the delta in local live bytes.
732 : func getZombiesAndUpdateVirtualBackings(
733 : ve *versionEdit, virtualBackings *manifest.VirtualBackings, provider objstorage.Provider,
734 2 : ) (zombieBackings, removedVirtualBackings []fileBackingInfo, localLiveSizeDelta int64) {
735 2 : // First, deal with the physical tables.
736 2 : //
737 2 : // A physical backing has become unused if it is in DeletedFiles but not in
738 2 : // NewFiles or CreatedBackingTables.
739 2 : //
740 2 : // Note that for the common case where there are very few elements, the map
741 2 : // will stay on the stack.
742 2 : stillUsed := make(map[base.DiskFileNum]struct{})
743 2 : for _, nf := range ve.NewFiles {
744 2 : if !nf.Meta.Virtual {
745 2 : stillUsed[nf.Meta.FileBacking.DiskFileNum] = struct{}{}
746 2 : _, localFileDelta := sizeIfLocal(nf.Meta.FileBacking, provider)
747 2 : localLiveSizeDelta += localFileDelta
748 2 : }
749 : }
750 2 : for _, b := range ve.CreatedBackingTables {
751 2 : stillUsed[b.DiskFileNum] = struct{}{}
752 2 : }
753 2 : for _, m := range ve.DeletedFiles {
754 2 : if !m.Virtual {
755 2 : // NB: this deleted file may also be in NewFiles or
756 2 : // CreatedBackingTables, due to a file moving between levels, or
757 2 : // becoming virtualized. In which case there is no change due to this
758 2 : // file in the localLiveSizeDelta -- the subtraction below compensates
759 2 : // for the addition.
760 2 : isLocal, localFileDelta := sizeIfLocal(m.FileBacking, provider)
761 2 : localLiveSizeDelta -= localFileDelta
762 2 : if _, ok := stillUsed[m.FileBacking.DiskFileNum]; !ok {
763 2 : zombieBackings = append(zombieBackings, fileBackingInfo{
764 2 : backing: m.FileBacking,
765 2 : isLocal: isLocal,
766 2 : })
767 2 : }
768 : }
769 : }
770 :
771 : // Now deal with virtual tables.
772 : //
773 : // When a virtual table moves between levels we AddTable() then RemoveTable(),
774 : // which works out.
775 2 : for _, b := range ve.CreatedBackingTables {
776 2 : virtualBackings.AddAndRef(b)
777 2 : _, localFileDelta := sizeIfLocal(b, provider)
778 2 : localLiveSizeDelta += localFileDelta
779 2 : }
780 2 : for _, nf := range ve.NewFiles {
781 2 : if nf.Meta.Virtual {
782 2 : virtualBackings.AddTable(nf.Meta)
783 2 : }
784 : }
785 2 : for _, m := range ve.DeletedFiles {
786 2 : if m.Virtual {
787 2 : virtualBackings.RemoveTable(m)
788 2 : }
789 : }
790 :
791 2 : if unused := virtualBackings.Unused(); len(unused) > 0 {
792 2 : // Virtual backings that are no longer used are zombies and are also added
793 2 : // to RemovedBackingTables (before the version edit is written to disk).
794 2 : ve.RemovedBackingTables = make([]base.DiskFileNum, len(unused))
795 2 : for i, b := range unused {
796 2 : isLocal, localFileDelta := sizeIfLocal(b, provider)
797 2 : localLiveSizeDelta -= localFileDelta
798 2 : ve.RemovedBackingTables[i] = b.DiskFileNum
799 2 : zombieBackings = append(zombieBackings, fileBackingInfo{
800 2 : backing: b,
801 2 : isLocal: isLocal,
802 2 : })
803 2 : virtualBackings.Remove(b.DiskFileNum)
804 2 : }
805 2 : removedVirtualBackings = zombieBackings[len(zombieBackings)-len(unused):]
806 : }
807 2 : return zombieBackings, removedVirtualBackings, localLiveSizeDelta
808 : }
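// Worked example for getZombiesAndUpdateVirtualBackings (hypothetical file
// numbers): a compaction that rewrites physical table 000005 into 000009 puts
// 000005 in DeletedFiles and 000009 in NewFiles. Since 000005 appears in
// neither NewFiles nor CreatedBackingTables, its backing is returned as a
// zombie, and if both tables are local, localLiveSizeDelta nets out to
// size(000009) - size(000005). A move compaction, by contrast, lists the same
// physical table in both DeletedFiles and NewFiles, so the backing stays in
// use and the delta is zero.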
809 :
810 : // sizeIfLocal reports whether the backing is a local file and returns backing.Size if so, else 0.
811 : func sizeIfLocal(
812 : backing *fileBacking, provider objstorage.Provider,
813 2 : ) (isLocal bool, localSize int64) {
814 2 : isLocal = objstorage.IsLocalTable(provider, backing.DiskFileNum)
815 2 : if isLocal {
816 2 : return true, int64(backing.Size)
817 2 : }
818 2 : return false, 0
819 : }
820 :
821 : func (vs *versionSet) incrementCompactions(
822 : kind compactionKind, extraLevels []*compactionLevel, pickerMetrics compactionPickerMetrics,
823 2 : ) {
824 2 : switch kind {
825 2 : case compactionKindDefault:
826 2 : vs.metrics.Compact.Count++
827 2 : vs.metrics.Compact.DefaultCount++
828 :
829 2 : case compactionKindFlush, compactionKindIngestedFlushable:
830 2 : vs.metrics.Flush.Count++
831 :
832 2 : case compactionKindMove:
833 2 : vs.metrics.Compact.Count++
834 2 : vs.metrics.Compact.MoveCount++
835 :
836 2 : case compactionKindDeleteOnly:
837 2 : vs.metrics.Compact.Count++
838 2 : vs.metrics.Compact.DeleteOnlyCount++
839 :
840 2 : case compactionKindElisionOnly:
841 2 : vs.metrics.Compact.Count++
842 2 : vs.metrics.Compact.ElisionOnlyCount++
843 :
844 1 : case compactionKindRead:
845 1 : vs.metrics.Compact.Count++
846 1 : vs.metrics.Compact.ReadCount++
847 :
848 1 : case compactionKindTombstoneDensity:
849 1 : vs.metrics.Compact.Count++
850 1 : vs.metrics.Compact.TombstoneDensityCount++
851 :
852 2 : case compactionKindRewrite:
853 2 : vs.metrics.Compact.Count++
854 2 : vs.metrics.Compact.RewriteCount++
855 :
856 2 : case compactionKindCopy:
857 2 : vs.metrics.Compact.Count++
858 2 : vs.metrics.Compact.CopyCount++
859 :
860 0 : default:
861 0 : if invariants.Enabled {
862 0 : panic("unhandled compaction kind")
863 : }
864 : }
865 2 : if len(extraLevels) > 0 {
866 2 : vs.metrics.Compact.MultiLevelCount++
867 2 : }
868 : }
869 :
870 2 : func (vs *versionSet) incrementCompactionBytes(numBytes int64) {
871 2 : vs.atomicInProgressBytes.Add(numBytes)
872 2 : }
873 :
874 : // createManifest creates a manifest file that contains a snapshot of vs.
875 : func (vs *versionSet) createManifest(
876 : dirname string,
877 : fileNum, minUnflushedLogNum base.DiskFileNum,
878 : nextFileNum uint64,
879 : virtualBackings []*fileBacking,
880 2 : ) (err error) {
881 2 : var (
882 2 : filename = base.MakeFilepath(vs.fs, dirname, fileTypeManifest, fileNum)
883 2 : manifestFile vfs.File
884 2 : manifest *record.Writer
885 2 : )
886 2 : defer func() {
887 2 : if manifest != nil {
888 0 : manifest.Close()
889 0 : }
890 2 : if manifestFile != nil {
891 0 : manifestFile.Close()
892 0 : }
893 2 : if err != nil {
894 1 : vs.fs.Remove(filename)
895 1 : }
896 : }()
897 2 : manifestFile, err = vs.fs.Create(filename, "pebble-manifest")
898 2 : if err != nil {
899 1 : return err
900 1 : }
901 2 : manifest = record.NewWriter(manifestFile)
902 2 :
903 2 : snapshot := versionEdit{
904 2 : ComparerName: vs.cmp.Name,
905 2 : }
906 2 :
907 2 : for level, levelMetadata := range vs.currentVersion().Levels {
908 2 : iter := levelMetadata.Iter()
909 2 : for meta := iter.First(); meta != nil; meta = iter.Next() {
910 2 : snapshot.NewFiles = append(snapshot.NewFiles, newFileEntry{
911 2 : Level: level,
912 2 : Meta: meta,
913 2 : })
914 2 : }
915 : }
916 :
917 2 : snapshot.CreatedBackingTables = virtualBackings
918 2 :
919 2 : // When creating a version snapshot for an existing DB, this snapshot VersionEdit will be
920 2 : // immediately followed by another VersionEdit (being written in logAndApply()). That
921 2 : // VersionEdit always contains a LastSeqNum, so we don't need to include that in the snapshot.
922 2 : // But it does not necessarily include MinUnflushedLogNum, NextFileNum, so we initialize those
923 2 : // using the corresponding fields in the versionSet (which came from the latest preceding
924 2 : // VersionEdit that had those fields).
925 2 : snapshot.MinUnflushedLogNum = minUnflushedLogNum
926 2 : snapshot.NextFileNum = nextFileNum
927 2 :
928 2 : w, err1 := manifest.Next()
929 2 : if err1 != nil {
930 0 : return err1
931 0 : }
932 2 : if err := snapshot.Encode(w); err != nil {
933 0 : return err
934 0 : }
935 :
936 2 : if vs.manifest != nil {
937 2 : vs.manifest.Close()
938 2 : vs.manifest = nil
939 2 : }
940 2 : if vs.manifestFile != nil {
941 2 : if err := vs.manifestFile.Close(); err != nil {
942 0 : return err
943 0 : }
944 2 : vs.manifestFile = nil
945 : }
946 :
947 2 : vs.manifest, manifest = manifest, nil
948 2 : vs.manifestFile, manifestFile = manifestFile, nil
949 2 : return nil
950 : }
951 :
952 : // NB: This method is not safe for concurrent use. It is only safe
953 : // to be called when concurrent changes to nextFileNum are not expected.
954 2 : func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
955 2 : if vs.nextFileNum.Load() <= uint64(fileNum) {
956 2 : vs.nextFileNum.Store(uint64(fileNum + 1))
957 2 : }
958 : }
959 :
960 2 : func (vs *versionSet) getNextFileNum() base.FileNum {
961 2 : x := vs.nextFileNum.Add(1) - 1
962 2 : return base.FileNum(x)
963 2 : }
964 :
965 2 : func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
966 2 : x := vs.nextFileNum.Add(1) - 1
967 2 : return base.DiskFileNum(x)
968 2 : }
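// Illustrative example (hypothetical numbers): markFileNumUsed(7) stores 8 into
// nextFileNum if the counter was still at or below 7, so a replayed file number
// is never handed out again; with the counter at 8, getNextFileNum returns 8
// and a subsequent getNextDiskFileNum returns 9. The single counter serves WAL,
// MANIFEST, sstable, and OPTIONS files alike.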
969 :
970 2 : func (vs *versionSet) append(v *version) {
971 2 : if v.Refs() != 0 {
972 0 : panic("pebble: version should be unreferenced")
973 : }
974 2 : if !vs.versions.Empty() {
975 2 : vs.versions.Back().UnrefLocked()
976 2 : }
977 2 : v.Deleted = vs.obsoleteFn
978 2 : v.Ref()
979 2 : vs.versions.PushBack(v)
980 2 : if invariants.Enabled {
981 2 : // Verify that the virtualBackings contains all the backings referenced by
982 2 : // the version.
983 2 : for _, l := range v.Levels {
984 2 : iter := l.Iter()
985 2 : for f := iter.First(); f != nil; f = iter.Next() {
986 2 : if f.Virtual {
987 2 : if _, ok := vs.virtualBackings.Get(f.FileBacking.DiskFileNum); !ok {
988 0 : panic(fmt.Sprintf("%s is not in virtualBackings", f.FileBacking.DiskFileNum))
989 : }
990 : }
991 : }
992 : }
993 : }
994 : }
995 :
996 2 : func (vs *versionSet) currentVersion() *version {
997 2 : return vs.versions.Back()
998 2 : }
999 :
1000 2 : func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
1001 2 : current := vs.currentVersion()
1002 2 : for v := vs.versions.Front(); true; v = v.Next() {
1003 2 : for _, lm := range v.Levels {
1004 2 : iter := lm.Iter()
1005 2 : for f := iter.First(); f != nil; f = iter.Next() {
1006 2 : m[f.FileBacking.DiskFileNum] = struct{}{}
1007 2 : }
1008 : }
1009 2 : if v == current {
1010 2 : break
1011 : }
1012 : }
1013 : // virtualBackings contains backings that are referenced by some virtual
1014 : // tables in the latest version (which are handled above), and backings that
1015 : // are not but are still alive because of the protection mechanism (see
1016 : // manifest.VirtualBackings). This loop ensures the latter get added to the
1017 : // map.
1018 2 : vs.virtualBackings.ForEach(func(b *fileBacking) {
1019 2 : m[b.DiskFileNum] = struct{}{}
1020 2 : })
1021 : }
1022 :
1023 : // addObsoleteLocked will add the fileInfo associated with obsolete backing
1024 : // sstables to the obsolete tables list.
1025 : //
1026 : // The file backings in the obsolete list must not appear more than once.
1027 : //
1028 : // DB.mu must be held when addObsoleteLocked is called.
1029 2 : func (vs *versionSet) addObsoleteLocked(obsolete []*fileBacking) {
1030 2 : if len(obsolete) == 0 {
1031 2 : return
1032 2 : }
1033 :
1034 2 : obsoleteFileInfo := make([]tableInfo, len(obsolete))
1035 2 : for i, bs := range obsolete {
1036 2 : obsoleteFileInfo[i].FileNum = bs.DiskFileNum
1037 2 : obsoleteFileInfo[i].FileSize = bs.Size
1038 2 : }
1039 :
1040 2 : if invariants.Enabled {
1041 2 : dedup := make(map[base.DiskFileNum]struct{})
1042 2 : for _, fi := range obsoleteFileInfo {
1043 2 : dedup[fi.FileNum] = struct{}{}
1044 2 : }
1045 2 : if len(dedup) != len(obsoleteFileInfo) {
1046 0 : panic("pebble: duplicate FileBacking present in obsolete list")
1047 : }
1048 : }
1049 :
1050 2 : for i, fi := range obsoleteFileInfo {
1051 2 : // Note that the obsolete tables are no longer zombie by the definition of
1052 2 : // zombie, but we leave them in the zombie tables map until they are
1053 2 : // deleted from disk.
1054 2 : //
1055 2 : // TODO(sumeer): this means that the zombie metrics, like ZombieSize,
1056 2 : // computed in DB.Metrics are also being counted in the obsolete metrics.
1057 2 : // Was this intentional?
1058 2 : info, ok := vs.zombieTables[fi.FileNum]
1059 2 : if !ok {
1060 0 : vs.opts.Logger.Fatalf("MANIFEST obsolete table %s not marked as zombie", fi.FileNum)
1061 0 : }
1062 2 : obsoleteFileInfo[i].isLocal = info.isLocal
1063 : }
1064 :
1065 2 : vs.obsoleteTables = append(vs.obsoleteTables, obsoleteFileInfo...)
1066 2 : vs.updateObsoleteTableMetricsLocked()
1067 : }
1068 :
1069 : // addObsolete will acquire DB.mu, so DB.mu must not be held when this is
1070 : // called.
1071 2 : func (vs *versionSet) addObsolete(obsolete []*fileBacking) {
1072 2 : vs.mu.Lock()
1073 2 : defer vs.mu.Unlock()
1074 2 : vs.addObsoleteLocked(obsolete)
1075 2 : }
1076 :
1077 2 : func (vs *versionSet) updateObsoleteTableMetricsLocked() {
1078 2 : vs.metrics.Table.ObsoleteCount = int64(len(vs.obsoleteTables))
1079 2 : vs.metrics.Table.ObsoleteSize = 0
1080 2 : vs.metrics.Table.Local.ObsoleteSize = 0
1081 2 : for _, fi := range vs.obsoleteTables {
1082 2 : vs.metrics.Table.ObsoleteSize += fi.FileSize
1083 2 : if fi.isLocal {
1084 2 : vs.metrics.Table.Local.ObsoleteSize += fi.FileSize
1085 2 : }
1086 : }
1087 : }
1088 :
1089 : func findCurrentManifest(
1090 : fs vfs.FS, dirname string, ls []string,
1091 2 : ) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
1092 2 : // Locating a marker should succeed even if the marker has never been placed.
1093 2 : var filename string
1094 2 : marker, filename, err = atomicfs.LocateMarkerInListing(fs, dirname, manifestMarkerName, ls)
1095 2 : if err != nil {
1096 1 : return nil, 0, false, err
1097 1 : }
1098 :
1099 2 : if filename == "" {
1100 2 : // The marker hasn't been set yet. This database doesn't exist.
1101 2 : return marker, 0, false, nil
1102 2 : }
1103 :
1104 2 : var ok bool
1105 2 : _, manifestNum, ok = base.ParseFilename(fs, filename)
1106 2 : if !ok {
1107 0 : return marker, 0, false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
1108 0 : }
1109 2 : return marker, manifestNum, true, nil
1110 : }
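// exampleLocateManifest is an illustrative sketch (hypothetical helper) of how
// findCurrentManifest is typically used: list the directory, locate the
// marker, and treat a missing marker as a database that does not exist.
func exampleLocateManifest(fs vfs.FS, dirname string) (base.DiskFileNum, error) {
	ls, err := fs.List(dirname)
	if err != nil {
		return 0, err
	}
	marker, manifestNum, exists, err := findCurrentManifest(fs, dirname, ls)
	if err != nil {
		return 0, err
	}
	// A real caller keeps the marker open (see versionSet.manifestMarker); this
	// sketch closes it immediately.
	defer marker.Close()
	if !exists {
		return 0, errors.Errorf("pebble: database %q does not exist", dirname)
	}
	return manifestNum, nil
}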
1111 :
1112 1 : func newFileMetrics(newFiles []manifest.NewFileEntry) map[int]*LevelMetrics {
1113 1 : m := map[int]*LevelMetrics{}
1114 1 : for _, nf := range newFiles {
1115 1 : lm := m[nf.Level]
1116 1 : if lm == nil {
1117 1 : lm = &LevelMetrics{}
1118 1 : m[nf.Level] = lm
1119 1 : }
1120 1 : lm.NumFiles++
1121 1 : lm.Size += int64(nf.Meta.Size)
1122 : }
1123 1 : return m
1124 : }