Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "math"
14 : "os"
15 : "slices"
16 : "sync/atomic"
17 : "time"
18 :
19 : "github.com/cockroachdb/crlib/crtime"
20 : "github.com/cockroachdb/errors"
21 : "github.com/cockroachdb/errors/oserror"
22 : "github.com/cockroachdb/pebble/batchrepr"
23 : "github.com/cockroachdb/pebble/internal/arenaskl"
24 : "github.com/cockroachdb/pebble/internal/base"
25 : "github.com/cockroachdb/pebble/internal/cache"
26 : "github.com/cockroachdb/pebble/internal/constants"
27 : "github.com/cockroachdb/pebble/internal/invariants"
28 : "github.com/cockroachdb/pebble/internal/keyspan"
29 : "github.com/cockroachdb/pebble/internal/manifest"
30 : "github.com/cockroachdb/pebble/internal/manual"
31 : "github.com/cockroachdb/pebble/objstorage"
32 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
33 : "github.com/cockroachdb/pebble/objstorage/remote"
34 : "github.com/cockroachdb/pebble/record"
35 : "github.com/cockroachdb/pebble/sstable"
36 : "github.com/cockroachdb/pebble/vfs"
37 : "github.com/cockroachdb/pebble/wal"
38 : "github.com/prometheus/client_golang/prometheus"
39 : )
40 :
41 : const (
42 : initialMemTableSize = 256 << 10 // 256 KB
43 :
44 : // The max batch size is limited by the uint32 offsets stored in
45 : // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
46 : //
47 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
48 : // end of an allocation fits in uint32.
49 : //
50 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
51 : // 2GB).
52 : maxBatchSize = constants.MaxUint32OrInt
53 :
54 : // The max memtable size is limited by the uint32 offsets stored in
55 : // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
56 : //
57 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
58 : // end of an allocation fits in uint32.
59 : //
60 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
61 : // 2GB).
62 : maxMemTableSize = constants.MaxUint32OrInt
63 : )
64 :
65 : // FileCacheSize determines the file cache size for a single db, given
66 : // the maximum number of open files that can be used by a file cache
67 : // serving only that db. It reserves part of the limit for files that
68 : // are not managed by the file cache.
69 2 : func FileCacheSize(maxOpenFiles int) int {
70 2 : fileCacheSize := maxOpenFiles - numNonFileCacheFiles
71 2 : if fileCacheSize < minFileCacheSize {
72 1 : fileCacheSize = minFileCacheSize
73 1 : }
74 2 : return fileCacheSize
75 : }
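
// A minimal usage sketch (values and variable names are illustrative):
// compute the file cache size implied by an Options.MaxOpenFiles setting.
//
//	opts := &pebble.Options{MaxOpenFiles: 1000}
//	entries := pebble.FileCacheSize(opts.MaxOpenFiles)
//	// entries is MaxOpenFiles less the descriptors reserved for files that
//	// bypass the file cache, floored at a small minimum.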
76 :
77 : // Open opens a DB whose files live in the given directory.
78 2 : func Open(dirname string, opts *Options) (db *DB, err error) {
79 2 : // Make a copy of the options so that we don't mutate the passed in options.
80 2 : opts = opts.Clone()
81 2 : opts = opts.EnsureDefaults()
82 2 : if err := opts.Validate(); err != nil {
83 0 : return nil, err
84 0 : }
85 2 : if opts.LoggerAndTracer == nil {
86 2 : opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
87 2 : } else {
88 1 : opts.Logger = opts.LoggerAndTracer
89 1 : }
90 :
91 2 : if invariants.Sometimes(5) {
92 2 : assertComparer := base.MakeAssertComparer(*opts.Comparer)
93 2 : opts.Comparer = &assertComparer
94 2 : }
95 :
96 : // In all error cases, we return db = nil; this is used by various
97 : // deferred cleanups.
98 :
99 : // Open the database and WAL directories first.
100 2 : walDirname, dataDir, err := prepareAndOpenDirs(dirname, opts)
101 2 : if err != nil {
102 1 : return nil, errors.Wrapf(err, "error opening database at %q", dirname)
103 1 : }
104 2 : defer func() {
105 2 : if db == nil {
106 1 : dataDir.Close()
107 1 : }
108 : }()
109 :
110 : // Lock the database directory.
111 2 : var fileLock *Lock
112 2 : if opts.Lock != nil {
113 1 : // The caller already acquired the database lock. Ensure that the
114 1 : // directory matches.
115 1 : if err := opts.Lock.pathMatches(dirname); err != nil {
116 0 : return nil, err
117 0 : }
118 1 : if err := opts.Lock.refForOpen(); err != nil {
119 1 : return nil, err
120 1 : }
121 1 : fileLock = opts.Lock
122 2 : } else {
123 2 : fileLock, err = LockDirectory(dirname, opts.FS)
124 2 : if err != nil {
125 1 : return nil, err
126 1 : }
127 : }
128 2 : defer func() {
129 2 : if db == nil {
130 1 : fileLock.Close()
131 1 : }
132 : }()
133 :
134 : // List the directory contents. This also happens to include WAL log files, if
135 : // they are in the same dir, but we will ignore those below. The provider is
136 : // also given this list, but it ignores non-sstable files.
137 2 : ls, err := opts.FS.List(dirname)
138 2 : if err != nil {
139 1 : return nil, err
140 1 : }
141 :
142 : // Establish the format major version.
143 2 : formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
144 2 : if err != nil {
145 1 : return nil, err
146 1 : }
147 2 : defer func() {
148 2 : if db == nil {
149 1 : formatVersionMarker.Close()
150 1 : }
151 : }()
152 :
153 2 : noFormatVersionMarker := formatVersion == FormatDefault
154 2 : if noFormatVersionMarker {
155 2 : // We will initialize the store at the minimum possible format, then upgrade
156 2 : // the format to the desired one. This helps test the format upgrade code.
157 2 : formatVersion = FormatMinSupported
158 2 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
159 2 : formatVersion = FormatMinForSharedObjects
160 2 : }
161 : // There is no format version marker file. There are three cases:
162 : // - we are trying to open an existing store that was created at
163 : // FormatMostCompatible (the only one without a version marker file)
164 : // - we are creating a new store;
165 : // - we are retrying a failed creation.
166 : //
167 : // To error in the first case, we set ErrorIfNotPristine.
168 2 : opts.ErrorIfNotPristine = true
169 2 : defer func() {
170 2 : if err != nil && errors.Is(err, ErrDBNotPristine) {
171 0 : // We must be trying to open an existing store at FormatMostCompatible.
172 0 : // Correct the error in this case.
173 0 : err = errors.Newf(
174 0 : "pebble: database %q written in format major version 1 which is no longer supported",
175 0 : dirname)
176 0 : }
177 : }()
178 2 : } else {
179 2 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone && formatVersion < FormatMinForSharedObjects {
180 0 : return nil, errors.Newf(
181 0 : "pebble: database %q configured with shared objects but written in too old format major version %d",
182 0 : dirname, formatVersion)
183 0 : }
184 : }
185 :
186 : // Find the currently active manifest, if there is one.
187 2 : manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
188 2 : if err != nil {
189 1 : return nil, errors.Wrapf(err, "pebble: database %q", dirname)
190 1 : }
191 2 : defer func() {
192 2 : if db == nil {
193 1 : manifestMarker.Close()
194 1 : }
195 : }()
196 :
197 : // Atomic markers may leave behind obsolete files if there's a crash
198 : // mid-update. Clean these up if we're not in read-only mode.
199 2 : if !opts.ReadOnly {
200 2 : if err := formatVersionMarker.RemoveObsolete(); err != nil {
201 0 : return nil, err
202 0 : }
203 2 : if err := manifestMarker.RemoveObsolete(); err != nil {
204 0 : return nil, err
205 0 : }
206 : }
207 :
208 2 : if opts.Cache == nil {
209 1 : opts.Cache = cache.New(cacheDefaultSize)
210 2 : } else {
211 2 : opts.Cache.Ref()
212 2 : }
213 :
214 2 : d := &DB{
215 2 : cacheID: opts.Cache.NewID(),
216 2 : dirname: dirname,
217 2 : opts: opts,
218 2 : cmp: opts.Comparer.Compare,
219 2 : equal: opts.Comparer.Equal,
220 2 : merge: opts.Merger.Merge,
221 2 : split: opts.Comparer.Split,
222 2 : abbreviatedKey: opts.Comparer.AbbreviatedKey,
223 2 : largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
224 2 : fileLock: fileLock,
225 2 : dataDir: dataDir,
226 2 : closed: new(atomic.Value),
227 2 : closedCh: make(chan struct{}),
228 2 : }
229 2 : d.mu.versions = &versionSet{}
230 2 : d.diskAvailBytes.Store(math.MaxUint64)
231 2 :
232 2 : defer func() {
233 2 : // If an error or panic occurs during open, attempt to release the manually
234 2 : // allocated memory resources. Note that rather than look for an error, we
235 2 : // look for the return of a nil DB pointer.
236 2 : if r := recover(); db == nil {
237 1 : // If there's an unused, recycled memtable, we need to release its memory.
238 1 : if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
239 1 : d.freeMemTable(obsoleteMemTable)
240 1 : }
241 :
242 : // Release our references to the Cache. Note that both the DB and the
243 : // fileCache hold a reference. When we release the reference to
244 : // the fileCache, and if there are no other references to
245 : // the fileCache, then the fileCache will also release its
246 : // reference to the cache.
247 1 : opts.Cache.Unref()
248 1 :
249 1 : if d.fileCache != nil {
250 1 : _ = d.fileCache.close()
251 1 : }
252 :
253 1 : for _, mem := range d.mu.mem.queue {
254 1 : switch t := mem.flushable.(type) {
255 1 : case *memTable:
256 1 : manual.Free(manual.MemTable, t.arenaBuf)
257 1 : t.arenaBuf = nil
258 : }
259 : }
260 1 : if d.cleanupManager != nil {
261 1 : d.cleanupManager.Close()
262 1 : }
263 1 : if d.objProvider != nil {
264 1 : d.objProvider.Close()
265 1 : }
266 1 : if r != nil {
267 1 : panic(r)
268 : }
269 : }
270 : }()
271 :
272 2 : d.commit = newCommitPipeline(commitEnv{
273 2 : logSeqNum: &d.mu.versions.logSeqNum,
274 2 : visibleSeqNum: &d.mu.versions.visibleSeqNum,
275 2 : apply: d.commitApply,
276 2 : write: d.commitWrite,
277 2 : })
278 2 : d.mu.nextJobID = 1
279 2 : d.mu.mem.nextSize = opts.MemTableSize
280 2 : if d.mu.mem.nextSize > initialMemTableSize {
281 2 : d.mu.mem.nextSize = initialMemTableSize
282 2 : }
283 2 : d.mu.compact.cond.L = &d.mu.Mutex
284 2 : d.mu.compact.inProgress = make(map[*compaction]struct{})
285 2 : d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
286 2 : d.mu.snapshots.init()
287 2 : // logSeqNum is the next sequence number that will be assigned.
288 2 : // Start assigning sequence numbers from base.SeqNumStart to leave
289 2 : // room for reserved sequence numbers (see comments around
290 2 : // SeqNumStart).
291 2 : d.mu.versions.logSeqNum.Store(base.SeqNumStart)
292 2 : d.mu.formatVers.vers.Store(uint64(formatVersion))
293 2 : d.mu.formatVers.marker = formatVersionMarker
294 2 :
295 2 : d.timeNow = time.Now
296 2 : d.openedAt = d.timeNow()
297 2 :
298 2 : d.mu.Lock()
299 2 : defer d.mu.Unlock()
300 2 :
301 2 : jobID := d.newJobIDLocked()
302 2 :
303 2 : providerSettings := objstorageprovider.Settings{
304 2 : Logger: opts.Logger,
305 2 : FS: opts.FS,
306 2 : FSDirName: dirname,
307 2 : FSDirInitialListing: ls,
308 2 : FSCleaner: opts.Cleaner,
309 2 : NoSyncOnClose: opts.NoSyncOnClose,
310 2 : BytesPerSync: opts.BytesPerSync,
311 2 : }
312 2 : providerSettings.Local.ReadaheadConfig = opts.Local.ReadaheadConfig
313 2 : providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
314 2 : providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
315 2 : providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
316 2 : providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
317 2 :
318 2 : d.objProvider, err = objstorageprovider.Open(providerSettings)
319 2 : if err != nil {
320 1 : return nil, err
321 1 : }
322 :
323 2 : if !manifestExists {
324 2 : // DB does not exist.
325 2 : if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
326 1 : return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
327 1 : }
328 :
329 : // Create the DB.
330 2 : if err := d.mu.versions.create(
331 2 : jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
332 1 : return nil, err
333 1 : }
334 2 : } else {
335 2 : if opts.ErrorIfExists {
336 1 : return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
337 1 : }
338 : // Load the version set.
339 2 : if err := d.mu.versions.load(
340 2 : dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
341 1 : return nil, err
342 1 : }
343 2 : if opts.ErrorIfNotPristine {
344 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
345 1 : d.mu.versions.addLiveFileNums(liveFileNums)
346 1 : if len(liveFileNums) != 0 {
347 1 : return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
348 1 : }
349 : }
350 : }
351 :
352 : // In read-only mode, we replay directly into the mutable memtable but never
353 : // flush it. We need to delay creation of the memtable until we know the
354 : // sequence number of the first batch that will be inserted.
355 2 : if !d.opts.ReadOnly {
356 2 : var entry *flushableEntry
357 2 : d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
358 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
359 2 : }
360 :
361 2 : d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
362 2 : Buckets: FsyncLatencyBuckets,
363 2 : })
364 2 : walOpts := wal.Options{
365 2 : Primary: wal.Dir{FS: opts.FS, Dirname: walDirname},
366 2 : Secondary: wal.Dir{},
367 2 : MinUnflushedWALNum: wal.NumWAL(d.mu.versions.minUnflushedLogNum),
368 2 : MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
369 2 : NoSyncOnClose: opts.NoSyncOnClose,
370 2 : BytesPerSync: opts.WALBytesPerSync,
371 2 : PreallocateSize: d.walPreallocateSize,
372 2 : MinSyncInterval: opts.WALMinSyncInterval,
373 2 : FsyncLatency: d.mu.log.metrics.fsyncLatency,
374 2 : QueueSemChan: d.commit.logSyncQSem,
375 2 : Logger: opts.Logger,
376 2 : EventListener: walEventListenerAdaptor{l: opts.EventListener},
377 2 : }
378 2 : if opts.WALFailover != nil {
379 2 : walOpts.Secondary = opts.WALFailover.Secondary
380 2 : walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
381 2 : walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
382 2 : Buckets: FsyncLatencyBuckets,
383 2 : })
384 2 : }
385 2 : walDirs := append(walOpts.Dirs(), opts.WALRecoveryDirs...)
386 2 : wals, err := wal.Scan(walDirs...)
387 2 : if err != nil {
388 1 : return nil, err
389 1 : }
390 2 : walManager, err := wal.Init(walOpts, wals)
391 2 : if err != nil {
392 1 : return nil, err
393 1 : }
394 2 : defer func() {
395 2 : if db == nil {
396 1 : walManager.Close()
397 1 : }
398 : }()
399 :
400 2 : d.mu.log.manager = walManager
401 2 :
402 2 : d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
403 2 :
404 2 : if manifestExists && !opts.DisableConsistencyCheck {
405 2 : curVersion := d.mu.versions.currentVersion()
406 2 : if err := checkConsistency(curVersion, dirname, d.objProvider); err != nil {
407 1 : return nil, err
408 1 : }
409 : }
410 :
411 2 : fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
412 2 : d.fileCache = newFileCacheContainer(
413 2 : opts.FileCache, d.cacheID, d.objProvider, d.opts, fileCacheSize,
414 2 : &sstable.CategoryStatsCollector{})
415 2 : d.newIters = d.fileCache.newIters
416 2 : d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
417 2 :
418 2 : d.mu.annotators.totalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
419 1 : return true
420 1 : })
421 2 : d.mu.annotators.remoteSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
422 1 : meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
423 1 : if err != nil {
424 0 : return false
425 0 : }
426 1 : return meta.IsRemote()
427 : })
428 2 : d.mu.annotators.externalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
429 1 : meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
430 1 : if err != nil {
431 0 : return false
432 0 : }
433 1 : return meta.IsRemote() && meta.Remote.CleanupMethod == objstorage.SharedNoCleanup
434 : })
435 :
436 2 : var previousOptionsFileNum base.DiskFileNum
437 2 : var previousOptionsFilename string
438 2 : for _, filename := range ls {
439 2 : ft, fn, ok := base.ParseFilename(opts.FS, filename)
440 2 : if !ok {
441 2 : continue
442 : }
443 :
444 : // Don't reuse any obsolete file numbers to avoid modifying an
445 : // ingested sstable's original external file.
446 2 : d.mu.versions.markFileNumUsed(fn)
447 2 :
448 2 : switch ft {
449 0 : case fileTypeLog:
450 : // Ignore.
451 2 : case fileTypeOptions:
452 2 : if previousOptionsFileNum < fn {
453 2 : previousOptionsFileNum = fn
454 2 : previousOptionsFilename = filename
455 2 : }
456 1 : case fileTypeTemp, fileTypeOldTemp:
457 1 : if !d.opts.ReadOnly {
458 1 : // Some codepaths write to a temporary file and then
459 1 : // rename it to its final location when complete. A
460 1 : // temp file is leftover if a process exits before the
461 1 : // rename. Remove it.
462 1 : err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
463 1 : if err != nil {
464 0 : return nil, err
465 0 : }
466 : }
467 : }
468 : }
469 2 : if n := len(wals); n > 0 {
470 2 : // Don't reuse any obsolete file numbers to avoid modifying an
471 2 : // ingested sstable's original external file.
472 2 : d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
473 2 : }
474 :
475 : // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
476 : // objProvider. This avoids FileNum collisions with obsolete sstables.
477 2 : objects := d.objProvider.List()
478 2 : for _, obj := range objects {
479 2 : d.mu.versions.markFileNumUsed(obj.DiskFileNum)
480 2 : }
481 :
482 : // Validate the most-recent OPTIONS file, if there is one.
483 2 : if previousOptionsFilename != "" {
484 2 : path := opts.FS.PathJoin(dirname, previousOptionsFilename)
485 2 : previousOptions, err := readOptionsFile(opts, path)
486 2 : if err != nil {
487 0 : return nil, err
488 0 : }
489 2 : if err := opts.CheckCompatibility(previousOptions); err != nil {
490 1 : return nil, err
491 1 : }
492 : }
493 :
494 : // Replay any log files newer than the ones named in the manifest.
495 2 : var replayWALs wal.Logs
496 2 : for i, w := range wals {
497 2 : if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
498 2 : replayWALs = wals[i:]
499 2 : break
500 : }
501 : }
502 2 : var flushableIngests []*ingestedFlushable
503 2 : for i, lf := range replayWALs {
504 2 : // WALs other than the last one would have been closed cleanly.
505 2 : //
506 2 : // Note: we used to never require strict WAL tails when reading from older
507 2 : // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
508 2 : // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
509 2 : // compatible Pebble format is newer and guarantees a clean EOF.
510 2 : strictWALTail := i < len(replayWALs)-1
511 2 : fi, maxSeqNum, err := d.replayWAL(jobID, lf, strictWALTail)
512 2 : if err != nil {
513 1 : return nil, err
514 1 : }
515 2 : if len(fi) > 0 {
516 2 : flushableIngests = append(flushableIngests, fi...)
517 2 : }
518 2 : if d.mu.versions.logSeqNum.Load() < maxSeqNum {
519 2 : d.mu.versions.logSeqNum.Store(maxSeqNum)
520 2 : }
521 : }
522 2 : if d.mu.mem.mutable == nil {
523 2 : // Recreate the mutable memtable if replayWAL got rid of it.
524 2 : var entry *flushableEntry
525 2 : d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
526 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
527 2 : }
528 2 : d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
529 2 :
530 2 : if !d.opts.ReadOnly {
531 2 : d.maybeScheduleFlush()
532 2 : for d.mu.compact.flushing {
533 2 : d.mu.compact.cond.Wait()
534 2 : }
535 :
536 : // Create an empty .log file for the mutable memtable.
537 2 : newLogNum := d.mu.versions.getNextDiskFileNum()
538 2 : d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
539 2 : if err != nil {
540 1 : return nil, err
541 1 : }
542 :
543 : // This isn't strictly necessary as we don't use the log number for
544 : // memtables being flushed, only for the next unflushed memtable.
545 2 : d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
546 : }
547 2 : d.updateReadStateLocked(d.opts.DebugCheck)
548 2 :
549 2 : if !d.opts.ReadOnly {
550 2 : // If the Options specify a format major version higher than the
551 2 : // loaded database's, upgrade it. If this is a new database, this
552 2 : // code path also performs an initial upgrade from the starting
553 2 : // implicit MinSupported version.
554 2 : //
555 2 : // We ratchet the version this far into Open so that migrations have a read
556 2 : // state available. Note that this also results in creating/updating the
557 2 : // format version marker file.
558 2 : if opts.FormatMajorVersion > d.FormatMajorVersion() {
559 2 : if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
560 0 : return nil, err
561 0 : }
562 2 : } else if noFormatVersionMarker {
563 2 : // We are creating a new store. Create the format version marker file.
564 2 : if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
565 1 : return nil, err
566 1 : }
567 : }
568 :
569 : // Write the current options to disk.
570 2 : d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
571 2 : tmpPath := base.MakeFilepath(opts.FS, dirname, fileTypeTemp, d.optionsFileNum)
572 2 : optionsPath := base.MakeFilepath(opts.FS, dirname, fileTypeOptions, d.optionsFileNum)
573 2 :
574 2 : // Write them to a temporary file first, in case we crash before
575 2 : // we're done. A corrupt options file prevents opening the
576 2 : // database.
577 2 : optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
578 2 : if err != nil {
579 1 : return nil, err
580 1 : }
581 2 : serializedOpts := []byte(opts.String())
582 2 : if _, err := optionsFile.Write(serializedOpts); err != nil {
583 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
584 1 : }
585 2 : d.optionsFileSize = uint64(len(serializedOpts))
586 2 : if err := optionsFile.Sync(); err != nil {
587 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
588 1 : }
589 2 : if err := optionsFile.Close(); err != nil {
590 0 : return nil, err
591 0 : }
592 : // Atomically rename to the OPTIONS-XXXXXX path. This rename is
593 : // guaranteed to be atomic because the destination path does not
594 : // exist.
595 2 : if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
596 1 : return nil, err
597 1 : }
598 2 : if err := d.dataDir.Sync(); err != nil {
599 1 : return nil, err
600 1 : }
601 : }
602 :
603 2 : if !d.opts.ReadOnly {
604 2 : // Get a fresh list of files, in case some of the earlier flushes/compactions
605 2 : // have deleted some files.
606 2 : ls, err := opts.FS.List(dirname)
607 2 : if err != nil {
608 1 : return nil, err
609 1 : }
610 2 : d.scanObsoleteFiles(ls, flushableIngests)
611 2 : d.deleteObsoleteFiles(jobID)
612 : }
613 : // Else, nothing is obsolete.
614 :
615 2 : d.mu.tableStats.cond.L = &d.mu.Mutex
616 2 : d.mu.tableValidation.cond.L = &d.mu.Mutex
617 2 : if !d.opts.ReadOnly {
618 2 : d.maybeCollectTableStatsLocked()
619 2 : }
620 2 : d.calculateDiskAvailableBytes()
621 2 :
622 2 : d.maybeScheduleFlush()
623 2 : d.maybeScheduleCompaction()
624 2 :
625 2 : // Note: this is a no-op if invariants are disabled or race is enabled.
626 2 : //
627 2 : // Setting a finalizer on *DB causes *DB to never be reclaimed and the
628 2 : // finalizer to never be run. The problem is due to this limitation of
629 2 : // finalizers mention in the SetFinalizer docs:
630 2 : //
631 2 : // If a cyclic structure includes a block with a finalizer, that cycle is
632 2 : // not guaranteed to be garbage collected and the finalizer is not
633 2 : // guaranteed to run, because there is no ordering that respects the
634 2 : // dependencies.
635 2 : //
636 2 : // DB has cycles with several of its internal structures: readState,
637 2 : // newIters, fileCache, versions, etc. Each of these individually causes a
638 2 : // cycle and prevents the finalizer from being run. But we can work around
639 2 : // this finalizer limitation by setting a finalizer on another object that is
640 2 : // tied to the lifetime of DB: the DB.closed atomic.Value.
641 2 : dPtr := fmt.Sprintf("%p", d)
642 2 : invariants.SetFinalizer(d.closed, func(obj interface{}) {
643 0 : v := obj.(*atomic.Value)
644 0 : if err := v.Load(); err == nil {
645 0 : fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
646 0 : os.Exit(1)
647 0 : }
648 : })
649 :
650 2 : return d, nil
651 : }
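
// A minimal usage sketch from a caller's perspective, assuming an in-memory
// filesystem; the directory name, key, and value are illustrative.
//
//	db, err := pebble.Open("demo", &pebble.Options{FS: vfs.NewMem()})
//	if err != nil {
//	    return err
//	}
//	defer db.Close()
//	if err := db.Set([]byte("hello"), []byte("world"), pebble.Sync); err != nil {
//	    return err
//	}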
652 :
653 : // prepareAndOpenDirs opens the directories for the store (and creates them if
654 : // necessary).
655 : //
656 : // Returns an error if ReadOnly is set and the directories don't exist.
657 : func prepareAndOpenDirs(
658 : dirname string, opts *Options,
659 2 : ) (walDirname string, dataDir vfs.File, err error) {
660 2 : walDirname = opts.WALDir
661 2 : if opts.WALDir == "" {
662 2 : walDirname = dirname
663 2 : }
664 :
665 : // Create directories if needed.
666 2 : if !opts.ReadOnly {
667 2 : f, err := mkdirAllAndSyncParents(opts.FS, dirname)
668 2 : if err != nil {
669 1 : return "", nil, err
670 1 : }
671 2 : f.Close()
672 2 : if walDirname != dirname {
673 2 : f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
674 2 : if err != nil {
675 0 : return "", nil, err
676 0 : }
677 2 : f.Close()
678 : }
679 2 : if opts.WALFailover != nil {
680 2 : secondary := opts.WALFailover.Secondary
681 2 : f, err := mkdirAllAndSyncParents(secondary.FS, secondary.Dirname)
682 2 : if err != nil {
683 0 : return "", nil, err
684 0 : }
685 2 : f.Close()
686 : }
687 : }
688 :
689 2 : dataDir, err = opts.FS.OpenDir(dirname)
690 2 : if err != nil {
691 1 : if opts.ReadOnly && oserror.IsNotExist(err) {
692 1 : return "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
693 1 : }
694 1 : return "", nil, err
695 : }
696 2 : if opts.ReadOnly && walDirname != dirname {
697 1 : // Check that the wal dir exists.
698 1 : walDir, err := opts.FS.OpenDir(walDirname)
699 1 : if err != nil {
700 1 : dataDir.Close()
701 1 : return "", nil, err
702 1 : }
703 1 : walDir.Close()
704 : }
705 :
706 2 : return walDirname, dataDir, nil
707 : }
708 :
709 : // GetVersion returns the engine version string from the latest options
710 : // file present in dir. Used to check what Pebble or RocksDB version was last
711 : // used to write to the database stored in this directory. An empty string is
712 : // returned if no valid OPTIONS file with a version key was found.
713 1 : func GetVersion(dir string, fs vfs.FS) (string, error) {
714 1 : ls, err := fs.List(dir)
715 1 : if err != nil {
716 0 : return "", err
717 0 : }
718 1 : var version string
719 1 : lastOptionsSeen := base.DiskFileNum(0)
720 1 : for _, filename := range ls {
721 1 : ft, fn, ok := base.ParseFilename(fs, filename)
722 1 : if !ok {
723 1 : continue
724 : }
725 1 : switch ft {
726 1 : case fileTypeOptions:
727 1 : // If this file has a higher number than the last options file
728 1 : // processed, reset version. This is because rocksdb often
729 1 : // writes multiple options files without deleting previous ones.
730 1 : // Otherwise, skip parsing this options file.
731 1 : if fn > lastOptionsSeen {
732 1 : version = ""
733 1 : lastOptionsSeen = fn
734 1 : } else {
735 1 : continue
736 : }
737 1 : f, err := fs.Open(fs.PathJoin(dir, filename))
738 1 : if err != nil {
739 0 : return "", err
740 0 : }
741 1 : data, err := io.ReadAll(f)
742 1 : f.Close()
743 1 :
744 1 : if err != nil {
745 0 : return "", err
746 0 : }
747 1 : err = parseOptions(string(data), parseOptionsFuncs{
748 1 : visitKeyValue: func(i, j int, section, key, value string) error {
749 1 : switch {
750 1 : case section == "Version":
751 1 : switch key {
752 1 : case "pebble_version":
753 1 : version = value
754 1 : case "rocksdb_version":
755 1 : version = fmt.Sprintf("rocksdb v%s", value)
756 : }
757 : }
758 1 : return nil
759 : },
760 : })
761 1 : if err != nil {
762 0 : return "", err
763 0 : }
764 : }
765 : }
766 1 : return version, nil
767 : }
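
// A minimal usage sketch (the path is illustrative): read the engine version
// last written to a store without opening it.
//
//	version, err := pebble.GetVersion("/data/db", vfs.Default)
//	if err != nil {
//	    return err
//	}
//	if version == "" {
//	    // No OPTIONS file with a version key was found.
//	}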
768 :
769 : func (d *DB) replayIngestedFlushable(
770 : b *Batch, logNum base.DiskFileNum,
771 2 : ) (entry *flushableEntry, err error) {
772 2 : br := b.Reader()
773 2 : seqNum := b.SeqNum()
774 2 :
775 2 : fileNums := make([]base.DiskFileNum, 0, b.Count())
776 2 : var exciseSpan KeyRange
777 2 : addFileNum := func(encodedFileNum []byte) {
778 2 : fileNum, n := binary.Uvarint(encodedFileNum)
779 2 : if n <= 0 {
780 0 : panic("pebble: ingest sstable file num is invalid")
781 : }
782 2 : fileNums = append(fileNums, base.DiskFileNum(fileNum))
783 : }
784 :
785 2 : for i := 0; i < int(b.Count()); i++ {
786 2 : kind, key, val, ok, err := br.Next()
787 2 : if err != nil {
788 0 : return nil, err
789 0 : }
790 2 : if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise {
791 0 : panic("pebble: invalid batch key kind")
792 : }
793 2 : if !ok {
794 0 : panic("pebble: invalid batch count")
795 : }
796 2 : if kind == base.InternalKeyKindExcise {
797 2 : if exciseSpan.Valid() {
798 0 : panic("pebble: multiple excise spans in a single batch")
799 : }
800 2 : exciseSpan.Start = slices.Clone(key)
801 2 : exciseSpan.End = slices.Clone(val)
802 2 : continue
803 : }
804 2 : addFileNum(key)
805 : }
806 :
807 2 : if _, _, _, ok, err := br.Next(); err != nil {
808 0 : return nil, err
809 2 : } else if ok {
810 0 : panic("pebble: invalid number of entries in batch")
811 : }
812 :
813 2 : meta := make([]*fileMetadata, len(fileNums))
814 2 : var lastRangeKey keyspan.Span
815 2 : for i, n := range fileNums {
816 2 : readable, err := d.objProvider.OpenForReading(context.TODO(), fileTypeTable, n, objstorage.OpenOptions{MustExist: true})
817 2 : if err != nil {
818 0 : return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files")
819 0 : }
820 : // NB: ingestLoad1 will close readable.
821 2 : meta[i], lastRangeKey, err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(),
822 2 : readable, d.cacheID, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
823 2 : if err != nil {
824 0 : return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
825 0 : }
826 : }
827 2 : if lastRangeKey.Valid() && d.opts.Comparer.Split.HasSuffix(lastRangeKey.End) {
828 0 : return nil, errors.AssertionFailedf("pebble: last ingest sstable has suffixed range key end %s",
829 0 : d.opts.Comparer.FormatKey(lastRangeKey.End))
830 0 : }
831 :
832 2 : numFiles := len(meta)
833 2 : if exciseSpan.Valid() {
834 2 : numFiles++
835 2 : }
836 2 : if uint32(numFiles) != b.Count() {
837 0 : panic("pebble: couldn't load all files in WAL entry")
838 : }
839 :
840 2 : return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
841 : }
842 :
843 : // replayWAL replays the edits in the specified WAL. If the DB is in
844 : // read-only mode, then the WALs are replayed into memtables and not flushed.
845 : // If the DB is not in read-only mode, then the contents of the WAL are
846 : // guaranteed to be flushed when a flush is scheduled after this method is run.
847 : // Note that this flushing is very important for guaranteeing durability:
848 : // the application may have had a number of pending
849 : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
850 : // happened but the corresponding data may now be readable from the WAL (while
851 : // sitting in write-back caches in the kernel or the storage device). By
852 : // reading the WAL (including the non-fsynced data) and then flushing all
853 : // these changes (flush does fsyncs), we are able to guarantee that the
854 : // initial state of the DB is durable.
855 : //
856 : // This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable and replays
857 : // WALs into the flushable queue. Flushing of the queue is expected to be handled
858 : // by callers. A list of flushable ingests (but not memtables) replayed is returned.
859 : //
860 : // d.mu must be held when calling this, but the mutex may be dropped and
861 : // re-acquired during the course of this method.
862 : func (d *DB) replayWAL(
863 : jobID JobID, ll wal.LogicalLog, strictWALTail bool,
864 2 : ) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
865 2 : rr := ll.OpenForRead()
866 2 : defer rr.Close()
867 2 : var (
868 2 : b Batch
869 2 : buf bytes.Buffer
870 2 : mem *memTable
871 2 : entry *flushableEntry
872 2 : offset wal.Offset
873 2 : lastFlushOffset int64
874 2 : keysReplayed int64 // number of keys replayed
875 2 : batchesReplayed int64 // number of batches replayed
876 2 : )
877 2 :
878 2 : // TODO(jackson): This function is interspersed with panics, in addition to
879 2 : // corruption error propagation. Audit them to ensure we're truly only
880 2 : // panicking where the error points to Pebble bug and not user or
881 2 : // hardware-induced corruption.
882 2 :
883 2 : // "Flushes" (i.e., closes off) the current memtable, if not nil.
884 2 : flushMem := func() {
885 2 : if mem == nil {
886 2 : return
887 2 : }
888 2 : mem.writerUnref()
889 2 : if d.mu.mem.mutable == mem {
890 2 : d.mu.mem.mutable = nil
891 2 : }
892 2 : entry.flushForced = !d.opts.ReadOnly
893 2 : var logSize uint64
894 2 : mergedOffset := offset.Physical + offset.PreviousFilesBytes
895 2 : if mergedOffset >= lastFlushOffset {
896 2 : logSize = uint64(mergedOffset - lastFlushOffset)
897 2 : }
898 : // Else, this was the initial memtable in the read-only case which must have
899 : // been empty, but we need to flush it since we don't want to add to it later.
900 2 : lastFlushOffset = mergedOffset
901 2 : entry.logSize = logSize
902 2 : mem, entry = nil, nil
903 : }
904 :
905 2 : mem = d.mu.mem.mutable
906 2 : if mem != nil {
907 2 : entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
908 2 : if !d.opts.ReadOnly {
909 2 : flushMem()
910 2 : }
911 : }
912 :
913 : // Creates a new memtable if there is no current memtable.
914 2 : ensureMem := func(seqNum base.SeqNum) {
915 2 : if mem != nil {
916 2 : return
917 2 : }
918 2 : mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
919 2 : d.mu.mem.mutable = mem
920 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
921 : }
922 :
923 2 : defer func() {
924 2 : if err != nil {
925 1 : err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
926 1 : }
927 : }()
928 :
929 2 : for {
930 2 : var r io.Reader
931 2 : var err error
932 2 : r, offset, err = rr.NextRecord()
933 2 : if err == nil {
934 2 : _, err = io.Copy(&buf, r)
935 2 : }
936 2 : if err != nil {
937 2 : // It is common to encounter a zeroed or invalid chunk due to WAL
938 2 : // preallocation and WAL recycling. We need to distinguish these
939 2 : // errors from EOF in order to recognize that the record was
940 2 : // truncated and to avoid replaying subsequent WALs, but want
941 2 : // to otherwise treat them like EOF.
942 2 : if err == io.EOF {
943 2 : break
944 1 : } else if record.IsInvalidRecord(err) && !strictWALTail {
945 1 : break
946 : }
947 1 : return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
948 : }
949 :
950 2 : if buf.Len() < batchrepr.HeaderLen {
951 0 : return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
952 0 : errors.Safe(base.DiskFileNum(ll.Num)), offset)
953 0 : }
954 :
955 2 : if d.opts.ErrorIfNotPristine {
956 1 : return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
957 1 : }
958 :
959 : // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
960 : // which is used below.
961 2 : b = Batch{}
962 2 : b.db = d
963 2 : b.SetRepr(buf.Bytes())
964 2 : seqNum := b.SeqNum()
965 2 : maxSeqNum = seqNum + base.SeqNum(b.Count())
966 2 : keysReplayed += int64(b.Count())
967 2 : batchesReplayed++
968 2 : {
969 2 : br := b.Reader()
970 2 : if kind, _, _, ok, err := br.Next(); err != nil {
971 0 : return nil, 0, err
972 2 : } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
973 2 : // We're in the flushable ingests (+ possibly excises) case.
974 2 : //
975 2 : // Ingests require an up-to-date view of the LSM to determine the target
976 2 : // level of ingested sstables, and to accurately compute excises. Instead of
977 2 : // doing an ingest in this function, we just enqueue a flushable ingest
978 2 : // in the flushables queue and run a regular flush.
979 2 : flushMem()
980 2 : // mem is nil here.
981 2 : entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
982 2 : if err != nil {
983 0 : return nil, 0, err
984 0 : }
985 2 : fi := entry.flushable.(*ingestedFlushable)
986 2 : flushableIngests = append(flushableIngests, fi)
987 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
988 2 : // A flushable ingest is always followed by a WAL rotation.
989 2 : break
990 : }
991 : }
992 :
993 2 : if b.memTableSize >= uint64(d.largeBatchThreshold) {
994 2 : flushMem()
995 2 : // Make a copy of the data slice since it is currently owned by buf and will
996 2 : // be reused in the next iteration.
997 2 : b.data = slices.Clone(b.data)
998 2 : b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
999 2 : if err != nil {
1000 0 : return nil, 0, err
1001 0 : }
1002 2 : entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
1003 2 : // Disable memory accounting by adding a reader ref that will never be
1004 2 : // removed.
1005 2 : entry.readerRefs.Add(1)
1006 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
1007 2 : } else {
1008 2 : ensureMem(seqNum)
1009 2 : if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
1010 0 : return nil, 0, err
1011 0 : }
1012 : // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
1013 : // batch may not initially fit, but will eventually fit (since it is smaller than
1014 : // largeBatchThreshold).
1015 2 : for err == arenaskl.ErrArenaFull {
1016 2 : flushMem()
1017 2 : ensureMem(seqNum)
1018 2 : err = mem.prepare(&b)
1019 2 : if err != nil && err != arenaskl.ErrArenaFull {
1020 0 : return nil, 0, err
1021 0 : }
1022 : }
1023 2 : if err = mem.apply(&b, seqNum); err != nil {
1024 0 : return nil, 0, err
1025 0 : }
1026 2 : mem.writerUnref()
1027 : }
1028 2 : buf.Reset()
1029 : }
1030 :
1031 2 : d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
1032 2 : jobID, ll.String(), offset, keysReplayed, batchesReplayed)
1033 2 : if !d.opts.ReadOnly {
1034 2 : flushMem()
1035 2 : }
1036 :
1037 : // mem is nil here, if !ReadOnly.
1038 2 : return flushableIngests, maxSeqNum, err
1039 : }
1040 :
1041 2 : func readOptionsFile(opts *Options, path string) (string, error) {
1042 2 : f, err := opts.FS.Open(path)
1043 2 : if err != nil {
1044 0 : return "", err
1045 0 : }
1046 2 : defer f.Close()
1047 2 :
1048 2 : data, err := io.ReadAll(f)
1049 2 : if err != nil {
1050 0 : return "", err
1051 0 : }
1052 2 : return string(data), nil
1053 : }
1054 :
1055 : // DBDesc briefly describes high-level state about a database.
1056 : type DBDesc struct {
1057 : // Exists is true if an existing database was found.
1058 : Exists bool
1059 : // FormatMajorVersion indicates the database's current format
1060 : // version.
1061 : FormatMajorVersion FormatMajorVersion
1062 : // ManifestFilename is the filename of the current active manifest,
1063 : // if the database exists.
1064 : ManifestFilename string
1065 : // OptionsFilename is the filename of the most recent OPTIONS file, if it
1066 : // exists.
1067 : OptionsFilename string
1068 : }
1069 :
1070 : // String implements fmt.Stringer.
1071 1 : func (d *DBDesc) String() string {
1072 1 : if !d.Exists {
1073 1 : return "uninitialized"
1074 1 : }
1075 1 : var buf bytes.Buffer
1076 1 : fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
1077 1 : fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
1078 1 : fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
1079 1 : return buf.String()
1080 : }
1081 :
1082 : // Peek looks for an existing database in dirname on the provided FS. It
1083 : // returns a brief description of the database. Peek is read-only and
1084 : // does not open the database.
1085 1 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
1086 1 : ls, err := fs.List(dirname)
1087 1 : if err != nil {
1088 1 : return nil, err
1089 1 : }
1090 :
1091 1 : vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
1092 1 : if err != nil {
1093 0 : return nil, err
1094 0 : }
1095 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1096 : // PeekMarker variant that avoids opening the directory.
1097 1 : if err := versMarker.Close(); err != nil {
1098 0 : return nil, err
1099 0 : }
1100 :
1101 : // Find the currently active manifest, if there is one.
1102 1 : manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
1103 1 : if err != nil {
1104 0 : return nil, err
1105 0 : }
1106 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1107 : // PeekMarker variant that avoids opening the directory.
1108 1 : if err := manifestMarker.Close(); err != nil {
1109 0 : return nil, err
1110 0 : }
1111 :
1112 1 : desc := &DBDesc{
1113 1 : Exists: exists,
1114 1 : FormatMajorVersion: vers,
1115 1 : }
1116 1 :
1117 1 : // Find the OPTIONS file with the highest file number within the list of
1118 1 : // directory entries.
1119 1 : var previousOptionsFileNum base.DiskFileNum
1120 1 : for _, filename := range ls {
1121 1 : ft, fn, ok := base.ParseFilename(fs, filename)
1122 1 : if !ok || ft != fileTypeOptions || fn < previousOptionsFileNum {
1123 1 : continue
1124 : }
1125 1 : previousOptionsFileNum = fn
1126 1 : desc.OptionsFilename = fs.PathJoin(dirname, filename)
1127 : }
1128 :
1129 1 : if exists {
1130 1 : desc.ManifestFilename = base.MakeFilepath(fs, dirname, fileTypeManifest, manifestFileNum)
1131 1 : }
1132 1 : return desc, nil
1133 : }
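
// A minimal usage sketch (the path is illustrative): check whether a store
// already exists before deciding how to open it.
//
//	desc, err := pebble.Peek("/data/db", vfs.Default)
//	if err != nil {
//	    return err
//	}
//	if !desc.Exists {
//	    // Nothing on disk yet; Open would create a new store.
//	}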
1134 :
1135 : // LockDirectory acquires the database directory lock in the named directory,
1136 : // preventing another process from opening the database. LockDirectory returns a
1137 : // handle to the held lock that may be passed to Open through Options.Lock to
1138 : // subsequently open the database, skipping lock acquisition during Open.
1139 : //
1140 : // LockDirectory may be used to expand the critical section protected by the
1141 : // database lock to include setup before the call to Open.
1142 2 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
1143 2 : fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, fileTypeLock, base.DiskFileNum(0)))
1144 2 : if err != nil {
1145 1 : return nil, err
1146 1 : }
1147 2 : l := &Lock{dirname: dirname, fileLock: fileLock}
1148 2 : l.refs.Store(1)
1149 2 : invariants.SetFinalizer(l, func(obj interface{}) {
1150 2 : if refs := obj.(*Lock).refs.Load(); refs > 0 {
1151 0 : panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
1152 : }
1153 : })
1154 2 : return l, nil
1155 : }
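
// A minimal usage sketch (the path is illustrative): acquire the lock before
// Open to protect setup work, then pass it to Open via Options.Lock. The DB
// must be closed before the Lock is closed.
//
//	lock, err := pebble.LockDirectory("/data/db", vfs.Default)
//	if err != nil {
//	    return err
//	}
//	// ... setup that must happen while holding the database lock ...
//	db, err := pebble.Open("/data/db", &pebble.Options{Lock: lock})
//	if err != nil {
//	    lock.Close()
//	    return err
//	}
//	defer lock.Close() // released after db.Close (LIFO defer order)
//	defer db.Close()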
1156 :
1157 : // Lock represents a file lock on a directory. It may be passed to Open through
1158 : // Options.Lock to elide lock acquisition during Open.
1159 : type Lock struct {
1160 : dirname string
1161 : fileLock io.Closer
1162 : // refs is a count of the number of handles on the lock. refs must be 0, 1
1163 : // or 2.
1164 : //
1165 : // When acquired by the client and passed to Open, refs = 1 and the Open
1166 : // call increments it to 2. When the database is closed, it's decremented to
1167 : // 1. Finally, when the original caller calls Close on the Lock, it's
1168 : // decremented to zero and the underlying file lock is released.
1169 : //
1170 : // When Open acquires the file lock, refs remains at 1 until the database is
1171 : // closed.
1172 : refs atomic.Int32
1173 : }
1174 :
1175 1 : func (l *Lock) refForOpen() error {
1176 1 : // During Open, when a user passed in a lock, the reference count must be
1177 1 : // exactly 1. If it's zero, the lock is no longer held and is invalid. If
1178 1 : // it's 2, the lock is already in use by another database within the
1179 1 : // process.
1180 1 : if !l.refs.CompareAndSwap(1, 2) {
1181 1 : return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
1182 1 : }
1183 1 : return nil
1184 : }
1185 :
1186 : // Close releases the lock, permitting another process to lock and open the
1187 : // database. Close must not be called until after a database using the Lock has
1188 : // been closed.
1189 2 : func (l *Lock) Close() error {
1190 2 : if l.refs.Add(-1) > 0 {
1191 1 : return nil
1192 1 : }
1193 2 : defer func() { l.fileLock = nil }()
1194 2 : return l.fileLock.Close()
1195 : }
1196 :
1197 1 : func (l *Lock) pathMatches(dirname string) error {
1198 1 : if dirname == l.dirname {
1199 1 : return nil
1200 1 : }
1201 : // Check for relative paths, symlinks, etc. This isn't ideal because we're
1202 : // circumventing the vfs.FS interface here.
1203 : //
1204 : // TODO(jackson): We could add support for retrieving file inodes through Stat
1205 : // calls in the VFS interface on platforms where it's available and use that
1206 : // to differentiate.
1207 1 : dirStat, err1 := os.Stat(dirname)
1208 1 : lockDirStat, err2 := os.Stat(l.dirname)
1209 1 : if err1 == nil && err2 == nil && os.SameFile(dirStat, lockDirStat) {
1210 1 : return nil
1211 1 : }
1212 0 : return errors.Join(
1213 0 : errors.Newf("pebble: opts.Lock acquired in %q not %q", l.dirname, dirname),
1214 0 : err1, err2)
1215 : }
1216 :
1217 : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
1218 : // does not exist.
1219 : //
1220 : // Note that errors can be wrapped with more details; use errors.Is().
1221 : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
1222 :
1223 : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
1224 : // already exists.
1225 : //
1226 : // Note that errors can be wrapped with more details; use errors.Is().
1227 : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
1228 :
1229 : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
1230 : // already exists and is not pristine.
1231 : //
1232 : // Note that errors can be wrapped with more details; use errors.Is().
1233 : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
1234 :
1235 : // IsCorruptionError returns true if the given error indicates database
1236 : // corruption.
1237 1 : func IsCorruptionError(err error) bool {
1238 1 : return errors.Is(err, base.ErrCorruption)
1239 1 : }
1240 :
1241 2 : func checkConsistency(v *manifest.Version, dirname string, objProvider objstorage.Provider) error {
1242 2 : var errs []error
1243 2 : dedup := make(map[base.DiskFileNum]struct{})
1244 2 : for level, files := range v.Levels {
1245 2 : iter := files.Iter()
1246 2 : for f := iter.First(); f != nil; f = iter.Next() {
1247 2 : backingState := f.FileBacking
1248 2 : if _, ok := dedup[backingState.DiskFileNum]; ok {
1249 2 : continue
1250 : }
1251 2 : dedup[backingState.DiskFileNum] = struct{}{}
1252 2 : fileNum := backingState.DiskFileNum
1253 2 : fileSize := backingState.Size
1254 2 : // We skip over remote objects; those are instead checked asynchronously
1255 2 : // by the table stats loading job.
1256 2 : meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
1257 2 : var size int64
1258 2 : if err == nil {
1259 2 : if meta.IsRemote() {
1260 2 : continue
1261 : }
1262 2 : size, err = objProvider.Size(meta)
1263 : }
1264 2 : if err != nil {
1265 1 : errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
1266 1 : continue
1267 : }
1268 :
1269 2 : if size != int64(fileSize) {
1270 0 : errs = append(errs, errors.Errorf(
1271 0 : "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
1272 0 : errors.Safe(level), fileNum, objProvider.Path(meta),
1273 0 : errors.Safe(size), errors.Safe(fileSize)))
1274 0 : continue
1275 : }
1276 : }
1277 : }
1278 2 : return errors.Join(errs...)
1279 : }
1280 :
1281 : type walEventListenerAdaptor struct {
1282 : l *EventListener
1283 : }
1284 :
1285 2 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
1286 2 : // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
1287 2 : // is insufficient to infer whether primary or secondary.
1288 2 : wci := WALCreateInfo{
1289 2 : JobID: ci.JobID,
1290 2 : Path: ci.Path,
1291 2 : FileNum: base.DiskFileNum(ci.Num),
1292 2 : RecycledFileNum: ci.RecycledFileNum,
1293 2 : Err: ci.Err,
1294 2 : }
1295 2 : l.l.WALCreated(wci)
1296 2 : }