Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "slices"
10 : "sync"
11 : "sync/atomic"
12 : "time"
13 :
14 : "github.com/cockroachdb/crlib/crtime"
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/invariants"
18 : "github.com/cockroachdb/pebble/internal/manifest"
19 : "github.com/cockroachdb/pebble/objstorage"
20 : "github.com/cockroachdb/pebble/record"
21 : "github.com/cockroachdb/pebble/vfs"
22 : "github.com/cockroachdb/pebble/vfs/atomicfs"
23 : )
24 :
25 : const numLevels = manifest.NumLevels
26 :
27 : const manifestMarkerName = `manifest`
28 :
29 : // versionSet manages a collection of immutable versions, and manages the
30 : // creation of a new version from the most recent version. A new version is
31 : // created from an existing version by applying a version edit, which is just
32 : // what it sounds like: a delta from the previous version. Version edits are
33 : // logged to the MANIFEST file and replayed at startup; see the sketch after this type.
34 : type versionSet struct {
35 : // Next seqNum to use for WAL writes.
36 : logSeqNum base.AtomicSeqNum
37 :
38 : // The upper bound on sequence numbers that have been assigned so far. A
39 : // suffix of these sequence numbers may not have been written to a WAL. Both
40 : // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
41 : // visibleSeqNum is <= logSeqNum.
42 : visibleSeqNum base.AtomicSeqNum
43 :
44 : // Number of bytes present in sstables being written by in-progress
45 : // compactions. This value will be zero if there are no in-progress
46 : // compactions. Updated and read atomically.
47 : atomicInProgressBytes atomic.Int64
48 :
49 : // Immutable fields.
50 : dirname string
51 : provider objstorage.Provider
52 : // Set to DB.mu.
53 : mu *sync.Mutex
54 : opts *Options
55 : fs vfs.FS
56 : cmp *base.Comparer
57 : // dynamicBaseLevel allows the dynamic base level computation to be
58 : // disabled. Used by tests which want to create specific LSM structures.
59 : dynamicBaseLevel bool
60 :
61 : // Mutable fields.
62 : versions manifest.VersionList
63 : latest *latestVersionState
64 : picker compactionPicker
65 : // curCompactionConcurrency is updated whenever picker is updated.
66 : // INVARIANT: >= 1.
67 : curCompactionConcurrency atomic.Int32
68 :
69 : // Not all metrics are kept here. See DB.Metrics().
70 : metrics Metrics
71 :
72 : // A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
73 : // on the creation of every version.
74 : obsoleteFn func(manifest.ObsoleteFiles)
75 : // obsolete{Tables,Blobs,Manifests,Options} are sorted by file number ascending.
76 : obsoleteTables []obsoleteFile
77 : obsoleteBlobs []obsoleteFile
78 : obsoleteManifests []obsoleteFile
79 : obsoleteOptions []obsoleteFile
80 :
81 : // Zombie tables which have been removed from the current version but are
82 : // still referenced by an in-use iterator.
83 : zombieTables zombieObjects
84 : // Zombie blobs which have been removed from the current version but are
85 : // still referenced by an inuse iterator.
86 : zombieBlobs zombieObjects
87 :
88 : // minUnflushedLogNum is the smallest WAL log file number corresponding to
89 : // mutations that have not been flushed to an sstable.
90 : minUnflushedLogNum base.DiskFileNum
91 :
92 : // The next file number. A single counter is used to assign file
93 : // numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
94 : nextFileNum atomic.Uint64
95 :
96 : // The current manifest file number.
97 : manifestFileNum base.DiskFileNum
98 : manifestMarker *atomicfs.Marker
99 :
100 : manifestFile vfs.File
101 : manifest *record.Writer
102 : getFormatMajorVersion func() FormatMajorVersion
103 :
104 : writing bool
105 : writerCond sync.Cond
106 : // State for deciding when to write a snapshot. Protected by mu.
107 : rotationHelper record.RotationHelper
108 :
109 : pickedCompactionCache pickedCompactionCache
110 : }
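// As an illustrative sketch (hedged; not a literal excerpt of the replay
// code), a sequence of version edits can be folded over an initial version
// using manifest.BulkVersionEdit, much as UpdateVersionLocked below does for a
// single edit. Here decodedEdits, cmp and readCompactionRate are hypothetical
// placeholders:
//
//	var bve manifest.BulkVersionEdit
//	for _, ve := range decodedEdits { // edits decoded from the MANIFEST
//		if err := bve.Accumulate(ve); err != nil {
//			// handle error
//		}
//	}
//	v, err := bve.Apply(manifest.NewInitialVersion(cmp), readCompactionRate)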
111 :
112 : // latestVersionState maintains mutable state describing only the most recent
113 : // version. Unlike a *Version, the state within this struct is updated as new
114 : // version edits are applied.
115 : type latestVersionState struct {
116 : l0Organizer *manifest.L0Organizer
117 : // blobFiles is the set of blob files referenced by the current version.
118 : // blobFiles is protected by the manifest logLock (not vs.mu).
119 : blobFiles manifest.CurrentBlobFileSet
120 : // virtualBackings contains information about the TableBackings which support
121 : // virtual sstables in the latest version. It is mainly used to determine when
122 : // a backing is no longer in use by the tables in the latest version; this is
123 : // not a trivial problem because compactions and other operations can happen
124 : // in parallel (and they can finish in unpredictable order).
125 : //
126 : // This mechanism is complementary to the backing Ref/Unref mechanism, which
127 : // determines when a backing is no longer used by *any* live version and can
128 : // be removed.
129 : //
130 : // In principle this should have been a copy-on-write structure, with each
131 : // Version having its own record of its virtual backings (similar to the
132 : // B-tree that holds the tables). However, in practice we only need it for the
133 : // latest version, so we use a simpler structure and store it in the
134 : // versionSet instead.
135 : //
136 : // virtualBackings is modified under DB.mu and the log lock. If it is accessed
137 : // under DB.mu and a version update is in progress, it reflects the state of
138 : // the next version.
139 : virtualBackings manifest.VirtualBackings
140 : }
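// For example, when a version edit stops using a virtual backing in the latest
// version, getZombieTablesAndUpdateVirtualBackings (below) records it in
// ve.RemovedBackingTables; the backing's Ref/Unref count independently decides
// when the underlying object becomes obsolete.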
141 :
142 : func (vs *versionSet) init(
143 : dirname string,
144 : provider objstorage.Provider,
145 : opts *Options,
146 : marker *atomicfs.Marker,
147 : getFMV func() FormatMajorVersion,
148 : mu *sync.Mutex,
149 1 : ) {
150 1 : vs.dirname = dirname
151 1 : vs.provider = provider
152 1 : vs.mu = mu
153 1 : vs.writerCond.L = mu
154 1 : vs.opts = opts
155 1 : vs.fs = opts.FS
156 1 : vs.cmp = opts.Comparer
157 1 : vs.dynamicBaseLevel = true
158 1 : vs.versions.Init(mu)
159 1 : vs.obsoleteFn = vs.addObsoleteLocked
160 1 : vs.zombieTables = makeZombieObjects()
161 1 : vs.zombieBlobs = makeZombieObjects()
162 1 : vs.nextFileNum.Store(1)
163 1 : vs.logSeqNum.Store(base.SeqNumStart)
164 1 : vs.manifestMarker = marker
165 1 : vs.getFormatMajorVersion = getFMV
166 1 : }
167 :
168 : // initNewDB creates a version set for a fresh database, persisting an initial
169 : // manifest file.
170 : func (vs *versionSet) initNewDB(
171 : jobID JobID,
172 : dirname string,
173 : provider objstorage.Provider,
174 : opts *Options,
175 : marker *atomicfs.Marker,
176 : getFormatMajorVersion func() FormatMajorVersion,
177 : blobRewriteHeuristic manifest.BlobRewriteHeuristic,
178 : mu *sync.Mutex,
179 1 : ) error {
180 1 : vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
181 1 : vs.latest = &latestVersionState{
182 1 : l0Organizer: manifest.NewL0Organizer(opts.Comparer, opts.FlushSplitBytes),
183 1 : virtualBackings: manifest.MakeVirtualBackings(),
184 1 : }
185 1 : vs.latest.blobFiles.Init(nil, blobRewriteHeuristic)
186 1 : emptyVersion := manifest.NewInitialVersion(opts.Comparer)
187 1 : vs.append(emptyVersion)
188 1 :
189 1 : vs.setCompactionPicker(
190 1 : newCompactionPickerByScore(emptyVersion, vs.latest, vs.opts, nil))
191 1 : // Note that a "snapshot" version edit is written to the manifest when it is
192 1 : // created.
193 1 : vs.manifestFileNum = vs.getNextDiskFileNum()
194 1 : err := vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum.Load(), nil /* virtualBackings */)
195 1 : if err == nil {
196 1 : if err = vs.manifest.Flush(); err != nil {
197 1 : vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
198 1 : }
199 : }
200 1 : if err == nil {
201 1 : if err = vs.manifestFile.Sync(); err != nil {
202 1 : vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
203 1 : }
204 : }
205 1 : if err == nil {
206 1 : // NB: Move() is responsible for syncing the data directory.
207 1 : if err = vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, vs.manifestFileNum)); err != nil {
208 1 : vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
209 1 : }
210 : }
211 :
212 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
213 1 : JobID: int(jobID),
214 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
215 1 : FileNum: vs.manifestFileNum,
216 1 : Err: err,
217 1 : })
218 1 : if err != nil {
219 1 : return err
220 1 : }
221 1 : return nil
222 : }
223 :
224 : // initRecoveredDB initializes the version set from the database state
225 : // recovered from an existing manifest file.
226 : func (vs *versionSet) initRecoveredDB(
227 : dirname string,
228 : provider objstorage.Provider,
229 : opts *Options,
230 : rv *recoveredVersion,
231 : marker *atomicfs.Marker,
232 : getFormatMajorVersion func() FormatMajorVersion,
233 : mu *sync.Mutex,
234 1 : ) error {
235 1 : vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
236 1 : vs.manifestFileNum = rv.manifestFileNum
237 1 : vs.minUnflushedLogNum = rv.minUnflushedLogNum
238 1 : vs.nextFileNum.Store(uint64(rv.nextFileNum))
239 1 : vs.logSeqNum.Store(rv.logSeqNum)
240 1 : vs.latest = rv.latest
241 1 : vs.metrics = rv.metrics
242 1 : vs.append(rv.version)
243 1 : vs.setCompactionPicker(newCompactionPickerByScore(
244 1 : rv.version, vs.latest, vs.opts, nil))
245 1 : return nil
246 1 : }
247 :
248 1 : func (vs *versionSet) close() error {
249 1 : if vs.manifestFile != nil {
250 1 : if err := vs.manifestFile.Close(); err != nil {
251 0 : return err
252 0 : }
253 : }
254 1 : if vs.manifestMarker != nil {
255 1 : if err := vs.manifestMarker.Close(); err != nil {
256 0 : return err
257 0 : }
258 : }
259 1 : return nil
260 : }
261 :
262 : // logLock locks the manifest for writing. The lock must be released by
263 : // a call to logUnlock.
264 : //
265 : // DB.mu must be held when calling this method, but the mutex may be dropped and
266 : // re-acquired during the course of this method.
267 1 : func (vs *versionSet) logLock() {
268 1 : // Wait for any existing writing to the manifest to complete, then mark the
269 1 : // manifest as busy.
270 1 : for vs.writing {
271 1 : // Note: writerCond.L is DB.mu, so we unlock it while we wait.
272 1 : vs.writerCond.Wait()
273 1 : }
274 1 : vs.writing = true
275 : }
276 :
277 : // logUnlock releases the lock for manifest writing.
278 : //
279 : // DB.mu must be held when calling this method.
280 1 : func (vs *versionSet) logUnlock() {
281 1 : if !vs.writing {
282 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
283 0 : }
284 1 : vs.writing = false
285 1 : vs.writerCond.Signal()
286 : }
287 :
288 1 : func (vs *versionSet) logUnlockAndInvalidatePickedCompactionCache() {
289 1 : vs.pickedCompactionCache.invalidate()
290 1 : vs.logUnlock()
291 1 : }
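// A minimal usage sketch of the manifest write lock, mirroring what
// UpdateVersionLocked does below (DB.mu must already be held):
//
//	vs.logLock()
//	defer vs.logUnlockAndInvalidatePickedCompactionCache()
//	// ... build a version edit and write it to the MANIFEST ...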
292 :
293 : // versionUpdate is returned by the function passed to UpdateVersionLocked.
294 : //
295 : // If VE is nil, there is no update to apply (but it is not an error).
296 : type versionUpdate struct {
297 : VE *manifest.VersionEdit
298 : JobID JobID
299 : Metrics levelMetricsDelta
300 : // InProgressCompactionFn is called while DB.mu is held after the I/O part of
301 : // the update was performed. It should return any compactions that are
302 : // in-progress (excluding than the one that is being applied).
303 : InProgressCompactionsFn func() []compactionInfo
304 : ForceManifestRotation bool
305 : }
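// A hedged sketch of how a caller (e.g. a flush or compaction completion;
// the jobID and metricsDelta names here are hypothetical) supplies a
// versionUpdate to UpdateVersionLocked:
//
//	_, err := vs.UpdateVersionLocked(func() (versionUpdate, error) {
//		ve := &manifest.VersionEdit{ /* new/deleted tables, MinUnflushedLogNum, ... */ }
//		return versionUpdate{
//			VE:                      ve,
//			JobID:                   jobID,
//			Metrics:                 metricsDelta,
//			InProgressCompactionsFn: func() []compactionInfo { return nil },
//		}, nil
//	})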
306 :
307 : // UpdateVersionLocked is used to update the current version, returning the
308 : // duration of the manifest update. If the manifest update is unsuccessful, the
309 : // duration will be unset (0).
310 : //
311 : // DB.mu must be held. UpdateVersionLocked first waits for any other version
312 : // update to complete, releasing and reacquiring DB.mu.
313 : //
314 : // UpdateVersionLocked then calls updateFn which builds a versionUpdate, while
315 : // holding DB.mu. The updateFn can release and reacquire DB.mu (it should
316 : // attempt to do as much work as possible outside of the lock).
317 : //
318 : // UpdateVersionLocked fills in the following fields of the VersionEdit:
319 : // NextFileNum, LastSeqNum, RemovedBackingTables. The removed backing tables are
320 : // those backings that are no longer used (in the new version) after applying
321 : // the edit (as per vs.virtualBackings). Other than these fields, the
322 : // VersionEdit must be complete.
323 : //
324 : // New table backing references (TableBacking.Ref) are taken as part of applying
325 : // the version edit. The state of the virtual backings (vs.virtualBackings) is
326 : // updated before logging to the manifest and installing the latest version;
327 : // this is ok because any failure in those steps is fatal.
328 : //
329 : // If updateFn returns an error, no update is applied and that same error is returned.
330 : // If versionUpdate.VE is nil, no update is applied (and no error is returned).
331 : func (vs *versionSet) UpdateVersionLocked(
332 : updateFn func() (versionUpdate, error),
333 1 : ) (time.Duration, error) {
334 1 : vs.logLock()
335 1 : defer vs.logUnlockAndInvalidatePickedCompactionCache()
336 1 :
337 1 : vu, err := updateFn()
338 1 : if err != nil || vu.VE == nil {
339 1 : return 0, err
340 1 : }
341 :
342 1 : if !vs.writing {
343 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
344 0 : }
345 :
346 1 : updateManifestStart := crtime.NowMono()
347 1 : ve := vu.VE
348 1 : if ve.MinUnflushedLogNum != 0 {
349 1 : if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
350 1 : vs.nextFileNum.Load() <= uint64(ve.MinUnflushedLogNum) {
351 0 : panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
352 0 : ve.MinUnflushedLogNum))
353 : }
354 : }
355 :
356 : // This is the next manifest filenum, but if the current file is too big, we
357 : // will write this ve to the next file, which means what ve encodes is the
358 : // current filenum and not the next one.
359 : //
360 : // TODO(sbhola): figure out why this is correct and update comment.
361 1 : ve.NextFileNum = vs.nextFileNum.Load()
362 1 :
363 1 : // LastSeqNum is set to the current upper bound on the assigned sequence
364 1 : // numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
365 1 : // used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
366 1 : // replay. It must be higher than or equal to any than any sequence number
367 1 : // written to an sstable, including sequence numbers in ingested files.
368 1 : // Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
369 1 : // number. This is fallout from ingestion which allows a sequence number X to
370 1 : // be assigned to an ingested sstable even though sequence number X-1 resides
371 1 : // in an unflushed memtable. logSeqNum is the _next_ sequence number that
372 1 : // will be assigned, so subtract that by 1 to get the upper bound on the
373 1 : // last assigned sequence number.
374 1 : logSeqNum := vs.logSeqNum.Load()
375 1 : ve.LastSeqNum = logSeqNum - 1
376 1 : if logSeqNum == 0 {
377 0 : // logSeqNum is initialized to 1 in Open() if there are no previous WAL
378 0 : // or manifest records, so this case should never happen.
379 0 : vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
380 0 : }
381 :
382 1 : currentVersion := vs.currentVersion()
383 1 : var newVersion *manifest.Version
384 1 :
385 1 : // Generate a new manifest if we don't currently have one, or forceRotation
386 1 : // is true, or the current one is too large.
387 1 : //
388 1 : // For largeness, we do not exclusively use MaxManifestFileSize size
389 1 : // threshold since we have had incidents where due to either large keys or
390 1 : // large numbers of files, each edit results in a snapshot + write of the
391 1 : // edit. This slows the system down since each flush or compaction is
392 1 : // writing a new manifest snapshot. The primary goal of the size-based
393 1 : // rollover logic is to ensure that when reopening a DB, the number of edits
394 1 : // that need to be replayed on top of the snapshot is "sane". Rolling over
395 1 : // to a new manifest after each edit is not relevant to that goal.
396 1 : //
397 1 : // Consider the following cases:
398 1 : // - The number of live files F in the DB is roughly stable: after writing
399 1 : // the snapshot (with F files), say we require that there be enough edits
400 1 : // such that the cumulative number of files in those edits, E, be greater
401 1 : // than F. This will ensure that the total amount of time in
402 1 : // UpdateVersionLocked that is spent in snapshot writing is ~50%.
403 1 : //
404 1 : // - The number of live files F in the DB is shrinking drastically, say from
405 1 : // F to F/10: This can happen for various reasons, like wide range
406 1 : // tombstones, or large numbers of smaller than usual files that are being
407 1 : // merged together into larger files. And say the number of new files generated
408 1 : // during this shrinkage is insignificant compared to F/10, and so for
409 1 : // this example we will assume it is effectively 0. After this shrinking,
410 1 : // E = 0.9F, and so if we used the previous snapshot file count, F, as the
411 1 : // threshold that needs to be exceeded, we will further delay the snapshot
412 1 : // writing. Which means on DB reopen we will need to replay 0.9F edits to
413 1 : // get to a version with 0.1F files. It would be better to create a new
414 1 : // snapshot when E exceeds the number of files in the current version.
415 1 : //
416 1 : // - The number of live files F in the DB is growing via perfect ingests
417 1 : // into L6: Say we wrote the snapshot when there were F files and now we
418 1 : // have 10F files, so E = 9F. We will further delay writing a new
419 1 : // snapshot. This case can be critiqued as contrived, but we consider it
420 1 : // nonetheless.
421 1 : //
422 1 : // The logic below uses the min of the last snapshot file count and the file
423 1 : // count in the current version.
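// For example (illustrative numbers only): if the last snapshot contained
// F=1000 tables but the current version has shrunk to 100 tables, taking the
// min means a rotation is considered once the edits since the last snapshot
// cover on the order of 100 files rather than 1000.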
424 1 : vs.rotationHelper.AddRecord(int64(len(ve.DeletedTables) + len(ve.NewTables)))
425 1 : sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
426 1 : requireRotation := vu.ForceManifestRotation || vs.manifest == nil
427 1 :
428 1 : var nextSnapshotFilecount int64
429 1 : for i := range vs.metrics.Levels {
430 1 : nextSnapshotFilecount += vs.metrics.Levels[i].TablesCount
431 1 : }
432 1 : if sizeExceeded && !requireRotation {
433 1 : requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
434 1 : }
435 1 : var newManifestFileNum base.DiskFileNum
436 1 : var prevManifestFileSize uint64
437 1 : var newManifestVirtualBackings []*manifest.TableBacking
438 1 : if requireRotation {
439 1 : newManifestFileNum = vs.getNextDiskFileNum()
440 1 : prevManifestFileSize = uint64(vs.manifest.Size())
441 1 :
442 1 : // We want the virtual backings *before* applying the version edit, because
443 1 : // the new manifest will contain the pre-apply version plus the last version
444 1 : // edit.
445 1 : newManifestVirtualBackings = slices.Collect(vs.latest.virtualBackings.All())
446 1 : }
447 :
448 : // Grab certain values before releasing vs.mu, in case createManifest() needs
449 : // to be called.
450 1 : minUnflushedLogNum := vs.minUnflushedLogNum
451 1 : nextFileNum := vs.nextFileNum.Load()
452 1 :
453 1 : // Note: this call populates ve.RemovedBackingTables.
454 1 : zombieBackings, removedVirtualBackings, localTablesLiveDelta :=
455 1 : getZombieTablesAndUpdateVirtualBackings(ve, &vs.latest.virtualBackings, vs.provider)
456 1 :
457 1 : var l0Update manifest.L0PreparedUpdate
458 1 : if err := func() error {
459 1 : vs.mu.Unlock()
460 1 : defer vs.mu.Lock()
461 1 :
462 1 : if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
463 0 : return base.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
464 0 : }
465 :
466 : // Rotate the manifest if necessary. Rotating the manifest involves
467 : // creating a new file and writing an initial version edit containing a
468 : // snapshot of the current version. This initial version edit will
469 : // reflect the Version prior to the pending version edit (`ve`). Once
470 : // we've created the new manifest with the previous version state, we'll
471 : // append the version edit `ve` to the tail of the new manifest.
472 1 : if newManifestFileNum != 0 {
473 1 : if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum, newManifestVirtualBackings); err != nil {
474 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
475 1 : JobID: int(vu.JobID),
476 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
477 1 : FileNum: newManifestFileNum,
478 1 : Err: err,
479 1 : })
480 1 : return errors.Wrap(err, "MANIFEST create failed")
481 1 : }
482 : }
483 :
484 : // Call ApplyAndUpdateVersionEdit before accumulating the version edit.
485 : // If any blob files are no longer referenced, the version edit will be
486 : // updated to explicitly record the deletion of the blob files. This can
487 : // happen here because vs.blobFiles is protected by the manifest logLock
488 : // (NOT vs.mu). We only read or write vs.blobFiles while holding the
489 : // manifest lock.
490 1 : if err := vs.latest.blobFiles.ApplyAndUpdateVersionEdit(ve); err != nil {
491 0 : return errors.Wrap(err, "MANIFEST blob files apply and update failed")
492 0 : }
493 :
494 1 : var bulkEdit manifest.BulkVersionEdit
495 1 : err := bulkEdit.Accumulate(ve)
496 1 : if err != nil {
497 0 : return errors.Wrap(err, "MANIFEST accumulate failed")
498 0 : }
499 1 : newVersion, err = bulkEdit.Apply(currentVersion, vs.opts.Experimental.ReadCompactionRate)
500 1 : if err != nil {
501 0 : return errors.Wrap(err, "MANIFEST apply failed")
502 0 : }
503 1 : l0Update = vs.latest.l0Organizer.PrepareUpdate(&bulkEdit, newVersion)
504 1 :
505 1 : w, err := vs.manifest.Next()
506 1 : if err != nil {
507 0 : return errors.Wrap(err, "MANIFEST next record write failed")
508 0 : }
509 :
510 : // NB: Any error from this point on is considered fatal as we don't know if
511 : // the MANIFEST write occurred or not. Trying to determine that is
512 : // fraught. Instead we rely on the standard recovery mechanism run when a
513 : // database is opened. In particular, that mechanism generates a new MANIFEST
514 : // and ensures it is synced.
515 1 : if err := ve.Encode(w); err != nil {
516 0 : return errors.Wrap(err, "MANIFEST write failed")
517 0 : }
518 1 : if err := vs.manifest.Flush(); err != nil {
519 1 : return errors.Wrap(err, "MANIFEST flush failed")
520 1 : }
521 1 : if err := vs.manifestFile.Sync(); err != nil {
522 1 : return errors.Wrap(err, "MANIFEST sync failed")
523 1 : }
524 1 : if newManifestFileNum != 0 {
525 1 : // NB: Move() is responsible for syncing the data directory.
526 1 : if err := vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, newManifestFileNum)); err != nil {
527 0 : return errors.Wrap(err, "MANIFEST set current failed")
528 0 : }
529 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
530 1 : JobID: int(vu.JobID),
531 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
532 1 : FileNum: newManifestFileNum,
533 1 : })
534 : }
535 1 : return nil
536 1 : }(); err != nil {
537 1 : // Any error encountered during any of the operations in the previous
538 1 : // closure is considered fatal. Treating such errors as fatal is preferred
539 1 : // to attempting to unwind various file and b-tree reference counts, and
540 1 : // re-generating L0 sublevel metadata. This may change in the future, if
541 1 : // certain manifest / WAL operations become retryable. For more context, see
542 1 : // #1159 and #1792.
543 1 : vs.opts.Logger.Fatalf("%s", err)
544 1 : return 0, err
545 1 : }
546 :
547 1 : if requireRotation {
548 1 : // Successfully rotated.
549 1 : vs.rotationHelper.Rotate(nextSnapshotFilecount)
550 1 : }
551 : // Now that DB.mu is held again, initialize compacting file info in
552 : // L0Sublevels.
553 1 : inProgress := vu.InProgressCompactionsFn()
554 1 :
555 1 : zombieBlobs, localBlobLiveDelta := getZombieBlobFilesAndComputeLocalMetrics(ve, vs.provider)
556 1 : vs.latest.l0Organizer.PerformUpdate(l0Update, newVersion)
557 1 : vs.latest.l0Organizer.InitCompactingFileInfo(inProgressL0Compactions(inProgress))
558 1 :
559 1 : // Update the zombie objects sets first, as installation of the new version
560 1 : // will unref the previous version which could result in addObsoleteLocked
561 1 : // being called.
562 1 : for _, b := range zombieBackings {
563 1 : vs.zombieTables.Add(objectInfo{
564 1 : fileInfo: fileInfo{
565 1 : FileNum: b.backing.DiskFileNum,
566 1 : FileSize: b.backing.Size,
567 1 : },
568 1 : isLocal: b.isLocal,
569 1 : })
570 1 : }
571 1 : for _, zb := range zombieBlobs {
572 1 : vs.zombieBlobs.Add(zb)
573 1 : }
574 : // Unref the removed backings and report those that already became obsolete.
575 : // Note that the only case where we report obsolete tables here is when
576 : // VirtualBackings.Protect/Unprotect was used to keep a backing alive without
577 : // it being used in the current version.
578 1 : var obsoleteVirtualBackings manifest.ObsoleteFiles
579 1 : for _, b := range removedVirtualBackings {
580 1 : if b.backing.Unref() == 0 {
581 1 : obsoleteVirtualBackings.TableBackings = append(obsoleteVirtualBackings.TableBackings, b.backing)
582 1 : }
583 : }
584 1 : vs.addObsoleteLocked(obsoleteVirtualBackings)
585 1 :
586 1 : // Install the new version.
587 1 : vs.append(newVersion)
588 1 :
589 1 : if ve.MinUnflushedLogNum != 0 {
590 1 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
591 1 : }
592 1 : if newManifestFileNum != 0 {
593 1 : if vs.manifestFileNum != 0 {
594 1 : vs.obsoleteManifests = append(vs.obsoleteManifests, obsoleteFile{
595 1 : fileType: base.FileTypeManifest,
596 1 : fs: vs.fs,
597 1 : path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
598 1 : fileNum: vs.manifestFileNum,
599 1 : fileSize: prevManifestFileSize,
600 1 : isLocal: true,
601 1 : })
602 1 : }
603 1 : vs.manifestFileNum = newManifestFileNum
604 : }
605 :
606 1 : vs.metrics.updateLevelMetrics(vu.Metrics)
607 1 : for i := range vs.metrics.Levels {
608 1 : l := &vs.metrics.Levels[i]
609 1 : l.TablesCount = int64(newVersion.Levels[i].Len())
610 1 : l.VirtualTablesCount = newVersion.Levels[i].NumVirtual
611 1 : l.VirtualTablesSize = newVersion.Levels[i].VirtualTableSize
612 1 : l.TablesSize = int64(newVersion.Levels[i].TableSize())
613 1 : l.EstimatedReferencesSize = newVersion.Levels[i].EstimatedReferenceSize()
614 1 : l.Sublevels = 0
615 1 : if l.TablesCount > 0 {
616 1 : l.Sublevels = 1
617 1 : }
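// Under invariants builds, recompute the per-level table metrics directly from
// the new version and fail fast if they diverge from the cached values.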
618 1 : if invariants.Enabled {
619 1 : levelFiles := newVersion.Levels[i].Slice()
620 1 : if size := int64(levelFiles.TableSizeSum()); l.TablesSize != size {
621 0 : vs.opts.Logger.Fatalf("versionSet metrics L%d Size = %d, actual size = %d", i, l.TablesSize, size)
622 0 : }
623 1 : refSize := uint64(0)
624 1 : for f := range levelFiles.All() {
625 1 : refSize += f.EstimatedReferenceSize()
626 1 : }
627 1 : if refSize != l.EstimatedReferencesSize {
628 0 : vs.opts.Logger.Fatalf("versionSet metrics L%d EstimatedReferencesSize = %d, recomputed size = %d", i, l.EstimatedReferencesSize, refSize)
629 0 : }
630 :
631 1 : if nVirtual := levelFiles.NumVirtual(); nVirtual != l.VirtualTablesCount {
632 0 : vs.opts.Logger.Fatalf(
633 0 : "versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
634 0 : i, l.VirtualTablesCount, nVirtual,
635 0 : )
636 0 : }
637 1 : if vSize := levelFiles.VirtualTableSizeSum(); vSize != l.VirtualTablesSize {
638 0 : vs.opts.Logger.Fatalf(
639 0 : "versionSet metrics L%d Virtual size = %d, actual size = %d",
640 0 : i, l.VirtualTablesSize, vSize,
641 0 : )
642 0 : }
643 : }
644 : }
645 1 : vs.metrics.Levels[0].Sublevels = int32(len(newVersion.L0SublevelFiles))
646 1 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localTablesLiveDelta.size)
647 1 : vs.metrics.Table.Local.LiveCount = uint64(int64(vs.metrics.Table.Local.LiveCount) + localTablesLiveDelta.count)
648 1 : vs.metrics.BlobFiles.Local.LiveSize = uint64(int64(vs.metrics.BlobFiles.Local.LiveSize) + localBlobLiveDelta.size)
649 1 : vs.metrics.BlobFiles.Local.LiveCount = uint64(int64(vs.metrics.BlobFiles.Local.LiveCount) + localBlobLiveDelta.count)
650 1 :
651 1 : vs.setCompactionPicker(newCompactionPickerByScore(newVersion, vs.latest, vs.opts, inProgress))
652 1 : if !vs.dynamicBaseLevel {
653 1 : vs.picker.forceBaseLevel1()
654 1 : }
655 :
656 1 : return updateManifestStart.Elapsed(), nil
657 : }
658 :
659 1 : func (vs *versionSet) setCompactionPicker(picker *compactionPickerByScore) {
660 1 : vs.picker = picker
661 1 : vs.curCompactionConcurrency.Store(int32(picker.getCompactionConcurrency()))
662 1 : }
663 :
664 : type tableBackingInfo struct {
665 : backing *manifest.TableBacking
666 : isLocal bool
667 : }
668 :
669 : type fileMetricDelta struct {
670 : count int64
671 : size int64
672 : }
673 :
674 : // getZombieTablesAndUpdateVirtualBackings updates the virtual backings with the
675 : // changes in the versionEdit and populates ve.RemovedBackingTables.
676 : // Returns:
677 : // - zombieBackings: all backings (physical and virtual) that will no longer be
678 : // needed when we apply ve.
679 : // - removedVirtualBackings: the virtual backings that will be removed by the
680 : // VersionEdit and which must be Unref()ed by the caller. These backings
681 : // match ve.RemovedBackingTables.
682 : // - localLiveSizeDelta: the delta in local live bytes.
683 : func getZombieTablesAndUpdateVirtualBackings(
684 : ve *manifest.VersionEdit, virtualBackings *manifest.VirtualBackings, provider objstorage.Provider,
685 1 : ) (zombieBackings, removedVirtualBackings []tableBackingInfo, localLiveDelta fileMetricDelta) {
686 1 : // First, deal with the physical tables.
687 1 : //
688 1 : // A physical backing has become unused if it is in DeletedTables but not in
689 1 : // NewTables or CreatedBackingTables.
690 1 : //
691 1 : // Note that for the common case where there are very few elements, the map
692 1 : // will stay on the stack.
693 1 : stillUsed := make(map[base.DiskFileNum]struct{})
694 1 : for _, nf := range ve.NewTables {
695 1 : if !nf.Meta.Virtual {
696 1 : stillUsed[nf.Meta.TableBacking.DiskFileNum] = struct{}{}
697 1 : isLocal, localFileDelta := sizeIfLocal(nf.Meta.TableBacking, provider)
698 1 : localLiveDelta.size += localFileDelta
699 1 : if isLocal {
700 1 : localLiveDelta.count++
701 1 : }
702 : }
703 : }
704 1 : for _, b := range ve.CreatedBackingTables {
705 1 : stillUsed[b.DiskFileNum] = struct{}{}
706 1 : }
707 1 : for _, m := range ve.DeletedTables {
708 1 : if !m.Virtual {
709 1 : // NB: this deleted file may also be in NewFiles or
710 1 : // CreatedBackingTables, due to a file moving between levels, or
711 1 : // becoming virtualized. In which case there is no change due to this
712 1 : // file in the localLiveSizeDelta -- the subtraction below compensates
713 1 : // for the addition.
714 1 : isLocal, localFileDelta := sizeIfLocal(m.TableBacking, provider)
715 1 : localLiveDelta.size -= localFileDelta
716 1 : if isLocal {
717 1 : localLiveDelta.count--
718 1 : }
719 1 : if _, ok := stillUsed[m.TableBacking.DiskFileNum]; !ok {
720 1 : zombieBackings = append(zombieBackings, tableBackingInfo{
721 1 : backing: m.TableBacking,
722 1 : isLocal: isLocal,
723 1 : })
724 1 : }
725 : }
726 : }
727 :
728 : // Now deal with virtual tables.
729 : //
730 : // When a virtual table moves between levels we RemoveTable() then AddTable(),
731 : // which works out.
732 1 : for _, b := range ve.CreatedBackingTables {
733 1 : isLocal, localFileDelta := sizeIfLocal(b, provider)
734 1 : virtualBackings.AddAndRef(b, isLocal)
735 1 : localLiveDelta.size += localFileDelta
736 1 : if isLocal {
737 1 : localLiveDelta.count++
738 1 : }
739 : }
740 1 : for _, m := range ve.DeletedTables {
741 1 : if m.Virtual {
742 1 : virtualBackings.RemoveTable(m.TableBacking.DiskFileNum, m.TableNum)
743 1 : }
744 : }
745 1 : for _, nf := range ve.NewTables {
746 1 : if nf.Meta.Virtual {
747 1 : virtualBackings.AddTable(nf.Meta, nf.Level)
748 1 : }
749 : }
750 :
751 1 : if unused := virtualBackings.Unused(); len(unused) > 0 {
752 1 : // Virtual backings that are no longer used are zombies and are also added
753 1 : // to RemovedBackingTables (before the version edit is written to disk).
754 1 : ve.RemovedBackingTables = make([]base.DiskFileNum, len(unused))
755 1 : for i, b := range unused {
756 1 : isLocal, localFileDelta := sizeIfLocal(b, provider)
757 1 : localLiveDelta.size -= localFileDelta
758 1 : if isLocal {
759 1 : localLiveDelta.count--
760 1 : }
761 1 : ve.RemovedBackingTables[i] = b.DiskFileNum
762 1 : zombieBackings = append(zombieBackings, tableBackingInfo{
763 1 : backing: b,
764 1 : isLocal: isLocal,
765 1 : })
766 1 : virtualBackings.Remove(b.DiskFileNum)
767 : }
768 1 : removedVirtualBackings = zombieBackings[len(zombieBackings)-len(unused):]
769 : }
770 1 : return zombieBackings, removedVirtualBackings, localLiveDelta
771 : }
772 :
773 : // getZombieBlobFilesAndComputeLocalMetrics constructs objectInfos for all
774 : // zombie blob files, and computes the deltas to the local live blob file
775 : // metrics.
776 : func getZombieBlobFilesAndComputeLocalMetrics(
777 : ve *manifest.VersionEdit, provider objstorage.Provider,
778 1 : ) (zombieBlobFiles []objectInfo, localLiveDelta fileMetricDelta) {
779 1 : for _, b := range ve.NewBlobFiles {
780 1 : if objstorage.IsLocalBlobFile(provider, b.Physical.FileNum) {
781 1 : localLiveDelta.count++
782 1 : localLiveDelta.size += int64(b.Physical.Size)
783 1 : }
784 : }
785 1 : zombieBlobFiles = make([]objectInfo, 0, len(ve.DeletedBlobFiles))
786 1 : for _, physical := range ve.DeletedBlobFiles {
787 1 : isLocal := objstorage.IsLocalBlobFile(provider, physical.FileNum)
788 1 : if isLocal {
789 1 : localLiveDelta.count--
790 1 : localLiveDelta.size -= int64(physical.Size)
791 1 : }
792 1 : zombieBlobFiles = append(zombieBlobFiles, objectInfo{
793 1 : fileInfo: fileInfo{
794 1 : FileNum: physical.FileNum,
795 1 : FileSize: physical.Size,
796 1 : },
797 1 : isLocal: isLocal,
798 1 : })
799 : }
800 1 : return zombieBlobFiles, localLiveDelta
801 : }
802 :
803 : // sizeIfLocal reports whether the backing is a local file and, if it is, returns backing.Size; otherwise it returns 0.
804 : func sizeIfLocal(
805 : backing *manifest.TableBacking, provider objstorage.Provider,
806 1 : ) (isLocal bool, localSize int64) {
807 1 : isLocal = objstorage.IsLocalTable(provider, backing.DiskFileNum)
808 1 : if isLocal {
809 1 : return true, int64(backing.Size)
810 1 : }
811 1 : return false, 0
812 : }
813 :
814 : func (vs *versionSet) incrementCompactions(
815 : kind compactionKind, extraLevels []*compactionLevel, bytesWritten int64, compactionErr error,
816 1 : ) {
817 1 : if kind == compactionKindFlush || kind == compactionKindIngestedFlushable {
818 1 : vs.metrics.Flush.Count++
819 1 : } else {
820 1 : vs.metrics.Compact.Count++
821 1 : if compactionErr != nil {
822 1 : if errors.Is(compactionErr, ErrCancelledCompaction) {
823 1 : vs.metrics.Compact.CancelledCount++
824 1 : vs.metrics.Compact.CancelledBytes += bytesWritten
825 1 : } else {
826 1 : vs.metrics.Compact.FailedCount++
827 1 : }
828 : }
829 : }
830 :
831 1 : switch kind {
832 1 : case compactionKindDefault:
833 1 : vs.metrics.Compact.DefaultCount++
834 :
835 1 : case compactionKindFlush, compactionKindIngestedFlushable:
836 :
837 1 : case compactionKindMove:
838 1 : vs.metrics.Compact.MoveCount++
839 :
840 1 : case compactionKindDeleteOnly:
841 1 : vs.metrics.Compact.DeleteOnlyCount++
842 :
843 1 : case compactionKindElisionOnly:
844 1 : vs.metrics.Compact.ElisionOnlyCount++
845 :
846 1 : case compactionKindRead:
847 1 : vs.metrics.Compact.ReadCount++
848 :
849 1 : case compactionKindTombstoneDensity:
850 1 : vs.metrics.Compact.TombstoneDensityCount++
851 :
852 1 : case compactionKindRewrite:
853 1 : vs.metrics.Compact.RewriteCount++
854 :
855 1 : case compactionKindCopy:
856 1 : vs.metrics.Compact.CopyCount++
857 :
858 1 : case compactionKindBlobFileRewrite:
859 1 : vs.metrics.Compact.BlobFileRewriteCount++
860 :
861 1 : case compactionKindVirtualRewrite:
862 1 : vs.metrics.Compact.VirtualRewriteCount++
863 :
864 0 : default:
865 0 : if invariants.Enabled {
866 0 : panic("unhandled compaction kind")
867 : }
868 :
869 : }
870 1 : if len(extraLevels) > 0 {
871 1 : vs.metrics.Compact.MultiLevelCount++
872 1 : }
873 : }
874 :
875 1 : func (vs *versionSet) incrementCompactionBytes(numBytes int64) {
876 1 : vs.atomicInProgressBytes.Add(numBytes)
877 1 : }
878 :
879 : // createManifest creates a manifest file that contains a snapshot of vs.
880 : func (vs *versionSet) createManifest(
881 : dirname string,
882 : fileNum, minUnflushedLogNum base.DiskFileNum,
883 : nextFileNum uint64,
884 : virtualBackings []*manifest.TableBacking,
885 1 : ) (err error) {
886 1 : var (
887 1 : filename = base.MakeFilepath(vs.fs, dirname, base.FileTypeManifest, fileNum)
888 1 : manifestFile vfs.File
889 1 : manifestWriter *record.Writer
890 1 : )
891 1 : defer func() {
892 1 : if manifestWriter != nil {
893 0 : _ = manifestWriter.Close()
894 0 : }
895 1 : if manifestFile != nil {
896 0 : _ = manifestFile.Close()
897 0 : }
898 1 : if err != nil {
899 1 : _ = vs.fs.Remove(filename)
900 1 : }
901 : }()
902 1 : manifestFile, err = vs.fs.Create(filename, "pebble-manifest")
903 1 : if err != nil {
904 1 : return err
905 1 : }
906 1 : manifestWriter = record.NewWriter(manifestFile)
907 1 :
908 1 : snapshot := manifest.VersionEdit{
909 1 : ComparerName: vs.cmp.Name,
910 1 : // When creating a version snapshot for an existing DB, this snapshot
911 1 : // VersionEdit will be immediately followed by another VersionEdit
912 1 : // (being written in UpdateVersionLocked()). That VersionEdit always
913 1 : // contains a LastSeqNum, so we don't need to include that in the
914 1 : // snapshot. But it does not necessarily include MinUnflushedLogNum,
915 1 : // NextFileNum, so we initialize those using the corresponding fields in
916 1 : // the versionSet (which came from the latest preceding VersionEdit that
917 1 : // had those fields).
918 1 : MinUnflushedLogNum: minUnflushedLogNum,
919 1 : NextFileNum: nextFileNum,
920 1 : CreatedBackingTables: virtualBackings,
921 1 : NewBlobFiles: vs.latest.blobFiles.Metadatas(),
922 1 : }
923 1 :
924 1 : // Add all extant sstables in the current version.
925 1 : for level, levelMetadata := range vs.currentVersion().Levels {
926 1 : for meta := range levelMetadata.All() {
927 1 : snapshot.NewTables = append(snapshot.NewTables, manifest.NewTableEntry{
928 1 : Level: level,
929 1 : Meta: meta,
930 1 : })
931 1 : }
932 : }
933 :
934 : // Add all tables marked for compaction.
935 1 : if vs.getFormatMajorVersion() >= FormatMarkForCompactionInVersionEdit {
936 1 : for meta, level := range vs.currentVersion().MarkedForCompaction.Ascending() {
937 1 : snapshot.TablesMarkedForCompaction = append(snapshot.TablesMarkedForCompaction, manifest.TableMarkedForCompactionEntry{
938 1 : Level: level,
939 1 : TableNum: meta.TableNum,
940 1 : })
941 1 : }
942 : }
943 :
944 1 : w, err1 := manifestWriter.Next()
945 1 : if err1 != nil {
946 0 : return err1
947 0 : }
948 1 : if err := snapshot.Encode(w); err != nil {
949 0 : return err
950 0 : }
951 :
952 1 : if vs.manifest != nil {
953 1 : if err := vs.manifest.Close(); err != nil {
954 0 : return err
955 0 : }
956 1 : vs.manifest = nil
957 : }
958 1 : if vs.manifestFile != nil {
959 1 : if err := vs.manifestFile.Close(); err != nil {
960 0 : return err
961 0 : }
962 1 : vs.manifestFile = nil
963 : }
964 :
965 1 : vs.manifest, manifestWriter = manifestWriter, nil
966 1 : vs.manifestFile, manifestFile = manifestFile, nil
967 1 : return nil
968 : }
969 :
970 : // NB: This method is not safe for concurrent use. It is only safe
971 : // to be called when concurrent changes to nextFileNum are not expected.
972 1 : func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
973 1 : if vs.nextFileNum.Load() <= uint64(fileNum) {
974 1 : vs.nextFileNum.Store(uint64(fileNum + 1))
975 1 : }
976 : }
977 :
978 : // getNextTableNum returns a new table number.
979 : //
980 : // Can be called without the versionSet's mutex being held.
981 1 : func (vs *versionSet) getNextTableNum() base.TableNum {
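// nextFileNum.Add(1) returns the post-increment value, so subtracting 1 yields
// the file number that this call just reserved atomically.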
982 1 : x := vs.nextFileNum.Add(1) - 1
983 1 : return base.TableNum(x)
984 1 : }
985 :
986 : // Can be called without the versionSet's mutex being held.
987 1 : func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
988 1 : x := vs.nextFileNum.Add(1) - 1
989 1 : return base.DiskFileNum(x)
990 1 : }
991 :
992 1 : func (vs *versionSet) append(v *manifest.Version) {
993 1 : if v.Refs() != 0 {
994 0 : panic("pebble: version should be unreferenced")
995 : }
996 1 : if !vs.versions.Empty() {
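// Drop the versionSet's reference on the previous latest version; it stays
// alive only as long as other holders (e.g. open iterators) still reference it.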
997 1 : vs.versions.Back().UnrefLocked()
998 1 : }
999 1 : v.Deleted = vs.obsoleteFn
1000 1 : v.Ref()
1001 1 : vs.versions.PushBack(v)
1002 1 : if invariants.Enabled {
1003 1 : // Verify that the virtualBackings contains all the backings referenced by
1004 1 : // the version.
1005 1 : for _, l := range v.Levels {
1006 1 : for f := range l.All() {
1007 1 : if f.Virtual {
1008 1 : if _, ok := vs.latest.virtualBackings.Get(f.TableBacking.DiskFileNum); !ok {
1009 0 : panic(fmt.Sprintf("%s is not in virtualBackings", f.TableBacking.DiskFileNum))
1010 : }
1011 : }
1012 : }
1013 : }
1014 : }
1015 : }
1016 :
1017 1 : func (vs *versionSet) currentVersion() *manifest.Version {
1018 1 : return vs.versions.Back()
1019 1 : }
1020 :
1021 1 : func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
1022 1 : current := vs.currentVersion()
1023 1 : for v := vs.versions.Front(); true; v = v.Next() {
1024 1 : for _, lm := range v.Levels {
1025 1 : for f := range lm.All() {
1026 1 : m[f.TableBacking.DiskFileNum] = struct{}{}
1027 1 : }
1028 : }
1029 1 : for bf := range v.BlobFiles.All() {
1030 1 : m[bf.Physical.FileNum] = struct{}{}
1031 1 : }
1032 1 : if v == current {
1033 1 : break
1034 : }
1035 : }
1036 : // virtualBackings contains backings that are referenced by some virtual
1037 : // tables in the latest version (which are handled above), and backings that
1038 : // are not but are still alive because of the protection mechanism (see
1039 : // manifest.VirtualBackings). This loop ensures the latter get added to the
1040 : // map.
1041 1 : for b := range vs.latest.virtualBackings.All() {
1042 1 : m[b.DiskFileNum] = struct{}{}
1043 1 : }
1044 : }
1045 :
1046 : // addObsoleteLocked will add the fileInfo associated with obsolete backing
1047 : // sstables and blob files to the obsolete file lists.
1048 : //
1049 : // The file backings in the obsolete list must not appear more than once.
1050 : //
1051 : // DB.mu must be held when addObsoleteLocked is called.
1052 1 : func (vs *versionSet) addObsoleteLocked(obsolete manifest.ObsoleteFiles) {
1053 1 : if obsolete.Count() == 0 {
1054 1 : return
1055 1 : }
1056 :
1057 : // Note that the zombie objects transition from zombie *to* obsolete, and
1058 : // will no longer be considered zombie.
1059 :
1060 1 : newlyObsoleteTables := make([]obsoleteFile, len(obsolete.TableBackings))
1061 1 : for i, bs := range obsolete.TableBackings {
1062 1 : newlyObsoleteTables[i] = vs.zombieTables.Extract(bs.DiskFileNum).
1063 1 : asObsoleteFile(vs.fs, base.FileTypeTable, vs.dirname)
1064 1 : }
1065 1 : vs.obsoleteTables = mergeObsoleteFiles(vs.obsoleteTables, newlyObsoleteTables)
1066 1 :
1067 1 : newlyObsoleteBlobFiles := make([]obsoleteFile, len(obsolete.BlobFiles))
1068 1 : for i, bf := range obsolete.BlobFiles {
1069 1 : newlyObsoleteBlobFiles[i] = vs.zombieBlobs.Extract(bf.FileNum).
1070 1 : asObsoleteFile(vs.fs, base.FileTypeBlob, vs.dirname)
1071 1 : }
1072 1 : vs.obsoleteBlobs = mergeObsoleteFiles(vs.obsoleteBlobs, newlyObsoleteBlobFiles)
1073 1 : vs.updateObsoleteObjectMetricsLocked()
1074 : }
1075 :
1076 : // addObsolete will acquire DB.mu, so DB.mu must not be held when this is
1077 : // called.
1078 1 : func (vs *versionSet) addObsolete(obsolete manifest.ObsoleteFiles) {
1079 1 : vs.mu.Lock()
1080 1 : defer vs.mu.Unlock()
1081 1 : vs.addObsoleteLocked(obsolete)
1082 1 : }
1083 :
1084 1 : func (vs *versionSet) updateObsoleteObjectMetricsLocked() {
1085 1 : // TODO(jackson): Ideally we would update vs.fileDeletions.queuedStats to
1086 1 : // include the files on vs.obsolete{Tables,Blobs}, but there's subtlety in
1087 1 : // deduplicating the files before computing the stats. It might also be
1088 1 : // possible to refactor to remove the vs.obsolete{Tables,Blobs} intermediary
1089 1 : // step. Revisit this.
1090 1 : vs.metrics.Table.ObsoleteCount = int64(len(vs.obsoleteTables))
1091 1 : vs.metrics.Table.ObsoleteSize = 0
1092 1 : vs.metrics.Table.Local.ObsoleteSize = 0
1093 1 : vs.metrics.Table.Local.ObsoleteCount = 0
1094 1 : for _, fi := range vs.obsoleteTables {
1095 1 : vs.metrics.Table.ObsoleteSize += fi.fileSize
1096 1 : if fi.isLocal {
1097 1 : vs.metrics.Table.Local.ObsoleteSize += fi.fileSize
1098 1 : vs.metrics.Table.Local.ObsoleteCount++
1099 1 : }
1100 : }
1101 1 : vs.metrics.BlobFiles.ObsoleteCount = uint64(len(vs.obsoleteBlobs))
1102 1 : vs.metrics.BlobFiles.ObsoleteSize = 0
1103 1 : vs.metrics.BlobFiles.Local.ObsoleteSize = 0
1104 1 : vs.metrics.BlobFiles.Local.ObsoleteCount = 0
1105 1 : for _, fi := range vs.obsoleteBlobs {
1106 1 : vs.metrics.BlobFiles.ObsoleteSize += fi.fileSize
1107 1 : if fi.isLocal {
1108 1 : vs.metrics.BlobFiles.Local.ObsoleteSize += fi.fileSize
1109 1 : vs.metrics.BlobFiles.Local.ObsoleteCount++
1110 1 : }
1111 : }
1112 : }
1113 :
1114 : func findCurrentManifest(
1115 : fs vfs.FS, dirname string, ls []string,
1116 1 : ) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
1117 1 : // Locating a marker should succeed even if the marker has never been placed.
1118 1 : var filename string
1119 1 : marker, filename, err = atomicfs.LocateMarkerInListing(fs, dirname, manifestMarkerName, ls)
1120 1 : if err != nil {
1121 1 : return nil, 0, false, err
1122 1 : }
1123 :
1124 1 : if filename == "" {
1125 1 : // The marker hasn't been set yet. This database doesn't exist.
1126 1 : return marker, 0, false, nil
1127 1 : }
1128 :
1129 1 : var ok bool
1130 1 : _, manifestNum, ok = base.ParseFilename(fs, filename)
1131 1 : if !ok {
1132 0 : return marker, 0, false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
1133 0 : }
1134 1 : return marker, manifestNum, true, nil
1135 : }
1136 :
1137 1 : func newFileMetrics(newFiles []manifest.NewTableEntry) levelMetricsDelta {
1138 1 : var m levelMetricsDelta
1139 1 : for _, nf := range newFiles {
1140 1 : lm := m[nf.Level]
1141 1 : if lm == nil {
1142 1 : lm = &LevelMetrics{}
1143 1 : m[nf.Level] = lm
1144 1 : }
1145 1 : lm.TablesCount++
1146 1 : lm.TablesSize += int64(nf.Meta.Size)
1147 1 : lm.EstimatedReferencesSize += nf.Meta.EstimatedReferenceSize()
1148 : }
1149 1 : return m
1150 : }
|