Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "slices"
10 : "sync"
11 : "sync/atomic"
12 : "time"
13 :
14 : "github.com/cockroachdb/crlib/crtime"
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/deletepacer"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/internal/manifest"
20 : "github.com/cockroachdb/pebble/objstorage"
21 : "github.com/cockroachdb/pebble/record"
22 : "github.com/cockroachdb/pebble/vfs"
23 : "github.com/cockroachdb/pebble/vfs/atomicfs"
24 : )
25 :
26 : const numLevels = manifest.NumLevels
27 :
28 : const manifestMarkerName = `manifest`
29 :
30 : // versionSet manages a collection of immutable versions, and manages the
31 : // creation of a new version from the most recent version. A new version is
32 : // created from an existing version by applying a version edit which is just
33 : // like it sounds: a delta from the previous version. Version edits are logged
34 : // to the MANIFEST file, which is replayed at startup.
35 : type versionSet struct {
36 : // Next seqNum to use for WAL writes.
37 : //
38 : // Note that unflushed WALs may contain sequence numbers less than, equal
39 : // to, or greater than logSeqNum. This is because every version edit
40 : // includes the current logSeqNum, so any version edits applied after a
41 : // write and before the flush of the write to an sstable will update
42 : // logSeqNum beyond the write's sequence number.
43 : logSeqNum base.AtomicSeqNum
44 :
45 : // The upper bound on sequence numbers that have been assigned so far. A
46 : // suffix of these sequence numbers may not have been written to a WAL. Both
47 : // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
48 : // visibleSeqNum is <= logSeqNum.
49 : visibleSeqNum base.AtomicSeqNum
50 :
51 : // Number of bytes present in sstables being written by in-progress
52 : // compactions. This value will be zero if there are no in-progress
53 : // compactions. Updated and read atomically.
54 : atomicInProgressBytes atomic.Int64
55 :
56 : // Immutable fields.
57 : dirname string
58 : provider objstorage.Provider
59 : // Set to DB.mu.
60 : mu *sync.Mutex
61 : opts *Options
62 : fs vfs.FS
63 : cmp *base.Comparer
64 : // dynamicBaseLevel, when false, disables the dynamic base level
65 : // computation. Used by tests which want to create specific LSM structures.
66 : dynamicBaseLevel bool
67 :
68 : // Mutable fields.
69 : versions manifest.VersionList
70 : latest *latestVersionState
71 : picker compactionPicker
72 : // curCompactionConcurrency is updated whenever picker is updated.
73 : // INVARIANT: >= 1.
74 : curCompactionConcurrency atomic.Int32
75 :
76 : // metrics contains the subset of Metrics that are maintained by the version
77 : // set. The rest of the metrics are computed in DB.Metrics().
78 : metrics struct {
79 : Levels AllLevelMetrics
80 : Compact CompactMetrics
81 : Ingest IngestMetrics
82 : Flush FlushMetrics
83 : Keys KeysMetrics
84 : }
85 :
86 : // A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
87 : // on the creation of every version.
88 : obsoleteFn func(manifest.ObsoleteFiles)
89 : // obsolete{Tables,Blobs,Manifests,Options} are sorted by file number ascending.
90 : // TODO(jackson, radu): Investigate whether we still need this intermediate step
91 : // or whether we can directly enqueue all deletions.
92 : obsoleteTables []deletepacer.ObsoleteFile
93 : obsoleteBlobs []deletepacer.ObsoleteFile
94 : obsoleteManifests []deletepacer.ObsoleteFile
95 : obsoleteOptions []deletepacer.ObsoleteFile
96 :
97 : // Zombie tables which have been removed from the current version but are
98 : // still referenced by an inuse iterator.
99 : zombieTables zombieObjects
100 : // Zombie blobs which have been removed from the current version but are
101 : // still referenced by an inuse iterator.
102 : zombieBlobs zombieObjects
103 :
104 : // minUnflushedLogNum is the smallest WAL log file number corresponding to
105 : // mutations that have not been flushed to an sstable.
106 : minUnflushedLogNum base.DiskFileNum
107 :
108 : // The next file number. A single counter is used to assign file
109 : // numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
110 : nextFileNum atomic.Uint64
111 :
112 : // The current manifest file number.
113 : manifestFileNum base.DiskFileNum
114 : manifestMarker *atomicfs.Marker
115 :
116 : manifestFile vfs.File
117 : manifest *record.Writer
118 : getFormatMajorVersion func() FormatMajorVersion
119 :
120 : writing bool
121 : writerCond sync.Cond
122 : // State for deciding when to write a snapshot. Protected by mu.
123 : rotationHelper record.RotationHelper
124 :
125 : pickedCompactionCache pickedCompactionCache
126 : }
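
// Illustrative sketch (not part of the original source): the "version edit as
// delta" model described in the versionSet comment above amounts to
// accumulating a VersionEdit into a BulkVersionEdit and applying it to the
// current version, mirroring what UpdateVersionLocked does internally. The
// function name is hypothetical and error handling is simplified.
func applyEditSketch(vs *versionSet, ve *manifest.VersionEdit) (*manifest.Version, error) {
    var bulkEdit manifest.BulkVersionEdit
    // Fold the single edit into the bulk edit.
    if err := bulkEdit.Accumulate(ve); err != nil {
        return nil, err
    }
    // Apply produces a new immutable Version from the current one plus the
    // accumulated delta; the read-compaction rate comes from Options, as in
    // UpdateVersionLocked.
    return bulkEdit.Apply(vs.currentVersion(), vs.opts.Experimental.ReadCompactionRate)
}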
127 :
128 : // latestVersionState maintains mutable state describing only the most recent
129 : // version. Unlike a *Version, the state within this struct is updated as new
130 : // version edits are applied.
131 : type latestVersionState struct {
132 : l0Organizer *manifest.L0Organizer
133 : // blobFiles is the set of blob files referenced by the current version.
134 : // blobFiles is protected by the manifest logLock (not vs.mu).
135 : blobFiles manifest.CurrentBlobFileSet
136 : // virtualBackings contains information about the FileBackings which support
137 : // virtual sstables in the latest version. It is mainly used to determine when
138 : // a backing is no longer in use by the tables in the latest version; this is
139 : // not a trivial problem because compactions and other operations can happen
140 : // in parallel (and they can finish in unpredictable order).
141 : //
142 : // This mechanism is complementary to the backing Ref/Unref mechanism, which
143 : // determines when a backing is no longer used by *any* live version and can
144 : // be removed.
145 : //
146 : // In principle this should have been a copy-on-write structure, with each
147 : // Version having its own record of its virtual backings (similar to the
148 : // B-tree that holds the tables). However, in practice we only need it for the
149 : // latest version, so we use a simpler structure and store it in the
150 : // versionSet instead.
151 : //
152 : // virtualBackings is modified under DB.mu and the log lock. If it is accessed
153 : // under DB.mu and a version update is in progress, it reflects the state of
154 : // the next version.
155 : virtualBackings manifest.VirtualBackings
156 : }
157 :
158 : func (vs *versionSet) init(
159 : dirname string,
160 : provider objstorage.Provider,
161 : opts *Options,
162 : marker *atomicfs.Marker,
163 : getFMV func() FormatMajorVersion,
164 : mu *sync.Mutex,
165 1 : ) {
166 1 : vs.dirname = dirname
167 1 : vs.provider = provider
168 1 : vs.mu = mu
169 1 : vs.writerCond.L = mu
170 1 : vs.opts = opts
171 1 : vs.fs = opts.FS
172 1 : vs.cmp = opts.Comparer
173 1 : vs.dynamicBaseLevel = true
174 1 : vs.versions.Init(mu)
175 1 : vs.obsoleteFn = vs.addObsoleteLocked
176 1 : vs.zombieTables = makeZombieObjects()
177 1 : vs.zombieBlobs = makeZombieObjects()
178 1 : vs.nextFileNum.Store(1)
179 1 : vs.logSeqNum.Store(base.SeqNumStart)
180 1 : vs.manifestMarker = marker
181 1 : vs.getFormatMajorVersion = getFMV
182 1 : }
183 :
184 : // initNewDB creates a version set for a fresh database, persisting an initial
185 : // manifest file.
186 : func (vs *versionSet) initNewDB(
187 : jobID JobID,
188 : dirname string,
189 : provider objstorage.Provider,
190 : opts *Options,
191 : marker *atomicfs.Marker,
192 : getFormatMajorVersion func() FormatMajorVersion,
193 : blobRewriteHeuristic manifest.BlobRewriteHeuristic,
194 : mu *sync.Mutex,
195 1 : ) error {
196 1 : vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
197 1 : vs.latest = &latestVersionState{
198 1 : l0Organizer: manifest.NewL0Organizer(opts.Comparer, opts.FlushSplitBytes),
199 1 : virtualBackings: manifest.MakeVirtualBackings(),
200 1 : }
201 1 : vs.latest.blobFiles.Init(nil, blobRewriteHeuristic)
202 1 : emptyVersion := manifest.NewInitialVersion(opts.Comparer)
203 1 : vs.append(emptyVersion)
204 1 :
205 1 : vs.setCompactionPicker(
206 1 : newCompactionPickerByScore(emptyVersion, vs.latest, vs.opts, nil))
207 1 : // Note that a "snapshot" version edit is written to the manifest when it is
208 1 : // created.
209 1 : vs.manifestFileNum = vs.getNextDiskFileNum()
210 1 : err := vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum.Load(), nil /* virtualBackings */)
211 1 : if err == nil {
212 1 : if err = vs.manifest.Flush(); err != nil {
213 1 : vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
214 1 : }
215 : }
216 1 : if err == nil {
217 1 : if err = vs.manifestFile.Sync(); err != nil {
218 1 : vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
219 1 : }
220 : }
221 1 : if err == nil {
222 1 : // NB: Move() is responsible for syncing the data directory.
223 1 : if err = vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, vs.manifestFileNum)); err != nil {
224 1 : vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
225 1 : }
226 : }
227 :
228 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
229 1 : JobID: int(jobID),
230 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
231 1 : FileNum: vs.manifestFileNum,
232 1 : Err: err,
233 1 : })
234 1 : if err != nil {
235 1 : return err
236 1 : }
237 1 : return nil
238 : }
239 :
240 : // initRecoveredDB initializes the version set from the database state
241 : // recovered from an existing manifest file.
242 : func (vs *versionSet) initRecoveredDB(
243 : dirname string,
244 : provider objstorage.Provider,
245 : opts *Options,
246 : rv *recoveredVersion,
247 : marker *atomicfs.Marker,
248 : getFormatMajorVersion func() FormatMajorVersion,
249 : mu *sync.Mutex,
250 1 : ) error {
251 1 : vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
252 1 : vs.manifestFileNum = rv.manifestFileNum
253 1 : vs.minUnflushedLogNum = rv.minUnflushedLogNum
254 1 : vs.nextFileNum.Store(uint64(rv.nextFileNum))
255 1 : vs.logSeqNum.Store(rv.logSeqNum)
256 1 : vs.latest = rv.latest
257 1 : setBasicLevelMetrics(&vs.metrics.Levels, rv.version)
258 1 : vs.append(rv.version)
259 1 : vs.setCompactionPicker(newCompactionPickerByScore(
260 1 : rv.version, vs.latest, vs.opts, nil))
261 1 : return nil
262 1 : }
263 :
264 1 : func (vs *versionSet) close() error {
265 1 : if vs.manifestFile != nil {
266 1 : if err := vs.manifestFile.Close(); err != nil {
267 0 : return err
268 0 : }
269 : }
270 1 : if vs.manifestMarker != nil {
271 1 : if err := vs.manifestMarker.Close(); err != nil {
272 0 : return err
273 0 : }
274 : }
275 1 : return nil
276 : }
277 :
278 : // logLock locks the manifest for writing. The lock must be released by
279 : // a call to logUnlock.
280 : //
281 : // DB.mu must be held when calling this method, but the mutex may be dropped and
282 : // re-acquired during the course of this method.
283 1 : func (vs *versionSet) logLock() {
284 1 : // Wait for any existing writing to the manifest to complete, then mark the
285 1 : // manifest as busy.
286 1 : for vs.writing {
287 1 : // Note: writerCond.L is DB.mu, so we unlock it while we wait.
288 1 : vs.writerCond.Wait()
289 1 : }
290 1 : vs.writing = true
291 : }
292 :
293 : // logUnlock releases the lock for manifest writing.
294 : //
295 : // DB.mu must be held when calling this method.
296 1 : func (vs *versionSet) logUnlock() {
297 1 : if !vs.writing {
298 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
299 0 : }
300 1 : vs.writing = false
301 1 : vs.writerCond.Signal()
302 : }
303 :
304 1 : func (vs *versionSet) logUnlockAndInvalidatePickedCompactionCache() {
305 1 : vs.pickedCompactionCache.invalidate()
306 1 : vs.logUnlock()
307 1 : }
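
// Illustrative sketch: the typical pairing of logLock with
// logUnlockAndInvalidatePickedCompactionCache, as used by UpdateVersionLocked
// below. DB.mu must already be held; logLock may release and reacquire it
// while waiting for a concurrent manifest writer. The function name is
// hypothetical.
func manifestCriticalSectionSketch(vs *versionSet) {
    vs.logLock()
    defer vs.logUnlockAndInvalidatePickedCompactionCache()
    // ... build a version edit, write it to the MANIFEST, and install the new
    // version here ...
}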
308 :
309 : // versionUpdate is returned by the function passed to UpdateVersionLocked.
310 : //
311 : // If VE is nil, there is no update to apply (but it is not an error).
312 : type versionUpdate struct {
313 : VE *manifest.VersionEdit
314 : JobID JobID
315 : Metrics levelMetricsDelta
316 : // InProgressCompactionsFn is called while DB.mu is held after the I/O part of
317 : // the update has been performed. It should return any compactions that are
318 : // in progress (excluding the one that is being applied).
319 : InProgressCompactionsFn func() []compactionInfo
320 : ForceManifestRotation bool
321 : }
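
// Illustrative sketch of driving UpdateVersionLocked with a versionUpdate:
// updateFn is invoked while DB.mu is held and returns the edit to apply;
// returning a nil VE applies no update. The function name is hypothetical and
// the jobID/edit arguments are placeholders supplied by the caller.
func applyVersionUpdateSketch(vs *versionSet, jobID JobID, ve *manifest.VersionEdit) error {
    _, err := vs.UpdateVersionLocked(func() (versionUpdate, error) {
        return versionUpdate{
            VE:    ve,
            JobID: jobID,
            // No other compactions are in progress in this sketch.
            InProgressCompactionsFn: func() []compactionInfo { return nil },
        }, nil
    })
    return err
}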
322 :
323 : // UpdateVersionLocked is used to update the current version, returning the
324 : // duration of the manifest update. If the manifest update is unsuccessful, the
325 : // duration will be unset (0).
326 : //
327 : // DB.mu must be held. UpdateVersionLocked first waits for any other version
328 : // update to complete, releasing and reacquiring DB.mu.
329 : //
330 : // UpdateVersionLocked then calls updateFn which builds a versionUpdate, while
331 : // holding DB.mu. The updateFn can release and reacquire DB.mu (it should
332 : // attempt to do as much work as possible outside of the lock).
333 : //
334 : // UpdateVersionLocked fills in the following fields of the VersionEdit:
335 : // NextFileNum, LastSeqNum, RemovedBackingTables. The removed backing tables are
336 : // those backings that are no longer used (in the new version) after applying
337 : // the edit (as per vs.latest.virtualBackings). Other than these fields, the
338 : // VersionEdit must be complete.
339 : //
340 : // New table backing references (TableBacking.Ref) are taken as part of applying
341 : // the version edit. The state of the virtual backings (vs.virtualBackings) is
342 : // updated before logging to the manifest and installing the latest version;
343 : // this is ok because any failure in those steps is fatal.
344 : //
345 : // If updateFn returns an error, no update is applied and that same error is returned.
346 : // If versionUpdate.VE is nil, then no update is applied (and no error is returned).
347 : func (vs *versionSet) UpdateVersionLocked(
348 : updateFn func() (versionUpdate, error),
349 1 : ) (time.Duration, error) {
350 1 : vs.logLock()
351 1 : defer vs.logUnlockAndInvalidatePickedCompactionCache()
352 1 :
353 1 : vu, err := updateFn()
354 1 : if err != nil || vu.VE == nil {
355 1 : return 0, err
356 1 : }
357 :
358 1 : if !vs.writing {
359 0 : vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
360 0 : }
361 :
362 1 : updateManifestStart := crtime.NowMono()
363 1 : ve := vu.VE
364 1 : if ve.MinUnflushedLogNum != 0 {
365 1 : if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
366 1 : vs.nextFileNum.Load() <= uint64(ve.MinUnflushedLogNum) {
367 0 : panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
368 0 : ve.MinUnflushedLogNum))
369 : }
370 : }
371 :
372 : // This is the next manifest filenum, but if the current file is too big we
373 : // will write this ve to the next file, which means that what ve encodes is
374 : // the current filenum and not the next one.
375 : //
376 : // TODO(sbhola): figure out why this is correct and update comment.
377 1 : ve.NextFileNum = vs.nextFileNum.Load()
378 1 :
379 1 : // LastSeqNum is set to the current upper bound on the assigned sequence
380 1 : // numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
381 1 : // used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
382 1 : // replay. It must be higher than or equal to any sequence number
383 1 : // written to an sstable, including sequence numbers in ingested files.
384 1 : // Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
385 1 : // number. This is fallout from ingestion which allows a sequence number X to
386 1 : // be assigned to an ingested sstable even though sequence number X-1 resides
387 1 : // in an unflushed memtable. logSeqNum is the _next_ sequence number that
388 1 : // will be assigned, so subtract 1 from it to get the upper bound on the
389 1 : // last assigned sequence number.
390 1 : logSeqNum := vs.logSeqNum.Load()
391 1 : ve.LastSeqNum = logSeqNum - 1
392 1 : if logSeqNum == 0 {
393 0 : // logSeqNum is initialized to 1 in Open() if there are no previous WAL
394 0 : // or manifest records, so this case should never happen.
395 0 : vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
396 0 : }
397 :
398 1 : currentVersion := vs.currentVersion()
399 1 : var newVersion *manifest.Version
400 1 :
401 1 : // Generate a new manifest if we don't currently have one, or forceRotation
402 1 : // is true, or the current one is too large.
403 1 : //
404 1 : // For largeness, we do not exclusively use MaxManifestFileSize size
405 1 : // threshold since we have had incidents where due to either large keys or
406 1 : // large numbers of files, each edit results in a snapshot + write of the
407 1 : // edit. This slows the system down since each flush or compaction is
408 1 : // writing a new manifest snapshot. The primary goal of the size-based
409 1 : // rollover logic is to ensure that when reopening a DB, the number of edits
410 1 : // that need to be replayed on top of the snapshot is "sane". Rolling over
411 1 : // to a new manifest after each edit is not relevant to that goal.
412 1 : //
413 1 : // Consider the following cases:
414 1 : // - The number of live files F in the DB is roughly stable: after writing
415 1 : // the snapshot (with F files), say we require that there be enough edits
416 1 : // such that the cumulative number of files in those edits, E, be greater
417 1 : // than F. This will ensure that the total amount of time in
418 1 : // UpdateVersionLocked that is spent in snapshot writing is ~50%.
419 1 : //
420 1 : // - The number of live files F in the DB is shrinking drastically, say from
421 1 : // F to F/10: This can happen for various reasons, like wide range
422 1 : // tombstones, or large numbers of smaller than usual files that are being
423 1 : // merged together into larger files. And say the new files generated
424 1 : // during this shrinkage is insignificant compared to F/10, and so for
425 1 : // this example we will assume it is effectively 0. After this shrinking,
426 1 : // E = 0.9F, and so if we used the previous snapshot file count, F, as the
427 1 : // threshold that needs to be exceeded, we will further delay the snapshot
428 1 : // writing. Which means on DB reopen we will need to replay 0.9F edits to
429 1 : // get to a version with 0.1F files. It would be better to create a new
430 1 : // snapshot when E exceeds the number of files in the current version.
431 1 : //
432 1 : // - The number of live files F in the DB is growing via perfect ingests
433 1 : // into L6: Say we wrote the snapshot when there were F files and now we
434 1 : // have 10F files, so E = 9F. We will further delay writing a new
435 1 : // snapshot. This case can be critiqued as contrived, but we consider it
436 1 : // nonetheless.
437 1 : //
438 1 : // The logic below uses the min of the last snapshot file count and the file
439 1 : // count in the current version.
440 1 : vs.rotationHelper.AddRecord(int64(len(ve.DeletedTables) + len(ve.NewTables)))
441 1 : sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
442 1 : requireRotation := vu.ForceManifestRotation || vs.manifest == nil
443 1 :
444 1 : var nextSnapshotFilecount int64
445 1 : for i := range vs.metrics.Levels {
446 1 : nextSnapshotFilecount += int64(vs.metrics.Levels[i].Tables.Count)
447 1 : }
448 1 : if sizeExceeded && !requireRotation {
449 1 : requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
450 1 : }
451 1 : var newManifestFileNum base.DiskFileNum
452 1 : var prevManifestFileSize uint64
453 1 : var newManifestVirtualBackings []*manifest.TableBacking
454 1 : if requireRotation {
455 1 : newManifestFileNum = vs.getNextDiskFileNum()
456 1 : prevManifestFileSize = uint64(vs.manifest.Size())
457 1 :
458 1 : // We want the virtual backings *before* applying the version edit, because
459 1 : // the new manifest will contain the pre-apply version plus the last version
460 1 : // edit.
461 1 : newManifestVirtualBackings = slices.Collect(vs.latest.virtualBackings.All())
462 1 : }
463 :
464 : // Grab certain values before releasing vs.mu, in case createManifest() needs
465 : // to be called.
466 1 : minUnflushedLogNum := vs.minUnflushedLogNum
467 1 : nextFileNum := vs.nextFileNum.Load()
468 1 :
469 1 : // Note: this call populates ve.RemovedBackingTables.
470 1 : zombieBackings, removedVirtualBackings :=
471 1 : getZombieTablesAndUpdateVirtualBackings(ve, &vs.latest.virtualBackings, vs.provider)
472 1 :
473 1 : var l0Update manifest.L0PreparedUpdate
474 1 : if err := func() error {
475 1 : vs.mu.Unlock()
476 1 : defer vs.mu.Lock()
477 1 :
478 1 : if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
479 0 : return base.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
480 0 : }
481 :
482 : // Rotate the manifest if necessary. Rotating the manifest involves
483 : // creating a new file and writing an initial version edit containing a
484 : // snapshot of the current version. This initial version edit will
485 : // reflect the Version prior to the pending version edit (`ve`). Once
486 : // we've created the new manifest with the previous version state, we'll
487 : // append the version edit `ve` to the tail of the new manifest.
488 1 : if newManifestFileNum != 0 {
489 1 : if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum, newManifestVirtualBackings); err != nil {
490 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
491 1 : JobID: int(vu.JobID),
492 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
493 1 : FileNum: newManifestFileNum,
494 1 : Err: err,
495 1 : })
496 1 : return errors.Wrap(err, "MANIFEST create failed")
497 1 : }
498 : }
499 :
500 : // Call ApplyAndUpdateVersionEdit before accumulating the version edit.
501 : // If any blob files are no longer referenced, the version edit will be
502 : // updated to explicitly record the deletion of the blob files. This can
503 : // happen here because vs.latest.blobFiles is protected by the manifest logLock
504 : // (NOT vs.mu). We only read or write vs.latest.blobFiles while holding the
505 : // manifest lock.
506 1 : if err := vs.latest.blobFiles.ApplyAndUpdateVersionEdit(ve); err != nil {
507 0 : return errors.Wrap(err, "MANIFEST blob files apply and update failed")
508 0 : }
509 :
510 1 : var bulkEdit manifest.BulkVersionEdit
511 1 : err := bulkEdit.Accumulate(ve)
512 1 : if err != nil {
513 0 : return errors.Wrap(err, "MANIFEST accumulate failed")
514 0 : }
515 1 : newVersion, err = bulkEdit.Apply(currentVersion, vs.opts.Experimental.ReadCompactionRate)
516 1 : if err != nil {
517 0 : return errors.Wrap(err, "MANIFEST apply failed")
518 0 : }
519 1 : l0Update = vs.latest.l0Organizer.PrepareUpdate(&bulkEdit, newVersion)
520 1 :
521 1 : w, err := vs.manifest.Next()
522 1 : if err != nil {
523 0 : return errors.Wrap(err, "MANIFEST next record write failed")
524 0 : }
525 :
526 : // NB: Any error from this point on is considered fatal as we don't know if
527 : // the MANIFEST write occurred or not. Trying to determine that is
528 : // fraught. Instead we rely on the standard recovery mechanism run when a
529 : // database is open. In particular, that mechanism generates a new MANIFEST
530 : // and ensures it is synced.
531 1 : if err := ve.Encode(w); err != nil {
532 0 : return errors.Wrap(err, "MANIFEST write failed")
533 0 : }
534 1 : if err := vs.manifest.Flush(); err != nil {
535 1 : return errors.Wrap(err, "MANIFEST flush failed")
536 1 : }
537 1 : if err := vs.manifestFile.Sync(); err != nil {
538 1 : return errors.Wrap(err, "MANIFEST sync failed")
539 1 : }
540 1 : if newManifestFileNum != 0 {
541 1 : // NB: Move() is responsible for syncing the data directory.
542 1 : if err := vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, newManifestFileNum)); err != nil {
543 0 : return errors.Wrap(err, "MANIFEST set current failed")
544 0 : }
545 1 : vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
546 1 : JobID: int(vu.JobID),
547 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
548 1 : FileNum: newManifestFileNum,
549 1 : })
550 : }
551 1 : return nil
552 1 : }(); err != nil {
553 1 : // Any error encountered during any of the operations in the previous
554 1 : // closure is considered fatal. Treating such errors as fatal is preferred
555 1 : // to attempting to unwind various file and b-tree reference counts, and
556 1 : // re-generating L0 sublevel metadata. This may change in the future, if
557 1 : // certain manifest / WAL operations become retryable. For more context, see
558 1 : // #1159 and #1792.
559 1 : vs.opts.Logger.Fatalf("%s", err)
560 1 : return 0, err
561 1 : }
562 :
563 1 : if requireRotation {
564 1 : // Successfully rotated.
565 1 : vs.rotationHelper.Rotate(nextSnapshotFilecount)
566 1 : }
567 : // Now that DB.mu is held again, initialize compacting file info in
568 : // L0Sublevels.
569 1 : inProgress := vu.InProgressCompactionsFn()
570 1 :
571 1 : zombieBlobs := getZombieBlobFiles(ve, vs.provider)
572 1 : vs.latest.l0Organizer.PerformUpdate(l0Update, newVersion)
573 1 : vs.latest.l0Organizer.InitCompactingFileInfo(inProgressL0Compactions(inProgress))
574 1 :
575 1 : // Update the zombie objects sets first, as installation of the new version
576 1 : // will unref the previous version which could result in addObsoleteLocked
577 1 : // being called.
578 1 : for _, b := range zombieBackings {
579 1 : vs.zombieTables.Add(objectInfo{
580 1 : fileInfo: fileInfo{
581 1 : FileNum: b.backing.DiskFileNum,
582 1 : FileSize: b.backing.Size,
583 1 : },
584 1 : placement: b.placement,
585 1 : })
586 1 : }
587 1 : for _, zb := range zombieBlobs {
588 1 : vs.zombieBlobs.Add(zb)
589 1 : }
590 : // Unref the removed backings and report those that already became obsolete.
591 : // Note that the only case where we report obsolete tables here is when
592 : // VirtualBackings.Protect/Unprotect was used to keep a backing alive without
593 : // it being used in the current version.
594 1 : var obsoleteVirtualBackings manifest.ObsoleteFiles
595 1 : for _, b := range removedVirtualBackings {
596 1 : if b.backing.Unref() == 0 {
597 1 : obsoleteVirtualBackings.TableBackings = append(obsoleteVirtualBackings.TableBackings, b.backing)
598 1 : }
599 : }
600 1 : vs.addObsoleteLocked(obsoleteVirtualBackings)
601 1 :
602 1 : // Install the new version.
603 1 : vs.append(newVersion)
604 1 :
605 1 : if ve.MinUnflushedLogNum != 0 {
606 1 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
607 1 : }
608 1 : if newManifestFileNum != 0 {
609 1 : if vs.manifestFileNum != 0 {
610 1 : vs.obsoleteManifests = append(vs.obsoleteManifests, deletepacer.ObsoleteFile{
611 1 : FileType: base.FileTypeManifest,
612 1 : FS: vs.fs,
613 1 : Path: base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
614 1 : FileNum: vs.manifestFileNum,
615 1 : FileSize: prevManifestFileSize,
616 1 : Placement: base.Local,
617 1 : })
618 1 : }
619 1 : vs.manifestFileNum = newManifestFileNum
620 : }
621 :
622 1 : vs.metrics.Levels.update(vu.Metrics)
623 1 : setBasicLevelMetrics(&vs.metrics.Levels, newVersion)
624 1 :
625 1 : vs.setCompactionPicker(newCompactionPickerByScore(newVersion, vs.latest, vs.opts, inProgress))
626 1 : if !vs.dynamicBaseLevel {
627 1 : vs.picker.forceBaseLevel1()
628 1 : }
629 :
630 1 : return updateManifestStart.Elapsed(), nil
631 : }
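
// Illustrative sketch of the manifest rotation heuristic described in the
// comment inside UpdateVersionLocked: record the table churn of each edit,
// and once the manifest exceeds its size threshold, rotate only when the
// accumulated churn is large relative to the live table count. This mirrors
// the rotationHelper usage above; the function name is hypothetical and the
// nil-manifest case is ignored.
func shouldRotateManifestSketch(vs *versionSet, ve *manifest.VersionEdit) bool {
    vs.rotationHelper.AddRecord(int64(len(ve.DeletedTables) + len(ve.NewTables)))
    if vs.manifest.Size() < vs.opts.MaxManifestFileSize {
        return false
    }
    var liveTables int64
    for i := range vs.metrics.Levels {
        liveTables += int64(vs.metrics.Levels[i].Tables.Count)
    }
    return vs.rotationHelper.ShouldRotate(liveTables)
}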
632 :
633 1 : func (vs *versionSet) setCompactionPicker(picker *compactionPickerByScore) {
634 1 : vs.picker = picker
635 1 : vs.curCompactionConcurrency.Store(int32(picker.getCompactionConcurrency()))
636 1 : }
637 :
638 : type tableBackingInfo struct {
639 : backing *manifest.TableBacking
640 : placement base.Placement
641 : }
642 :
643 : // getZombieTablesAndUpdateVirtualBackings updates the virtual backings with the
644 : // changes in the versionEdit and populates ve.RemovedBackingTables.
645 : // Returns:
646 : // - zombieBackings: all backings (physical and virtual) that will no longer be
647 : // needed when we apply ve.
648 : // - removedVirtualBackings: the virtual backings that will be removed by the
649 : // VersionEdit and which must be Unref()ed by the caller. These backings
650 : // match ve.RemovedBackingTables.
651 : func getZombieTablesAndUpdateVirtualBackings(
652 : ve *manifest.VersionEdit, virtualBackings *manifest.VirtualBackings, provider objstorage.Provider,
653 1 : ) (zombieBackings, removedVirtualBackings []tableBackingInfo) {
654 1 : // First, deal with the physical tables.
655 1 : //
656 1 : // A physical backing has become unused if it is in DeletedTables but not in
657 1 : // NewTables or CreatedBackingTables.
658 1 : //
659 1 : // Note that for the common case where there are very few elements, the map
660 1 : // will stay on the stack.
661 1 : stillUsed := make(map[base.DiskFileNum]struct{})
662 1 : for _, nf := range ve.NewTables {
663 1 : if !nf.Meta.Virtual {
664 1 : stillUsed[nf.Meta.TableBacking.DiskFileNum] = struct{}{}
665 1 : }
666 : }
667 1 : for _, b := range ve.CreatedBackingTables {
668 1 : stillUsed[b.DiskFileNum] = struct{}{}
669 1 : }
670 1 : for _, m := range ve.DeletedTables {
671 1 : if !m.Virtual {
672 1 : if _, ok := stillUsed[m.TableBacking.DiskFileNum]; !ok {
673 1 : zombieBackings = append(zombieBackings, tableBackingInfo{
674 1 : backing: m.TableBacking,
675 1 : placement: objstorage.Placement(provider, base.FileTypeTable, m.TableBacking.DiskFileNum),
676 1 : })
677 1 : }
678 : }
679 : }
680 :
681 : // Now deal with virtual tables.
682 : //
683 : // When a virtual table moves between levels we RemoveTable() then AddTable(),
684 : // which works out because Unused() is only consulted after all removals
685 : // and additions have been applied.
685 1 : for _, b := range ve.CreatedBackingTables {
686 1 : placement := objstorage.Placement(provider, base.FileTypeTable, b.DiskFileNum)
687 1 : virtualBackings.AddAndRef(b, placement)
688 1 : }
689 1 : for _, m := range ve.DeletedTables {
690 1 : if m.Virtual {
691 1 : virtualBackings.RemoveTable(m.TableBacking.DiskFileNum, m.TableNum)
692 1 : }
693 : }
694 1 : for _, nf := range ve.NewTables {
695 1 : if nf.Meta.Virtual {
696 1 : virtualBackings.AddTable(nf.Meta, nf.Level)
697 1 : }
698 : }
699 :
700 1 : if unused := virtualBackings.Unused(); len(unused) > 0 {
701 1 : // Virtual backings that are no longer used are zombies and are also added
702 1 : // to RemovedBackingTables (before the version edit is written to disk).
703 1 : ve.RemovedBackingTables = make([]base.DiskFileNum, len(unused))
704 1 : for i, b := range unused {
705 1 : placement := objstorage.Placement(provider, base.FileTypeTable, b.DiskFileNum)
706 1 : ve.RemovedBackingTables[i] = b.DiskFileNum
707 1 : zombieBackings = append(zombieBackings, tableBackingInfo{
708 1 : backing: b,
709 1 : placement: placement,
710 1 : })
711 1 : virtualBackings.Remove(b.DiskFileNum)
712 1 : }
713 1 : removedVirtualBackings = zombieBackings[len(zombieBackings)-len(unused):]
714 : }
715 1 : return zombieBackings, removedVirtualBackings
716 : }
717 :
718 : // getZombieBlobFiles constructs objectInfos for all zombie blob files.
719 : func getZombieBlobFiles(
720 : ve *manifest.VersionEdit, provider objstorage.Provider,
721 1 : ) (zombieBlobFiles []objectInfo) {
722 1 : zombieBlobFiles = make([]objectInfo, 0, len(ve.DeletedBlobFiles))
723 1 : for _, physical := range ve.DeletedBlobFiles {
724 1 : placement := objstorage.Placement(provider, base.FileTypeBlob, physical.FileNum)
725 1 : zombieBlobFiles = append(zombieBlobFiles, objectInfo{
726 1 : fileInfo: fileInfo{
727 1 : FileNum: physical.FileNum,
728 1 : FileSize: physical.Size,
729 1 : },
730 1 : placement: placement,
731 1 : })
732 1 : }
733 1 : return zombieBlobFiles
734 : }
735 :
736 : func (vs *versionSet) incrementCompactions(
737 : kind compactionKind, extraLevels []*compactionLevel, bytesWritten int64, compactionErr error,
738 1 : ) {
739 1 : if kind == compactionKindFlush || kind == compactionKindIngestedFlushable {
740 1 : vs.metrics.Flush.Count++
741 1 : } else {
742 1 : vs.metrics.Compact.Count++
743 1 : if compactionErr != nil {
744 1 : if errors.Is(compactionErr, ErrCancelledCompaction) {
745 1 : vs.metrics.Compact.CancelledCount++
746 1 : vs.metrics.Compact.CancelledBytes += bytesWritten
747 1 : } else {
748 1 : vs.metrics.Compact.FailedCount++
749 1 : }
750 : }
751 : }
752 :
753 1 : switch kind {
754 1 : case compactionKindDefault:
755 1 : vs.metrics.Compact.DefaultCount++
756 :
757 1 : case compactionKindFlush, compactionKindIngestedFlushable:
758 :
759 1 : case compactionKindMove:
760 1 : vs.metrics.Compact.MoveCount++
761 :
762 1 : case compactionKindDeleteOnly:
763 1 : vs.metrics.Compact.DeleteOnlyCount++
764 :
765 1 : case compactionKindElisionOnly:
766 1 : vs.metrics.Compact.ElisionOnlyCount++
767 :
768 1 : case compactionKindRead:
769 1 : vs.metrics.Compact.ReadCount++
770 :
771 1 : case compactionKindTombstoneDensity:
772 1 : vs.metrics.Compact.TombstoneDensityCount++
773 :
774 1 : case compactionKindRewrite:
775 1 : vs.metrics.Compact.RewriteCount++
776 :
777 1 : case compactionKindCopy:
778 1 : vs.metrics.Compact.CopyCount++
779 :
780 1 : case compactionKindBlobFileRewrite:
781 1 : vs.metrics.Compact.BlobFileRewriteCount++
782 :
783 1 : case compactionKindVirtualRewrite:
784 1 : vs.metrics.Compact.VirtualRewriteCount++
785 :
786 0 : default:
787 0 : if invariants.Enabled {
788 0 : panic("unhandled compaction kind")
789 : }
790 :
791 : }
792 1 : if len(extraLevels) > 0 {
793 1 : vs.metrics.Compact.MultiLevelCount++
794 1 : }
795 : }
796 :
797 1 : func (vs *versionSet) incrementCompactionBytes(numBytes int64) {
798 1 : vs.atomicInProgressBytes.Add(numBytes)
799 1 : }
800 :
801 : // createManifest creates a manifest file that contains a snapshot of vs.
802 : func (vs *versionSet) createManifest(
803 : dirname string,
804 : fileNum, minUnflushedLogNum base.DiskFileNum,
805 : nextFileNum uint64,
806 : virtualBackings []*manifest.TableBacking,
807 1 : ) (err error) {
808 1 : var (
809 1 : filename = base.MakeFilepath(vs.fs, dirname, base.FileTypeManifest, fileNum)
810 1 : manifestFile vfs.File
811 1 : manifestWriter *record.Writer
812 1 : )
813 1 : defer func() {
814 1 : if manifestWriter != nil {
815 0 : _ = manifestWriter.Close()
816 0 : }
817 1 : if manifestFile != nil {
818 0 : _ = manifestFile.Close()
819 0 : }
820 1 : if err != nil {
821 1 : _ = vs.fs.Remove(filename)
822 1 : }
823 : }()
824 1 : manifestFile, err = vs.fs.Create(filename, "pebble-manifest")
825 1 : if err != nil {
826 1 : return err
827 1 : }
828 1 : manifestWriter = record.NewWriter(manifestFile)
829 1 :
830 1 : snapshot := manifest.VersionEdit{
831 1 : ComparerName: vs.cmp.Name,
832 1 : // When creating a version snapshot for an existing DB, this snapshot
833 1 : // VersionEdit will be immediately followed by another VersionEdit
834 1 : // (being written in UpdateVersionLocked()). That VersionEdit always
835 1 : // contains a LastSeqNum, so we don't need to include that in the
836 1 : // snapshot. But it does not necessarily include MinUnflushedLogNum,
837 1 : // NextFileNum, so we initialize those using the corresponding fields in
838 1 : // the versionSet (which came from the latest preceding VersionEdit that
839 1 : // had those fields).
840 1 : MinUnflushedLogNum: minUnflushedLogNum,
841 1 : NextFileNum: nextFileNum,
842 1 : CreatedBackingTables: virtualBackings,
843 1 : NewBlobFiles: vs.latest.blobFiles.Metadatas(),
844 1 : }
845 1 :
846 1 : // Add all extant sstables in the current version.
847 1 : for level, levelMetadata := range vs.currentVersion().Levels {
848 1 : for meta := range levelMetadata.All() {
849 1 : snapshot.NewTables = append(snapshot.NewTables, manifest.NewTableEntry{
850 1 : Level: level,
851 1 : Meta: meta,
852 1 : })
853 1 : }
854 : }
855 :
856 : // Add all tables marked for compaction.
857 1 : if vs.getFormatMajorVersion() >= FormatMarkForCompactionInVersionEdit {
858 1 : for meta, level := range vs.currentVersion().MarkedForCompaction.Ascending() {
859 1 : snapshot.TablesMarkedForCompaction = append(snapshot.TablesMarkedForCompaction, manifest.TableMarkedForCompactionEntry{
860 1 : Level: level,
861 1 : TableNum: meta.TableNum,
862 1 : })
863 1 : }
864 : }
865 :
866 1 : w, err1 := manifestWriter.Next()
867 1 : if err1 != nil {
868 0 : return err1
869 0 : }
870 1 : if err := snapshot.Encode(w); err != nil {
871 0 : return err
872 0 : }
873 :
874 1 : if vs.manifest != nil {
875 1 : if err := vs.manifest.Close(); err != nil {
876 0 : return err
877 0 : }
878 1 : vs.manifest = nil
879 : }
880 1 : if vs.manifestFile != nil {
881 1 : if err := vs.manifestFile.Close(); err != nil {
882 0 : return err
883 0 : }
884 1 : vs.manifestFile = nil
885 : }
886 :
887 1 : vs.manifest, manifestWriter = manifestWriter, nil
888 1 : vs.manifestFile, manifestFile = manifestFile, nil
889 1 : return nil
890 : }
891 :
892 : // NB: This method is not safe for concurrent use. It is only safe
893 : // to be called when concurrent changes to nextFileNum are not expected.
894 1 : func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
895 1 : if vs.nextFileNum.Load() <= uint64(fileNum) {
896 1 : vs.nextFileNum.Store(uint64(fileNum + 1))
897 1 : }
898 : }
899 :
900 : // getNextTableNum returns a new table number.
901 : //
902 : // Can be called without the versionSet's mutex being held.
903 1 : func (vs *versionSet) getNextTableNum() base.TableNum {
904 1 : x := vs.nextFileNum.Add(1) - 1
905 1 : return base.TableNum(x)
906 1 : }
907 :
908 : // Can be called without the versionSet's mutex being held.
909 1 : func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
910 1 : x := vs.nextFileNum.Add(1) - 1
911 1 : return base.DiskFileNum(x)
912 1 : }
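
// Illustrative sketch: a single counter hands out file numbers for every file
// kind (WAL, MANIFEST, sstable, OPTIONS), so a table number and a disk file
// number never collide. markFileNumUsed is how recovery advances the counter
// past numbers already observed on disk. The function name and the literal
// file number are hypothetical.
func fileNumberingSketch(vs *versionSet) {
    tableNum := vs.getNextTableNum()       // e.g. allocates 12
    manifestNum := vs.getNextDiskFileNum() // then 13, never reusing 12
    _, _ = tableNum, manifestNum
    // If a file numbered 100 is discovered during recovery, the counter jumps
    // past it so future allocations start at 101.
    vs.markFileNumUsed(base.DiskFileNum(100))
}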
913 :
914 1 : func (vs *versionSet) append(v *manifest.Version) {
915 1 : if v.Refs() != 0 {
916 0 : panic("pebble: version should be unreferenced")
917 : }
918 1 : if !vs.versions.Empty() {
919 1 : vs.versions.Back().UnrefLocked()
920 1 : }
921 1 : v.Deleted = vs.obsoleteFn
922 1 : v.Ref()
923 1 : vs.versions.PushBack(v)
924 1 : if invariants.Enabled {
925 1 : // Verify that the virtualBackings contains all the backings referenced by
926 1 : // the version.
927 1 : for _, l := range v.Levels {
928 1 : for f := range l.All() {
929 1 : if f.Virtual {
930 1 : if _, ok := vs.latest.virtualBackings.Get(f.TableBacking.DiskFileNum); !ok {
931 0 : panic(fmt.Sprintf("%s is not in virtualBackings", f.TableBacking.DiskFileNum))
932 : }
933 : }
934 : }
935 : }
936 : }
937 : }
938 :
939 1 : func (vs *versionSet) currentVersion() *manifest.Version {
940 1 : return vs.versions.Back()
941 1 : }
942 :
943 1 : func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
944 1 : current := vs.currentVersion()
945 1 : for v := vs.versions.Front(); true; v = v.Next() {
946 1 : for _, lm := range v.Levels {
947 1 : for f := range lm.All() {
948 1 : m[f.TableBacking.DiskFileNum] = struct{}{}
949 1 : }
950 : }
951 1 : for bf := range v.BlobFiles.All() {
952 1 : m[bf.Physical.FileNum] = struct{}{}
953 1 : }
954 1 : if v == current {
955 1 : break
956 : }
957 : }
958 : // virtualBackings contains backings that are referenced by some virtual
959 : // tables in the latest version (which are handled above), and backings that
960 : // are not but are still alive because of the protection mechanism (see
961 : // manifest.VirtualBackings). This loop ensures the latter get added to the
962 : // map.
963 1 : for b := range vs.latest.virtualBackings.All() {
964 1 : m[b.DiskFileNum] = struct{}{}
965 1 : }
966 : }
967 :
968 : // addObsoleteLocked adds the fileInfo associated with obsolete backing
969 : // sstables and blob files to the obsolete file lists.
970 : //
971 : // The file backings in the obsolete list must not appear more than once.
972 : //
973 : // DB.mu must be held when addObsoleteLocked is called.
974 1 : func (vs *versionSet) addObsoleteLocked(obsolete manifest.ObsoleteFiles) {
975 1 : if obsolete.Count() == 0 {
976 1 : return
977 1 : }
978 :
979 : // Note that the zombie objects transition from zombie *to* obsolete, and
980 : // will no longer be considered zombie.
981 :
982 1 : newlyObsoleteTables := make([]deletepacer.ObsoleteFile, len(obsolete.TableBackings))
983 1 : for i, bs := range obsolete.TableBackings {
984 1 : newlyObsoleteTables[i] = vs.zombieTables.Extract(bs.DiskFileNum).
985 1 : asObsoleteFile(vs.fs, base.FileTypeTable, vs.dirname)
986 1 : }
987 1 : vs.obsoleteTables = mergeObsoleteFiles(vs.obsoleteTables, newlyObsoleteTables)
988 1 :
989 1 : newlyObsoleteBlobFiles := make([]deletepacer.ObsoleteFile, len(obsolete.BlobFiles))
990 1 : for i, bf := range obsolete.BlobFiles {
991 1 : newlyObsoleteBlobFiles[i] = vs.zombieBlobs.Extract(bf.FileNum).
992 1 : asObsoleteFile(vs.fs, base.FileTypeBlob, vs.dirname)
993 1 : }
994 1 : vs.obsoleteBlobs = mergeObsoleteFiles(vs.obsoleteBlobs, newlyObsoleteBlobFiles)
995 : }
996 :
997 : // addObsolete will acquire DB.mu, so DB.mu must not be held when this is
998 : // called.
999 1 : func (vs *versionSet) addObsolete(obsolete manifest.ObsoleteFiles) {
1000 1 : vs.mu.Lock()
1001 1 : defer vs.mu.Unlock()
1002 1 : vs.addObsoleteLocked(obsolete)
1003 1 : }
1004 :
1005 : // setBasicLevelMetrics sets the following fields of lm[*]:
1006 : // - [Virtual]Tables{Count,Bytes}
1007 : // - Sublevels
1008 : // - EstimatedReferencesSize
1009 1 : func setBasicLevelMetrics(lm *AllLevelMetrics, newVersion *manifest.Version) {
1010 1 : for i := range lm {
1011 1 : l := &lm[i]
1012 1 : l.Tables.Count = uint64(newVersion.Levels[i].Len())
1013 1 : l.Tables.Bytes = newVersion.Levels[i].TableSize()
1014 1 : l.VirtualTables = newVersion.Levels[i].VirtualTables()
1015 1 : l.EstimatedReferencesSize = newVersion.Levels[i].EstimatedReferenceSize()
1016 1 : l.Sublevels = 0
1017 1 : if l.Tables.Count > 0 {
1018 1 : l.Sublevels = 1
1019 1 : }
1020 1 : if invariants.Enabled {
1021 1 : levelFiles := newVersion.Levels[i].Slice()
1022 1 : if size := levelFiles.TableSizeSum(); l.Tables.Bytes != size {
1023 0 : panic(fmt.Sprintf("versionSet metrics L%d Size = %d, actual size = %d", i, l.Tables.Bytes, size))
1024 : }
1025 1 : refSize := uint64(0)
1026 1 : for f := range levelFiles.All() {
1027 1 : refSize += f.EstimatedReferenceSize()
1028 1 : }
1029 1 : if refSize != l.EstimatedReferencesSize {
1030 0 : panic(fmt.Sprintf(
1031 0 : "versionSet metrics L%d EstimatedReferencesSize = %d, recomputed size = %d",
1032 0 : i, l.EstimatedReferencesSize, refSize,
1033 0 : ))
1034 : }
1035 :
1036 1 : if nVirtual := levelFiles.NumVirtual(); nVirtual != l.VirtualTables.Count {
1037 0 : panic(fmt.Sprintf(
1038 0 : "versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
1039 0 : i, l.VirtualTables.Count, nVirtual,
1040 0 : ))
1041 : }
1042 1 : if vSize := levelFiles.VirtualTableSizeSum(); vSize != l.VirtualTables.Bytes {
1043 0 : panic(fmt.Sprintf(
1044 0 : "versionSet metrics L%d Virtual size = %d, actual size = %d",
1045 0 : i, l.VirtualTables.Bytes, vSize,
1046 0 : ))
1047 : }
1048 : }
1049 : }
1050 1 : lm[0].Sublevels = int32(len(newVersion.L0SublevelFiles))
1051 : }
1052 :
1053 : func findCurrentManifest(
1054 : fs vfs.FS, dirname string, ls []string,
1055 1 : ) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
1056 1 : // Locating a marker should succeed even if the marker has never been placed.
1057 1 : var filename string
1058 1 : marker, filename, err = atomicfs.LocateMarkerInListing(fs, dirname, manifestMarkerName, ls)
1059 1 : if err != nil {
1060 1 : return nil, 0, false, err
1061 1 : }
1062 :
1063 1 : if filename == "" {
1064 1 : // The marker hasn't been set yet. This database doesn't exist.
1065 1 : return marker, 0, false, nil
1066 1 : }
1067 :
1068 1 : var ok bool
1069 1 : _, manifestNum, ok = base.ParseFilename(fs, filename)
1070 1 : if !ok {
1071 0 : return marker, 0, false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
1072 0 : }
1073 1 : return marker, manifestNum, true, nil
1074 : }
|