Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "math"
14 : "os"
15 : "slices"
16 : "sync/atomic"
17 : "time"
18 :
19 : "github.com/cockroachdb/crlib/crtime"
20 : "github.com/cockroachdb/errors"
21 : "github.com/cockroachdb/errors/oserror"
22 : "github.com/cockroachdb/pebble/batchrepr"
23 : "github.com/cockroachdb/pebble/internal/arenaskl"
24 : "github.com/cockroachdb/pebble/internal/base"
25 : "github.com/cockroachdb/pebble/internal/cache"
26 : "github.com/cockroachdb/pebble/internal/constants"
27 : "github.com/cockroachdb/pebble/internal/invariants"
28 : "github.com/cockroachdb/pebble/internal/keyspan"
29 : "github.com/cockroachdb/pebble/internal/manifest"
30 : "github.com/cockroachdb/pebble/internal/manual"
31 : "github.com/cockroachdb/pebble/objstorage"
32 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
33 : "github.com/cockroachdb/pebble/objstorage/remote"
34 : "github.com/cockroachdb/pebble/record"
35 : "github.com/cockroachdb/pebble/vfs"
36 : "github.com/cockroachdb/pebble/wal"
37 : "github.com/prometheus/client_golang/prometheus"
38 : )
39 :
40 : const (
41 : initialMemTableSize = 256 << 10 // 256 KB
42 :
43 : // The max batch size is limited by the uint32 offsets stored in
44 : // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
45 : //
46 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
47 : // end of an allocation fits in uint32.
48 : //
49 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
50 : // 2GB).
51 : maxBatchSize = constants.MaxUint32OrInt
52 :
53 : // The max memtable size is limited by the uint32 offsets stored in
54 : // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
55 : //
56 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
57 : // end of an allocation fits in uint32.
58 : //
59 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
60 : // 2GB).
61 : maxMemTableSize = constants.MaxUint32OrInt
62 : )
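// Illustrative sketch, not part of the original source: the constants above
// cap batch and memtable sizes so that arena/skiplist offsets fit in uint32.
// The helper name is hypothetical and only mirrors that intent.
func clampMemTableSize(requested uint64) uint64 {
	if requested > maxMemTableSize {
		return maxMemTableSize
	}
	return requested
}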
63 :
64 : // FileCacheSize returns the file cache size to use for a single DB,
65 : // given the maximum number of files that may be open, when the file
66 : // cache is used by only that single DB rather than being shared
67 : // across multiple DBs.
68 1 : func FileCacheSize(maxOpenFiles int) int {
69 1 : fileCacheSize := maxOpenFiles - numNonFileCacheFiles
70 1 : if fileCacheSize < minFileCacheSize {
71 0 : fileCacheSize = minFileCacheSize
72 0 : }
73 1 : return fileCacheSize
74 : }
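// Illustrative sketch, not part of the original source: mirroring what Open
// does further below, a caller can derive the file cache size from its
// MaxOpenFiles setting and supply a pre-built cache via Options.FileCache.
// The helper name is hypothetical.
func exampleSizedFileCache(opts *Options) *FileCache {
	opts.EnsureDefaults()
	size := FileCacheSize(opts.MaxOpenFiles)
	opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, size)
	// The caller retains a reference and should Unref the cache once it is no
	// longer needed (Open creates its own handle on it).
	return opts.FileCache
}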
75 :
76 : // Open opens a DB whose files live in the given directory.
77 : //
78 : // IsCorruptionError() can be used to determine if the error is caused by on-disk
79 : // corruption.
80 1 : func Open(dirname string, opts *Options) (db *DB, err error) {
81 1 : // Make a copy of the options so that we don't mutate the passed in options.
82 1 : opts = opts.Clone()
83 1 : opts.EnsureDefaults()
84 1 : if opts.Experimental.CompactionScheduler == nil {
85 1 : opts.Experimental.CompactionScheduler = newConcurrencyLimitScheduler(defaultTimeSource{})
86 1 : }
87 1 : if err := opts.Validate(); err != nil {
88 0 : return nil, err
89 0 : }
90 1 : if opts.LoggerAndTracer == nil {
91 1 : opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
92 1 : } else {
93 1 : opts.Logger = opts.LoggerAndTracer
94 1 : }
95 :
96 1 : if invariants.Sometimes(5) {
97 1 : assertComparer := base.MakeAssertComparer(*opts.Comparer)
98 1 : opts.Comparer = &assertComparer
99 1 : }
100 :
101 : // In all error cases, we return db = nil; this is used by various
102 : // deferred cleanups.
103 :
104 : // Open the database and WAL directories first.
105 1 : walDirname, dataDir, err := prepareAndOpenDirs(dirname, opts)
106 1 : if err != nil {
107 1 : return nil, errors.Wrapf(err, "error opening database at %q", dirname)
108 1 : }
109 1 : defer func() {
110 1 : if db == nil {
111 1 : dataDir.Close()
112 1 : }
113 : }()
114 :
115 : // Lock the database directory.
116 1 : var fileLock *Lock
117 1 : if opts.Lock != nil {
118 1 : // The caller already acquired the database lock. Ensure that the
119 1 : // directory matches.
120 1 : if err := opts.Lock.pathMatches(dirname); err != nil {
121 0 : return nil, err
122 0 : }
123 1 : if err := opts.Lock.refForOpen(); err != nil {
124 1 : return nil, err
125 1 : }
126 1 : fileLock = opts.Lock
127 1 : } else {
128 1 : fileLock, err = LockDirectory(dirname, opts.FS)
129 1 : if err != nil {
130 1 : return nil, err
131 1 : }
132 : }
133 1 : defer func() {
134 1 : if db == nil {
135 1 : _ = fileLock.Close()
136 1 : }
137 : }()
138 :
139 : // List the directory contents. This also happens to include WAL log files, if
140 : // they are in the same dir, but we will ignore those below. The provider is
141 : // also given this list, but it ignores non-sstable files.
142 1 : ls, err := opts.FS.List(dirname)
143 1 : if err != nil {
144 1 : return nil, err
145 1 : }
146 :
147 : // Establish the format major version.
148 1 : formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
149 1 : if err != nil {
150 1 : return nil, err
151 1 : }
152 1 : defer func() {
153 1 : if db == nil {
154 1 : _ = formatVersionMarker.Close()
155 1 : }
156 : }()
157 :
158 1 : noFormatVersionMarker := formatVersion == FormatDefault
159 1 : if noFormatVersionMarker {
160 1 : // We will initialize the store at the minimum possible format, then upgrade
161 1 : // the format to the desired one. This helps test the format upgrade code.
162 1 : formatVersion = FormatMinSupported
163 1 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
164 1 : formatVersion = FormatMinForSharedObjects
165 1 : }
166 : // There is no format version marker file. There are three cases:
167 : // - we are trying to open an existing store that was created at
168 : // FormatMostCompatible (the only one without a version marker file)
169 : // - we are creating a new store;
170 : // - we are retrying a failed creation.
171 : //
172 : // To error in the first case, we set ErrorIfNotPristine.
173 1 : opts.ErrorIfNotPristine = true
174 1 : defer func() {
175 1 : if err != nil && errors.Is(err, ErrDBNotPristine) {
176 0 : // We must be trying to open an existing store at FormatMostCompatible.
177 0 : // Correct the error in this case.
178 0 : err = errors.Newf(
179 0 : "pebble: database %q written in format major version 1 which is no longer supported",
180 0 : dirname)
181 0 : }
182 : }()
183 1 : } else {
184 1 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone && formatVersion < FormatMinForSharedObjects {
185 0 : return nil, errors.Newf(
186 0 : "pebble: database %q configured with shared objects but written in too old format major version %d",
187 0 : dirname, formatVersion)
188 0 : }
189 : }
190 :
191 : // Find the currently active manifest, if there is one.
192 1 : manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
193 1 : if err != nil {
194 1 : return nil, errors.Wrapf(err, "pebble: database %q", dirname)
195 1 : }
196 1 : defer func() {
197 1 : if db == nil {
198 1 : _ = manifestMarker.Close()
199 1 : }
200 : }()
201 :
202 : // Atomic markers may leave behind obsolete files if there's a crash
203 : // mid-update. Clean these up if we're not in read-only mode.
204 1 : if !opts.ReadOnly {
205 1 : if err := formatVersionMarker.RemoveObsolete(); err != nil {
206 0 : return nil, err
207 0 : }
208 1 : if err := manifestMarker.RemoveObsolete(); err != nil {
209 0 : return nil, err
210 0 : }
211 : }
212 :
213 1 : if opts.Cache == nil {
214 1 : opts.Cache = cache.New(opts.CacheSize)
215 1 : defer opts.Cache.Unref()
216 1 : }
217 :
218 1 : d := &DB{
219 1 : cacheHandle: opts.Cache.NewHandle(),
220 1 : dirname: dirname,
221 1 : opts: opts,
222 1 : cmp: opts.Comparer.Compare,
223 1 : equal: opts.Comparer.Equal,
224 1 : merge: opts.Merger.Merge,
225 1 : split: opts.Comparer.Split,
226 1 : abbreviatedKey: opts.Comparer.AbbreviatedKey,
227 1 : largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
228 1 : fileLock: fileLock,
229 1 : dataDir: dataDir,
230 1 : closed: new(atomic.Value),
231 1 : closedCh: make(chan struct{}),
232 1 : }
233 1 : d.mu.versions = &versionSet{}
234 1 : d.diskAvailBytes.Store(math.MaxUint64)
235 1 : d.problemSpans.Init(manifest.NumLevels, opts.Comparer.Compare)
236 1 :
237 1 : defer func() {
238 1 : // If an error or panic occurs during open, attempt to release the manually
239 1 : // allocated memory resources. Note that rather than look for an error, we
240 1 : // look for the return of a nil DB pointer.
241 1 : if r := recover(); db == nil {
242 1 : // If there's an unused, recycled memtable, we need to release its memory.
243 1 : if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
244 1 : d.freeMemTable(obsoleteMemTable)
245 1 : }
246 :
247 1 : if d.fileCache != nil {
248 1 : _ = d.fileCache.Close()
249 1 : }
250 1 : d.cacheHandle.Close()
251 1 :
252 1 : for _, mem := range d.mu.mem.queue {
253 1 : switch t := mem.flushable.(type) {
254 1 : case *memTable:
255 1 : manual.Free(manual.MemTable, t.arenaBuf)
256 1 : t.arenaBuf = manual.Buf{}
257 : }
258 : }
259 1 : if d.cleanupManager != nil {
260 1 : d.cleanupManager.Close()
261 1 : }
262 1 : if d.objProvider != nil {
263 1 : _ = d.objProvider.Close()
264 1 : }
265 1 : if r != nil {
266 1 : panic(r)
267 : }
268 : }
269 : }()
270 :
271 1 : d.commit = newCommitPipeline(commitEnv{
272 1 : logSeqNum: &d.mu.versions.logSeqNum,
273 1 : visibleSeqNum: &d.mu.versions.visibleSeqNum,
274 1 : apply: d.commitApply,
275 1 : write: d.commitWrite,
276 1 : })
277 1 : d.mu.nextJobID = 1
278 1 : d.mu.mem.nextSize = opts.MemTableSize
279 1 : if d.mu.mem.nextSize > initialMemTableSize {
280 1 : d.mu.mem.nextSize = initialMemTableSize
281 1 : }
282 1 : d.mu.compact.cond.L = &d.mu.Mutex
283 1 : d.mu.compact.inProgress = make(map[*compaction]struct{})
284 1 : d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
285 1 : d.mu.snapshots.init()
286 1 : // logSeqNum is the next sequence number that will be assigned.
287 1 : // Start assigning sequence numbers from base.SeqNumStart to leave
288 1 : // room for reserved sequence numbers (see comments around
289 1 : // SeqNumStart).
290 1 : d.mu.versions.logSeqNum.Store(base.SeqNumStart)
291 1 : d.mu.formatVers.vers.Store(uint64(formatVersion))
292 1 : d.mu.formatVers.marker = formatVersionMarker
293 1 :
294 1 : d.timeNow = time.Now
295 1 : d.openedAt = d.timeNow()
296 1 :
297 1 : d.mu.Lock()
298 1 : defer d.mu.Unlock()
299 1 :
300 1 : jobID := d.newJobIDLocked()
301 1 :
302 1 : providerSettings := objstorageprovider.Settings{
303 1 : Logger: opts.Logger,
304 1 : FS: opts.FS,
305 1 : FSDirName: dirname,
306 1 : FSDirInitialListing: ls,
307 1 : FSCleaner: opts.Cleaner,
308 1 : NoSyncOnClose: opts.NoSyncOnClose,
309 1 : BytesPerSync: opts.BytesPerSync,
310 1 : }
311 1 : providerSettings.Local.ReadaheadConfig = opts.Local.ReadaheadConfig
312 1 : providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
313 1 : providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
314 1 : providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
315 1 : providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
316 1 :
317 1 : d.objProvider, err = objstorageprovider.Open(providerSettings)
318 1 : if err != nil {
319 1 : return nil, err
320 1 : }
321 :
322 1 : if !manifestExists {
323 1 : // DB does not exist.
324 1 : if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
325 1 : return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
326 1 : }
327 :
328 : // Create the DB.
329 1 : if err := d.mu.versions.create(
330 1 : jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
331 1 : return nil, err
332 1 : }
333 1 : } else {
334 1 : if opts.ErrorIfExists {
335 1 : return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
336 1 : }
337 : // Load the version set.
338 1 : if err := d.mu.versions.load(
339 1 : dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
340 1 : return nil, err
341 1 : }
342 1 : if opts.ErrorIfNotPristine {
343 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
344 1 : d.mu.versions.addLiveFileNums(liveFileNums)
345 1 : if len(liveFileNums) != 0 {
346 1 : return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
347 1 : }
348 : }
349 : }
350 :
351 : // In read-only mode, we replay directly into the mutable memtable but never
352 : // flush it. We need to delay creation of the memtable until we know the
353 : // sequence number of the first batch that will be inserted.
354 1 : if !d.opts.ReadOnly {
355 1 : var entry *flushableEntry
356 1 : d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
357 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
358 1 : }
359 :
360 1 : d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
361 1 : Buckets: FsyncLatencyBuckets,
362 1 : })
363 1 : walOpts := wal.Options{
364 1 : Primary: wal.Dir{FS: opts.FS, Dirname: walDirname},
365 1 : Secondary: wal.Dir{},
366 1 : MinUnflushedWALNum: wal.NumWAL(d.mu.versions.minUnflushedLogNum),
367 1 : MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
368 1 : NoSyncOnClose: opts.NoSyncOnClose,
369 1 : BytesPerSync: opts.WALBytesPerSync,
370 1 : PreallocateSize: d.walPreallocateSize,
371 1 : MinSyncInterval: opts.WALMinSyncInterval,
372 1 : FsyncLatency: d.mu.log.metrics.fsyncLatency,
373 1 : QueueSemChan: d.commit.logSyncQSem,
374 1 : Logger: opts.Logger,
375 1 : EventListener: walEventListenerAdaptor{l: opts.EventListener},
376 1 : WriteWALSyncOffsets: func() bool { return d.FormatMajorVersion() >= FormatWALSyncChunks },
377 : }
378 1 : if opts.WALFailover != nil {
379 1 : walOpts.Secondary = opts.WALFailover.Secondary
380 1 : walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
381 1 : walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
382 1 : Buckets: FsyncLatencyBuckets,
383 1 : })
384 1 : }
385 1 : walDirs := append(walOpts.Dirs(), opts.WALRecoveryDirs...)
386 1 : wals, err := wal.Scan(walDirs...)
387 1 : if err != nil {
388 1 : return nil, err
389 1 : }
390 1 : walManager, err := wal.Init(walOpts, wals)
391 1 : if err != nil {
392 1 : return nil, err
393 1 : }
394 1 : defer func() {
395 1 : if db == nil {
396 1 : _ = walManager.Close()
397 1 : }
398 : }()
399 :
400 1 : d.mu.log.manager = walManager
401 1 :
402 1 : d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
403 1 :
404 1 : if manifestExists && !opts.DisableConsistencyCheck {
405 1 : curVersion := d.mu.versions.currentVersion()
406 1 : if err := checkConsistency(curVersion, d.objProvider); err != nil {
407 1 : return nil, err
408 1 : }
409 : }
410 :
411 1 : fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
412 1 : if opts.FileCache == nil {
413 1 : opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
414 1 : defer opts.FileCache.Unref()
415 1 : }
416 1 : d.fileCache = opts.FileCache.newHandle(d.cacheHandle, d.objProvider, d.opts.LoggerAndTracer, d.opts.MakeReaderOptions(), d.reportCorruption)
417 1 : d.newIters = d.fileCache.newIters
418 1 : d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
419 1 :
420 1 : d.mu.annotators.totalSize = d.makeFileSizeAnnotator(func(f *manifest.TableMetadata) bool {
421 1 : return true
422 1 : })
423 1 : d.mu.annotators.remoteSize = d.makeFileSizeAnnotator(func(f *manifest.TableMetadata) bool {
424 1 : meta, err := d.objProvider.Lookup(base.FileTypeTable, f.FileBacking.DiskFileNum)
425 1 : if err != nil {
426 0 : return false
427 0 : }
428 1 : return meta.IsRemote()
429 : })
430 1 : d.mu.annotators.externalSize = d.makeFileSizeAnnotator(func(f *manifest.TableMetadata) bool {
431 1 : meta, err := d.objProvider.Lookup(base.FileTypeTable, f.FileBacking.DiskFileNum)
432 1 : if err != nil {
433 0 : return false
434 0 : }
435 1 : return meta.IsRemote() && meta.Remote.CleanupMethod == objstorage.SharedNoCleanup
436 : })
437 :
438 1 : var previousOptionsFileNum base.DiskFileNum
439 1 : var previousOptionsFilename string
440 1 : for _, filename := range ls {
441 1 : ft, fn, ok := base.ParseFilename(opts.FS, filename)
442 1 : if !ok {
443 1 : continue
444 : }
445 :
446 : // Don't reuse any obsolete file numbers to avoid modifying an
447 : // ingested sstable's original external file.
448 1 : d.mu.versions.markFileNumUsed(fn)
449 1 :
450 1 : switch ft {
451 0 : case base.FileTypeLog:
452 : // Ignore.
453 1 : case base.FileTypeOptions:
454 1 : if previousOptionsFileNum < fn {
455 1 : previousOptionsFileNum = fn
456 1 : previousOptionsFilename = filename
457 1 : }
458 1 : case base.FileTypeTemp, base.FileTypeOldTemp:
459 1 : if !d.opts.ReadOnly {
460 1 : // Some codepaths write to a temporary file and then
461 1 : // rename it to its final location when complete. A
462 1 : // temp file is leftover if a process exits before the
463 1 : // rename. Remove it.
464 1 : err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
465 1 : if err != nil {
466 0 : return nil, err
467 0 : }
468 : }
469 : }
470 : }
471 1 : if n := len(wals); n > 0 {
472 1 : // Don't reuse any obsolete file numbers to avoid modifying an
473 1 : // ingested sstable's original external file.
474 1 : d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
475 1 : }
476 :
477 : // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
478 : // objProvider. This avoids FileNum collisions with obsolete sstables.
479 1 : objects := d.objProvider.List()
480 1 : for _, obj := range objects {
481 1 : d.mu.versions.markFileNumUsed(obj.DiskFileNum)
482 1 : }
483 :
484 : // Validate the most-recent OPTIONS file, if there is one.
485 1 : if previousOptionsFilename != "" {
486 1 : path := opts.FS.PathJoin(dirname, previousOptionsFilename)
487 1 : previousOptions, err := readOptionsFile(opts, path)
488 1 : if err != nil {
489 0 : return nil, err
490 0 : }
491 1 : if err := opts.CheckCompatibility(previousOptions); err != nil {
492 1 : return nil, err
493 1 : }
494 : }
495 :
496 : // Replay any newer log files than the ones named in the manifest.
497 1 : var replayWALs wal.Logs
498 1 : for i, w := range wals {
499 1 : if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
500 1 : replayWALs = wals[i:]
501 1 : break
502 : }
503 : }
504 1 : var flushableIngests []*ingestedFlushable
505 1 : for i, lf := range replayWALs {
506 1 : // WALs other than the last one would have been closed cleanly.
507 1 : //
508 1 : // Note: we used to never require strict WAL tails when reading from older
509 1 : // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
510 1 : // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
511 1 : // compatible Pebble format is newer and guarantees a clean EOF.
512 1 : strictWALTail := i < len(replayWALs)-1
513 1 : fi, maxSeqNum, err := d.replayWAL(jobID, lf, strictWALTail)
514 1 : if err != nil {
515 1 : return nil, err
516 1 : }
517 1 : if len(fi) > 0 {
518 1 : flushableIngests = append(flushableIngests, fi...)
519 1 : }
520 1 : if d.mu.versions.logSeqNum.Load() < maxSeqNum {
521 1 : d.mu.versions.logSeqNum.Store(maxSeqNum)
522 1 : }
523 : }
524 1 : if d.mu.mem.mutable == nil {
525 1 : // Recreate the mutable memtable if replayWAL got rid of it.
526 1 : var entry *flushableEntry
527 1 : d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
528 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
529 1 : }
530 1 : d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
531 1 :
532 1 : // Register with the CompactionScheduler before calling
533 1 : // d.maybeScheduleFlush, since completion of the flush can trigger
534 1 : // compactions.
535 1 : d.opts.Experimental.CompactionScheduler.Register(2, d)
536 1 : if !d.opts.ReadOnly {
537 1 : d.maybeScheduleFlush()
538 1 : for d.mu.compact.flushing {
539 1 : d.mu.compact.cond.Wait()
540 1 : }
541 :
542 : // Create an empty .log file for the mutable memtable.
543 1 : newLogNum := d.mu.versions.getNextDiskFileNum()
544 1 : d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
545 1 : if err != nil {
546 1 : return nil, err
547 1 : }
548 :
549 : // This isn't strictly necessary as we don't use the log number for
550 : // memtables being flushed, only for the next unflushed memtable.
551 1 : d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
552 : }
553 1 : d.updateReadStateLocked(d.opts.DebugCheck)
554 1 :
555 1 : if !d.opts.ReadOnly {
556 1 : // If the Options specify a format major version higher than the
557 1 : // loaded database's, upgrade it. If this is a new database, this
558 1 : // code path also performs an initial upgrade from the starting
559 1 : // implicit MinSupported version.
560 1 : //
561 1 : // We ratchet the version this far into Open so that migrations have a read
562 1 : // state available. Note that this also results in creating/updating the
563 1 : // format version marker file.
564 1 : if opts.FormatMajorVersion > d.FormatMajorVersion() {
565 1 : if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
566 0 : return nil, err
567 0 : }
568 1 : } else if noFormatVersionMarker {
569 1 : // We are creating a new store. Create the format version marker file.
570 1 : if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
571 1 : return nil, err
572 1 : }
573 : }
574 :
575 : // Write the current options to disk.
576 1 : d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
577 1 : tmpPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeTemp, d.optionsFileNum)
578 1 : optionsPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeOptions, d.optionsFileNum)
579 1 :
580 1 : // Write them to a temporary file first, in case we crash before
581 1 : // we're done. A corrupt options file prevents opening the
582 1 : // database.
583 1 : optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
584 1 : if err != nil {
585 1 : return nil, err
586 1 : }
587 1 : serializedOpts := []byte(opts.String())
588 1 : if _, err := optionsFile.Write(serializedOpts); err != nil {
589 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
590 1 : }
591 1 : d.optionsFileSize = uint64(len(serializedOpts))
592 1 : if err := optionsFile.Sync(); err != nil {
593 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
594 1 : }
595 1 : if err := optionsFile.Close(); err != nil {
596 0 : return nil, err
597 0 : }
598 : // Atomically rename to the OPTIONS-XXXXXX path. This rename is
599 : // guaranteed to be atomic because the destination path does not
600 : // exist.
601 1 : if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
602 1 : return nil, err
603 1 : }
604 1 : if err := d.dataDir.Sync(); err != nil {
605 1 : return nil, err
606 1 : }
607 : }
608 :
609 1 : if !d.opts.ReadOnly {
610 1 : // Get a fresh list of files, in case some of the earlier flushes/compactions
611 1 : // have deleted some files.
612 1 : ls, err := opts.FS.List(dirname)
613 1 : if err != nil {
614 1 : return nil, err
615 1 : }
616 1 : d.scanObsoleteFiles(ls, flushableIngests)
617 1 : d.deleteObsoleteFiles(jobID)
618 : }
619 : // Else, nothing is obsolete.
620 :
621 1 : d.mu.tableStats.cond.L = &d.mu.Mutex
622 1 : d.mu.tableValidation.cond.L = &d.mu.Mutex
623 1 : if !d.opts.ReadOnly {
624 1 : d.maybeCollectTableStatsLocked()
625 1 : }
626 1 : d.calculateDiskAvailableBytes()
627 1 :
628 1 : d.maybeScheduleFlush()
629 1 : d.maybeScheduleCompaction()
630 1 :
631 1 : // Note: this is a no-op if invariants are disabled or race is enabled.
632 1 : //
633 1 : // Setting a finalizer on *DB causes *DB to never be reclaimed and the
634 1 : // finalizer to never be run. The problem is due to this limitation of
635 1 : // finalizers mention in the SetFinalizer docs:
636 1 : //
637 1 : // If a cyclic structure includes a block with a finalizer, that cycle is
638 1 : // not guaranteed to be garbage collected and the finalizer is not
639 1 : // guaranteed to run, because there is no ordering that respects the
640 1 : // dependencies.
641 1 : //
642 1 : // DB has cycles with several of its internal structures: readState,
643 1 : // newIters, fileCache, versions, etc. Each of this individually cause a
644 1 : // cycle and prevent the finalizer from being run. But we can workaround this
645 1 : // finializer limitation by setting a finalizer on another object that is
646 1 : // tied to the lifetime of DB: the DB.closed atomic.Value.
647 1 : dPtr := fmt.Sprintf("%p", d)
648 1 : invariants.SetFinalizer(d.closed, func(obj interface{}) {
649 0 : v := obj.(*atomic.Value)
650 0 : if err := v.Load(); err == nil {
651 0 : fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
652 0 : os.Exit(1)
653 0 : }
654 : })
655 :
656 1 : return d, nil
657 : }
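// Illustrative sketch, not part of the original source: a typical call to
// Open that distinguishes a missing store from on-disk corruption, using the
// errors referenced in the doc comment above. The path is a placeholder and
// the function name is hypothetical.
func exampleOpen() (*DB, error) {
	db, err := Open("/path/to/db", &Options{ErrorIfNotExists: true})
	switch {
	case errors.Is(err, ErrDBDoesNotExist):
		// No store exists at this path; retry without ErrorIfNotExists to
		// create it.
		return Open("/path/to/db", &Options{})
	case IsCorruptionError(err):
		// On-disk corruption (MANIFEST, OPTIONS, WAL, or sstables).
		return nil, err
	default:
		return db, err
	}
}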
658 :
659 : // prepareAndOpenDirs opens the directories for the store (and creates them if
660 : // necessary).
661 : //
662 : // Returns an error if ReadOnly is set and the directories don't exist.
663 : func prepareAndOpenDirs(
664 : dirname string, opts *Options,
665 1 : ) (walDirname string, dataDir vfs.File, err error) {
666 1 : walDirname = opts.WALDir
667 1 : if opts.WALDir == "" {
668 1 : walDirname = dirname
669 1 : }
670 :
671 : // Create directories if needed.
672 1 : if !opts.ReadOnly {
673 1 : f, err := mkdirAllAndSyncParents(opts.FS, dirname)
674 1 : if err != nil {
675 1 : return "", nil, err
676 1 : }
677 1 : f.Close()
678 1 : if walDirname != dirname {
679 1 : f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
680 1 : if err != nil {
681 0 : return "", nil, err
682 0 : }
683 1 : f.Close()
684 : }
685 1 : if opts.WALFailover != nil {
686 1 : secondary := opts.WALFailover.Secondary
687 1 : f, err := mkdirAllAndSyncParents(secondary.FS, secondary.Dirname)
688 1 : if err != nil {
689 0 : return "", nil, err
690 0 : }
691 1 : f.Close()
692 : }
693 : }
694 :
695 1 : dataDir, err = opts.FS.OpenDir(dirname)
696 1 : if err != nil {
697 1 : if opts.ReadOnly && oserror.IsNotExist(err) {
698 1 : return "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
699 1 : }
700 1 : return "", nil, err
701 : }
702 1 : if opts.ReadOnly && walDirname != dirname {
703 1 : // Check that the wal dir exists.
704 1 : walDir, err := opts.FS.OpenDir(walDirname)
705 1 : if err != nil {
706 1 : dataDir.Close()
707 1 : return "", nil, err
708 1 : }
709 0 : walDir.Close()
710 : }
711 :
712 1 : return walDirname, dataDir, nil
713 : }
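// Illustrative sketch, not part of the original source: placing the WAL in a
// separate directory with a failover secondary, both of which
// prepareAndOpenDirs creates and syncs above. The WALFailoverOptions type
// name and the directory paths are assumptions.
func exampleWALDirs(fs vfs.FS) *Options {
	return &Options{
		FS:     fs,
		WALDir: "/fast-disk/pebble-wal",
		WALFailover: &WALFailoverOptions{
			Secondary: wal.Dir{FS: fs, Dirname: "/other-disk/pebble-wal"},
		},
	}
}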
714 :
715 : // GetVersion returns the engine version string from the latest options
716 : // file present in dir. Used to check what Pebble or RocksDB version was last
717 : // used to write to the database stored in this directory. An empty string is
718 : // returned if no valid OPTIONS file with a version key was found.
719 1 : func GetVersion(dir string, fs vfs.FS) (string, error) {
720 1 : ls, err := fs.List(dir)
721 1 : if err != nil {
722 0 : return "", err
723 0 : }
724 1 : var version string
725 1 : lastOptionsSeen := base.DiskFileNum(0)
726 1 : for _, filename := range ls {
727 1 : ft, fn, ok := base.ParseFilename(fs, filename)
728 1 : if !ok {
729 1 : continue
730 : }
731 1 : switch ft {
732 1 : case base.FileTypeOptions:
733 1 : // If this file has a higher number than the last options file
734 1 : // processed, reset version. This is because rocksdb often
735 1 : // writes multiple options files without deleting previous ones.
736 1 : // Otherwise, skip parsing this options file.
737 1 : if fn > lastOptionsSeen {
738 1 : version = ""
739 1 : lastOptionsSeen = fn
740 1 : } else {
741 0 : continue
742 : }
743 1 : f, err := fs.Open(fs.PathJoin(dir, filename))
744 1 : if err != nil {
745 0 : return "", err
746 0 : }
747 1 : data, err := io.ReadAll(f)
748 1 : f.Close()
749 1 :
750 1 : if err != nil {
751 0 : return "", err
752 0 : }
753 1 : err = parseOptions(string(data), parseOptionsFuncs{
754 1 : visitKeyValue: func(i, j int, section, key, value string) error {
755 1 : switch {
756 1 : case section == "Version":
757 1 : switch key {
758 1 : case "pebble_version":
759 1 : version = value
760 1 : case "rocksdb_version":
761 1 : version = fmt.Sprintf("rocksdb v%s", value)
762 : }
763 : }
764 1 : return nil
765 : },
766 : })
767 1 : if err != nil {
768 0 : return "", err
769 0 : }
770 : }
771 : }
772 1 : return version, nil
773 : }
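// Illustrative sketch, not part of the original source: checking which engine
// last wrote a directory before opening it. The path is a placeholder and the
// function name is hypothetical.
func exampleGetVersion(fs vfs.FS) error {
	version, err := GetVersion("/path/to/db", fs)
	if err != nil {
		return err
	}
	if version == "" {
		// No valid OPTIONS file with a version key was found.
		return nil
	}
	fmt.Printf("last written by %s\n", version)
	return nil
}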
774 :
775 : func (d *DB) replayIngestedFlushable(
776 : b *Batch, logNum base.DiskFileNum,
777 1 : ) (entry *flushableEntry, err error) {
778 1 : br := b.Reader()
779 1 : seqNum := b.SeqNum()
780 1 :
781 1 : fileNums := make([]base.DiskFileNum, 0, b.Count())
782 1 : var exciseSpan KeyRange
783 1 : addFileNum := func(encodedFileNum []byte) {
784 1 : fileNum, n := binary.Uvarint(encodedFileNum)
785 1 : if n <= 0 {
786 0 : panic("pebble: ingest sstable file num is invalid")
787 : }
788 1 : fileNums = append(fileNums, base.DiskFileNum(fileNum))
789 : }
790 :
791 1 : for i := 0; i < int(b.Count()); i++ {
792 1 : kind, key, val, ok, err := br.Next()
793 1 : if err != nil {
794 0 : return nil, err
795 0 : }
796 1 : if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise {
797 0 : panic("pebble: invalid batch key kind")
798 : }
799 1 : if !ok {
800 0 : panic("pebble: invalid batch count")
801 : }
802 1 : if kind == base.InternalKeyKindExcise {
803 0 : if exciseSpan.Valid() {
804 0 : panic("pebble: multiple excise spans in a single batch")
805 : }
806 0 : exciseSpan.Start = slices.Clone(key)
807 0 : exciseSpan.End = slices.Clone(val)
808 0 : continue
809 : }
810 1 : addFileNum(key)
811 : }
812 :
813 1 : if _, _, _, ok, err := br.Next(); err != nil {
814 0 : return nil, err
815 1 : } else if ok {
816 0 : panic("pebble: invalid number of entries in batch")
817 : }
818 :
819 1 : meta := make([]*tableMetadata, len(fileNums))
820 1 : var lastRangeKey keyspan.Span
821 1 : for i, n := range fileNums {
822 1 : readable, err := d.objProvider.OpenForReading(context.TODO(), base.FileTypeTable, n,
823 1 : objstorage.OpenOptions{MustExist: true})
824 1 : if err != nil {
825 0 : return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files")
826 0 : }
827 : // NB: ingestLoad1 will close readable.
828 1 : meta[i], lastRangeKey, err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(),
829 1 : readable, d.cacheHandle, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
830 1 : if err != nil {
831 0 : return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
832 0 : }
833 : }
834 1 : if lastRangeKey.Valid() && d.opts.Comparer.Split.HasSuffix(lastRangeKey.End) {
835 0 : return nil, errors.AssertionFailedf("pebble: last ingest sstable has suffixed range key end %s",
836 0 : d.opts.Comparer.FormatKey(lastRangeKey.End))
837 0 : }
838 :
839 1 : numFiles := len(meta)
840 1 : if exciseSpan.Valid() {
841 0 : numFiles++
842 0 : }
843 1 : if uint32(numFiles) != b.Count() {
844 0 : panic("pebble: couldn't load all files in WAL entry")
845 : }
846 :
847 1 : return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
848 : }
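// Illustrative sketch, not part of the original source: the ingest entries
// decoded above carry the sstable's DiskFileNum as a uvarint-encoded batch
// key, so a round trip of that encoding looks like this. The function name is
// hypothetical.
func exampleIngestKeyRoundTrip(n base.DiskFileNum) base.DiskFileNum {
	buf := make([]byte, binary.MaxVarintLen64)
	key := buf[:binary.PutUvarint(buf, uint64(n))]
	decoded, _ := binary.Uvarint(key)
	return base.DiskFileNum(decoded)
}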
849 :
850 : // replayWAL replays the edits in the specified WAL. If the DB is in
851 : // read-only mode, then the WALs are replayed into memtables and not flushed.
852 : // If the DB is not in read-only mode, then the contents of the WAL are
853 : // guaranteed to be flushed when a flush is scheduled after this method is run.
854 : // Note that this flushing is very important for guaranteeing durability:
855 : // the application may have had a number of pending
856 : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
857 : // happened but the corresponding data may now be readable from the WAL (while
858 : // sitting in write-back caches in the kernel or the storage device). By
859 : // reading the WAL (including the non-fsynced data) and then flushing all
860 : // these changes (flush does fsyncs), we are able to guarantee that the
861 : // initial state of the DB is durable.
862 : //
863 : // This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable and replays
864 : // WALs into the flushable queue. Flushing of the queue is expected to be handled
865 : // by callers. A list of flushable ingests (but not memtables) replayed is returned.
866 : //
867 : // d.mu must be held when calling this, but the mutex may be dropped and
868 : // re-acquired during the course of this method.
869 : func (d *DB) replayWAL(
870 : jobID JobID, ll wal.LogicalLog, strictWALTail bool,
871 1 : ) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
872 1 : rr := ll.OpenForRead()
873 1 : defer func() { _ = rr.Close() }()
874 1 : var (
875 1 : b Batch
876 1 : buf bytes.Buffer
877 1 : mem *memTable
878 1 : entry *flushableEntry
879 1 : offset wal.Offset
880 1 : lastFlushOffset int64
881 1 : keysReplayed int64 // number of keys replayed
882 1 : batchesReplayed int64 // number of batches replayed
883 1 : )
884 1 :
885 1 : // TODO(jackson): This function is interspersed with panics, in addition to
886 1 : // corruption error propagation. Audit them to ensure we're truly only
887 1 : // panicking where the error points to Pebble bug and not user or
888 1 : // hardware-induced corruption.
889 1 :
890 1 : // "Flushes" (ie. closes off) the current memtable, if not nil.
891 1 : flushMem := func() {
892 1 : if mem == nil {
893 1 : return
894 1 : }
895 1 : mem.writerUnref()
896 1 : if d.mu.mem.mutable == mem {
897 1 : d.mu.mem.mutable = nil
898 1 : }
899 1 : entry.flushForced = !d.opts.ReadOnly
900 1 : var logSize uint64
901 1 : mergedOffset := offset.Physical + offset.PreviousFilesBytes
902 1 : if mergedOffset >= lastFlushOffset {
903 1 : logSize = uint64(mergedOffset - lastFlushOffset)
904 1 : }
905 : // Else, this was the initial memtable in the read-only case which must have
906 : // been empty, but we need to flush it since we don't want to add to it later.
907 1 : lastFlushOffset = mergedOffset
908 1 : entry.logSize = logSize
909 1 : mem, entry = nil, nil
910 : }
911 :
912 1 : mem = d.mu.mem.mutable
913 1 : if mem != nil {
914 1 : entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
915 1 : if !d.opts.ReadOnly {
916 1 : flushMem()
917 1 : }
918 : }
919 :
920 : // Creates a new memtable if there is no current memtable.
921 1 : ensureMem := func(seqNum base.SeqNum) {
922 1 : if mem != nil {
923 1 : return
924 1 : }
925 1 : mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
926 1 : d.mu.mem.mutable = mem
927 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
928 : }
929 :
930 1 : defer func() {
931 1 : if err != nil {
932 1 : err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
933 1 : }
934 : }()
935 :
936 1 : for {
937 1 : var r io.Reader
938 1 : var err error
939 1 : r, offset, err = rr.NextRecord()
940 1 : if err == nil {
941 1 : _, err = io.Copy(&buf, r)
942 1 : }
943 1 : if err != nil {
944 1 : // It is common to encounter a zeroed or invalid chunk due to WAL
945 1 : // preallocation and WAL recycling. However zeroed or invalid chunks
946 1 : // can also be a consequence of corruption / disk rot. When the log
947 1 : // reader encounters one of these cases, it attempts to disambiguate
948 1 : // by reading ahead looking for a future record. If a future chunk
949 1 : // indicates the chunk at the original offset should've been valid, it
950 1 : // surfaces record.ErrInvalidChunk or record.ErrZeroedChunk. These
951 1 : // errors are always indicative of corruption and data loss.
952 1 : //
953 1 : // Otherwise, the reader surfaces record.ErrUnexpectedEOF indicating
954 1 : // that the WAL terminated uncleanly and ambiguously. If the WAL is
955 1 : // the most recent logical WAL, the caller passes in
956 1 : // (strictWALTail=false), indicating we should tolerate the unclean
957 1 : // ending. If the WAL is an older WAL, the caller passes in
958 1 : // (strictWALTail=true), indicating that the WAL should have been
959 1 : // closed cleanly, and we should interpret the
960 1 : // `record.ErrUnexpectedEOF` as corruption and stop recovery.
961 1 : if errors.Is(err, io.EOF) {
962 1 : break
963 1 : } else if errors.Is(err, record.ErrUnexpectedEOF) && !strictWALTail {
964 1 : break
965 1 : } else if (errors.Is(err, record.ErrUnexpectedEOF) && strictWALTail) ||
966 1 : errors.Is(err, record.ErrInvalidChunk) || errors.Is(err, record.ErrZeroedChunk) {
967 1 : // If a read-ahead returns record.ErrInvalidChunk or
968 1 : // record.ErrZeroedChunk, then there's definitively corruption.
969 1 : //
970 1 : // If strictWALTail=true, then record.ErrUnexpectedEOF should
971 1 : // also be considered corruption because the strictWALTail
972 1 : // indicates we expect a clean end to the WAL.
973 1 : //
974 1 : // Other I/O related errors should not be marked with corruption
975 1 : // and simply returned.
976 1 : err = errors.Mark(err, ErrCorruption)
977 1 : }
978 :
979 1 : return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
980 : }
981 :
982 1 : if buf.Len() < batchrepr.HeaderLen {
983 0 : return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
984 0 : errors.Safe(base.DiskFileNum(ll.Num)), offset)
985 0 : }
986 :
987 1 : if d.opts.ErrorIfNotPristine {
988 1 : return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
989 1 : }
990 :
991 : // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
992 : // which is used below.
993 1 : b = Batch{}
994 1 : b.db = d
995 1 : if err := b.SetRepr(buf.Bytes()); err != nil {
996 0 : return nil, 0, err
997 0 : }
998 1 : seqNum := b.SeqNum()
999 1 : maxSeqNum = seqNum + base.SeqNum(b.Count())
1000 1 : keysReplayed += int64(b.Count())
1001 1 : batchesReplayed++
1002 1 : {
1003 1 : br := b.Reader()
1004 1 : if kind, _, _, ok, err := br.Next(); err != nil {
1005 0 : return nil, 0, err
1006 1 : } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
1007 1 : // We're in the flushable ingests (+ possibly excises) case.
1008 1 : //
1009 1 : // Ingests require an up-to-date view of the LSM to determine the target
1010 1 : // level of ingested sstables, and to accurately compute excises. Instead of
1011 1 : // doing an ingest in this function, we just enqueue a flushable ingest
1012 1 : // in the flushables queue and run a regular flush.
1013 1 : flushMem()
1014 1 : // mem is nil here.
1015 1 : entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
1016 1 : if err != nil {
1017 0 : return nil, 0, err
1018 0 : }
1019 1 : fi := entry.flushable.(*ingestedFlushable)
1020 1 : flushableIngests = append(flushableIngests, fi)
1021 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
1022 1 : // A flushable ingest is always followed by a WAL rotation.
1023 1 : break
1024 : }
1025 : }
1026 :
1027 1 : if b.memTableSize >= uint64(d.largeBatchThreshold) {
1028 1 : flushMem()
1029 1 : // Make a copy of the data slice since it is currently owned by buf and will
1030 1 : // be reused in the next iteration.
1031 1 : b.data = slices.Clone(b.data)
1032 1 : b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
1033 1 : if err != nil {
1034 0 : return nil, 0, err
1035 0 : }
1036 1 : entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
1037 1 : // Disable memory accounting by adding a reader ref that will never be
1038 1 : // removed.
1039 1 : entry.readerRefs.Add(1)
1040 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
1041 1 : } else {
1042 1 : ensureMem(seqNum)
1043 1 : if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
1044 0 : return nil, 0, err
1045 0 : }
1046 : // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
1047 : // batch may not initially fit, but will eventually fit (since it is smaller than
1048 : // largeBatchThreshold).
1049 1 : for err == arenaskl.ErrArenaFull {
1050 1 : flushMem()
1051 1 : ensureMem(seqNum)
1052 1 : err = mem.prepare(&b)
1053 1 : if err != nil && err != arenaskl.ErrArenaFull {
1054 0 : return nil, 0, err
1055 0 : }
1056 : }
1057 1 : if err = mem.apply(&b, seqNum); err != nil {
1058 0 : return nil, 0, err
1059 0 : }
1060 1 : mem.writerUnref()
1061 : }
1062 1 : buf.Reset()
1063 : }
1064 :
1065 1 : d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
1066 1 : jobID, ll.String(), offset, keysReplayed, batchesReplayed)
1067 1 : if !d.opts.ReadOnly {
1068 1 : flushMem()
1069 1 : }
1070 :
1071 : // mem is nil here, if !ReadOnly.
1072 1 : return flushableIngests, maxSeqNum, err
1073 : }
1074 :
1075 1 : func readOptionsFile(opts *Options, path string) (string, error) {
1076 1 : f, err := opts.FS.Open(path)
1077 1 : if err != nil {
1078 0 : return "", err
1079 0 : }
1080 1 : defer f.Close()
1081 1 :
1082 1 : data, err := io.ReadAll(f)
1083 1 : if err != nil {
1084 0 : return "", err
1085 0 : }
1086 1 : return string(data), nil
1087 : }
1088 :
1089 : // DBDesc briefly describes high-level state about a database.
1090 : type DBDesc struct {
1091 : // Exists is true if an existing database was found.
1092 : Exists bool
1093 : // FormatMajorVersion indicates the database's current format
1094 : // version.
1095 : FormatMajorVersion FormatMajorVersion
1096 : // ManifestFilename is the filename of the current active manifest,
1097 : // if the database exists.
1098 : ManifestFilename string
1099 : // OptionsFilename is the filename of the most recent OPTIONS file, if it
1100 : // exists.
1101 : OptionsFilename string
1102 : }
1103 :
1104 : // String implements fmt.Stringer.
1105 1 : func (d *DBDesc) String() string {
1106 1 : if !d.Exists {
1107 1 : return "uninitialized"
1108 1 : }
1109 1 : var buf bytes.Buffer
1110 1 : fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
1111 1 : fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
1112 1 : fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
1113 1 : return buf.String()
1114 : }
1115 :
1116 : // Peek looks for an existing database in dirname on the provided FS. It
1117 : // returns a brief description of the database. Peek is read-only and
1118 : // does not open the database.
1119 1 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
1120 1 : ls, err := fs.List(dirname)
1121 1 : if err != nil {
1122 1 : return nil, err
1123 1 : }
1124 :
1125 1 : vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
1126 1 : if err != nil {
1127 0 : return nil, err
1128 0 : }
1129 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1130 : // PeekMarker variant that avoids opening the directory.
1131 1 : if err := versMarker.Close(); err != nil {
1132 0 : return nil, err
1133 0 : }
1134 :
1135 : // Find the currently active manifest, if there is one.
1136 1 : manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
1137 1 : if err != nil {
1138 0 : return nil, err
1139 0 : }
1140 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1141 : // PeekMarker variant that avoids opening the directory.
1142 1 : if err := manifestMarker.Close(); err != nil {
1143 0 : return nil, err
1144 0 : }
1145 :
1146 1 : desc := &DBDesc{
1147 1 : Exists: exists,
1148 1 : FormatMajorVersion: vers,
1149 1 : }
1150 1 :
1151 1 : // Find the OPTIONS file with the highest file number within the list of
1152 1 : // directory entries.
1153 1 : var previousOptionsFileNum base.DiskFileNum
1154 1 : for _, filename := range ls {
1155 1 : ft, fn, ok := base.ParseFilename(fs, filename)
1156 1 : if !ok || ft != base.FileTypeOptions || fn < previousOptionsFileNum {
1157 1 : continue
1158 : }
1159 1 : previousOptionsFileNum = fn
1160 1 : desc.OptionsFilename = fs.PathJoin(dirname, filename)
1161 : }
1162 :
1163 1 : if exists {
1164 1 : desc.ManifestFilename = base.MakeFilepath(fs, dirname, base.FileTypeManifest, manifestFileNum)
1165 1 : }
1166 1 : return desc, nil
1167 : }
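// Illustrative sketch, not part of the original source: using Peek to report
// on a directory without opening it. The path is a placeholder and the
// function name is hypothetical.
func examplePeek(fs vfs.FS) error {
	desc, err := Peek("/path/to/db", fs)
	if err != nil {
		return err
	}
	if !desc.Exists {
		fmt.Println("no database found")
		return nil
	}
	fmt.Println(desc.String())
	return nil
}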
1168 :
1169 : // LockDirectory acquires the database directory lock in the named directory,
1170 : // preventing another process from opening the database. LockDirectory returns a
1171 : // handle to the held lock that may be passed to Open through Options.Lock to
1172 : // subsequently open the database, skipping lock acquisition during Open.
1173 : //
1174 : // LockDirectory may be used to expand the critical section protected by the
1175 : // database lock to include setup before the call to Open.
1176 1 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
1177 1 : fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, base.FileTypeLock, base.DiskFileNum(0)))
1178 1 : if err != nil {
1179 1 : return nil, err
1180 1 : }
1181 1 : l := &Lock{dirname: dirname, fileLock: fileLock}
1182 1 : l.refs.Store(1)
1183 1 : invariants.SetFinalizer(l, func(obj interface{}) {
1184 1 : if refs := obj.(*Lock).refs.Load(); refs > 0 {
1185 0 : panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
1186 : }
1187 : })
1188 1 : return l, nil
1189 : }
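// Illustrative sketch, not part of the original source: expanding the
// critical section by acquiring the directory lock before Open and passing it
// through Options.Lock, as described above. The path is a placeholder and the
// function name is hypothetical.
func exampleLockThenOpen(opts *Options) (*DB, error) {
	opts.EnsureDefaults()
	lock, err := LockDirectory("/path/to/db", opts.FS)
	if err != nil {
		return nil, err
	}
	// ... setup that must happen while the directory is held locked ...
	opts.Lock = lock
	db, err := Open("/path/to/db", opts)
	if err != nil {
		_ = lock.Close()
		return nil, err
	}
	// The caller must still Close the lock, but only after the DB is closed.
	return db, nil
}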
1190 :
1191 : // Lock represents a file lock on a directory. It may be passed to Open through
1192 : // Options.Lock to elide lock acquisition during Open.
1193 : type Lock struct {
1194 : dirname string
1195 : fileLock io.Closer
1196 : // refs is a count of the number of handles on the lock. refs must be 0, 1
1197 : // or 2.
1198 : //
1199 : // When acquired by the client and passed to Open, refs = 1 and the Open
1200 : // call increments it to 2. When the database is closed, it's decremented to
1201 : // 1. Finally, when the original caller calls Close on the Lock, it's
1202 : // decremented to zero and the underlying file lock is released.
1203 : //
1204 : // When Open acquires the file lock, refs remains at 1 until the database is
1205 : // closed.
1206 : refs atomic.Int32
1207 : }
1208 :
1209 1 : func (l *Lock) refForOpen() error {
1210 1 : // During Open, when a user passed in a lock, the reference count must be
1211 1 : // exactly 1. If it's zero, the lock is no longer held and is invalid. If
1212 1 : // it's 2, the lock is already in use by another database within the
1213 1 : // process.
1214 1 : if !l.refs.CompareAndSwap(1, 2) {
1215 1 : return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
1216 1 : }
1217 1 : return nil
1218 : }
1219 :
1220 : // Close releases the lock, permitting another process to lock and open the
1221 : // database. Close must not be called until after a database using the Lock has
1222 : // been closed.
1223 1 : func (l *Lock) Close() error {
1224 1 : if l.refs.Add(-1) > 0 {
1225 1 : return nil
1226 1 : }
1227 1 : defer func() { l.fileLock = nil }()
1228 1 : return l.fileLock.Close()
1229 : }
1230 :
1231 1 : func (l *Lock) pathMatches(dirname string) error {
1232 1 : if dirname == l.dirname {
1233 1 : return nil
1234 1 : }
1235 : // Check for relative paths, symlinks, etc. This isn't ideal because we're
1236 : // circumventing the vfs.FS interface here.
1237 : //
1238 : // TODO(jackson): We could add support for retrieving file inodes through Stat
1239 : // calls in the VFS interface on platforms where it's available and use that
1240 : // to differentiate.
1241 1 : dirStat, err1 := os.Stat(dirname)
1242 1 : lockDirStat, err2 := os.Stat(l.dirname)
1243 1 : if err1 == nil && err2 == nil && os.SameFile(dirStat, lockDirStat) {
1244 1 : return nil
1245 1 : }
1246 0 : return errors.Join(
1247 0 : errors.Newf("pebble: opts.Lock acquired in %q not %q", l.dirname, dirname),
1248 0 : err1, err2)
1249 : }
1250 :
1251 : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
1252 : // does not exist.
1253 : //
1254 : // Note that errors can be wrapped with more details; use errors.Is().
1255 : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
1256 :
1257 : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
1258 : // already exists.
1259 : //
1260 : // Note that errors can be wrapped with more details; use errors.Is().
1261 : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
1262 :
1263 : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
1264 : // already exists and is not pristine.
1265 : //
1266 : // Note that errors can be wrapped with more details; use errors.Is().
1267 : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
1268 :
1269 1 : func checkConsistency(v *manifest.Version, objProvider objstorage.Provider) error {
1270 1 : var errs []error
1271 1 : dedup := make(map[base.DiskFileNum]struct{})
1272 1 : for level, files := range v.Levels {
1273 1 : for f := range files.All() {
1274 1 : backingState := f.FileBacking
1275 1 : if _, ok := dedup[backingState.DiskFileNum]; ok {
1276 1 : continue
1277 : }
1278 1 : dedup[backingState.DiskFileNum] = struct{}{}
1279 1 : fileNum := backingState.DiskFileNum
1280 1 : fileSize := backingState.Size
1281 1 : // We skip over remote objects; those are instead checked asynchronously
1282 1 : // by the table stats loading job.
1283 1 : meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
1284 1 : var size int64
1285 1 : if err == nil {
1286 1 : if meta.IsRemote() {
1287 1 : continue
1288 : }
1289 1 : size, err = objProvider.Size(meta)
1290 : }
1291 1 : if err != nil {
1292 1 : errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
1293 1 : continue
1294 : }
1295 :
1296 1 : if size != int64(fileSize) {
1297 0 : errs = append(errs, errors.Errorf(
1298 0 : "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
1299 0 : errors.Safe(level), fileNum, objProvider.Path(meta),
1300 0 : errors.Safe(size), errors.Safe(fileSize)))
1301 0 : continue
1302 : }
1303 : }
1304 : }
1305 1 : return errors.Join(errs...)
1306 : }
1307 :
1308 : type walEventListenerAdaptor struct {
1309 : l *EventListener
1310 : }
1311 :
1312 1 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
1313 1 : // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
1314 1 : // is insufficient to infer whether primary or secondary.
1315 1 : wci := WALCreateInfo{
1316 1 : JobID: ci.JobID,
1317 1 : Path: ci.Path,
1318 1 : FileNum: base.DiskFileNum(ci.Num),
1319 1 : RecycledFileNum: ci.RecycledFileNum,
1320 1 : Err: ci.Err,
1321 1 : }
1322 1 : l.l.WALCreated(wci)
1323 1 : }