Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "math"
14 : "os"
15 : "slices"
16 : "sync/atomic"
17 : "time"
18 :
19 : "github.com/cockroachdb/errors"
20 : "github.com/cockroachdb/errors/oserror"
21 : "github.com/cockroachdb/pebble/batchrepr"
22 : "github.com/cockroachdb/pebble/internal/arenaskl"
23 : "github.com/cockroachdb/pebble/internal/base"
24 : "github.com/cockroachdb/pebble/internal/cache"
25 : "github.com/cockroachdb/pebble/internal/constants"
26 : "github.com/cockroachdb/pebble/internal/invariants"
27 : "github.com/cockroachdb/pebble/internal/manifest"
28 : "github.com/cockroachdb/pebble/internal/manual"
29 : "github.com/cockroachdb/pebble/objstorage"
30 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
31 : "github.com/cockroachdb/pebble/objstorage/remote"
32 : "github.com/cockroachdb/pebble/record"
33 : "github.com/cockroachdb/pebble/sstable"
34 : "github.com/cockroachdb/pebble/vfs"
35 : "github.com/cockroachdb/pebble/wal"
36 : "github.com/prometheus/client_golang/prometheus"
37 : )
38 :
39 : const (
40 : initialMemTableSize = 256 << 10 // 256 KB
41 :
42 : // The max batch size is limited by the uint32 offsets stored in
43 : // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
44 : //
45 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
46 : // end of an allocation fits in uint32.
47 : //
48 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
49 : // 2GB).
50 : maxBatchSize = constants.MaxUint32OrInt
51 :
52 : // The max memtable size is limited by the uint32 offsets stored in
53 : // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
54 : //
55 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
56 : // end of an allocation fits in uint32.
57 : //
58 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
59 : // 2GB).
60 : maxMemTableSize = constants.MaxUint32OrInt
61 : )
62 :
63 : // TableCacheSize returns the size to use for a table cache that is
64 : // used by only a single DB, given the maximum number of open files
65 : // the DB may consume. The result leaves room for non-table files and
66 : // is clamped to a minimum table cache size.
67 1 : func TableCacheSize(maxOpenFiles int) int {
68 1 : tableCacheSize := maxOpenFiles - numNonTableCacheFiles
69 1 : if tableCacheSize < minTableCacheSize {
70 1 : tableCacheSize = minTableCacheSize
71 1 : }
72 1 : return tableCacheSize
73 : }
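A hedged usage sketch (not part of this file): a caller budgeting file descriptors across stores might size each per-DB table cache via the exported TableCacheSize helper. The maxOpenFiles value below is illustrative.

package main

import (
	"fmt"

	"github.com/cockroachdb/pebble"
)

func main() {
	// Derive the per-DB table cache size from an illustrative 1000-file
	// descriptor budget; TableCacheSize clamps to an internal minimum.
	const maxOpenFiles = 1000
	fmt.Println(pebble.TableCacheSize(maxOpenFiles))
}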
74 :
75 : // Open opens a DB whose files live in the given directory.
76 1 : func Open(dirname string, opts *Options) (db *DB, err error) {
77 1 : // Make a copy of the options so that we don't mutate the passed in options.
78 1 : opts = opts.Clone()
79 1 : opts = opts.EnsureDefaults()
80 1 : if err := opts.Validate(); err != nil {
81 0 : return nil, err
82 0 : }
83 1 : if opts.LoggerAndTracer == nil {
84 1 : opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
85 1 : } else {
86 0 : opts.Logger = opts.LoggerAndTracer
87 0 : }
88 :
89 1 : if invariants.Sometimes(5) {
90 1 : assertComparer := base.MakeAssertComparer(*opts.Comparer)
91 1 : opts.Comparer = &assertComparer
92 1 : }
93 :
94 : // In all error cases, we return db = nil; this is used by various
95 : // deferred cleanups.
96 :
97 : // Open the database and WAL directories first.
98 1 : walDirname, dataDir, err := prepareAndOpenDirs(dirname, opts)
99 1 : if err != nil {
100 0 : return nil, errors.Wrapf(err, "error opening database at %q", dirname)
101 0 : }
102 1 : defer func() {
103 1 : if db == nil {
104 0 : dataDir.Close()
105 0 : }
106 : }()
107 :
108 : // Lock the database directory.
109 1 : var fileLock *Lock
110 1 : if opts.Lock != nil {
111 0 : // The caller already acquired the database lock. Ensure that the
112 0 : // directory matches.
113 0 : if err := opts.Lock.pathMatches(dirname); err != nil {
114 0 : return nil, err
115 0 : }
116 0 : if err := opts.Lock.refForOpen(); err != nil {
117 0 : return nil, err
118 0 : }
119 0 : fileLock = opts.Lock
120 1 : } else {
121 1 : fileLock, err = LockDirectory(dirname, opts.FS)
122 1 : if err != nil {
123 0 : return nil, err
124 0 : }
125 : }
126 1 : defer func() {
127 1 : if db == nil {
128 0 : fileLock.Close()
129 0 : }
130 : }()
131 :
132 : // List the directory contents. This also happens to include WAL log files, if
133 : // they are in the same dir, but we will ignore those below. The provider is
134 : // also given this list, but it ignores non-sstable files.
135 1 : ls, err := opts.FS.List(dirname)
136 1 : if err != nil {
137 0 : return nil, err
138 0 : }
139 :
140 : // Establish the format major version.
141 1 : formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
142 1 : if err != nil {
143 0 : return nil, err
144 0 : }
145 1 : defer func() {
146 1 : if db == nil {
147 0 : formatVersionMarker.Close()
148 0 : }
149 : }()
150 :
151 1 : noFormatVersionMarker := formatVersion == FormatDefault
152 1 : if noFormatVersionMarker {
153 1 : // We will initialize the store at the minimum possible format, then upgrade
154 1 : // the format to the desired one. This helps test the format upgrade code.
155 1 : formatVersion = FormatMinSupported
156 1 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
157 1 : formatVersion = FormatMinForSharedObjects
158 1 : }
159 : // There is no format version marker file. There are three cases:
160 : // - we are trying to open an existing store that was created at
161 : // FormatMostCompatible (the only one without a version marker file)
162 : // - we are creating a new store;
163 : // - we are retrying a failed creation.
164 : //
165 : // To error in the first case, we set ErrorIfNotPristine.
166 1 : opts.ErrorIfNotPristine = true
167 1 : defer func() {
168 1 : if err != nil && errors.Is(err, ErrDBNotPristine) {
169 0 : // We must be trying to open an existing store at FormatMostCompatible.
170 0 : // Correct the error in this case.
171 0 : err = errors.Newf(
172 0 : "pebble: database %q written in format major version 1 which is no longer supported",
173 0 : dirname)
174 0 : }
175 : }()
176 1 : } else {
177 1 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone && formatVersion < FormatMinForSharedObjects {
178 0 : return nil, errors.Newf(
179 0 : "pebble: database %q configured with shared objects but written in too old format major version %d",
180 0 : dirname, formatVersion)
181 0 : }
182 : }
183 :
184 : // Find the currently active manifest, if there is one.
185 1 : manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
186 1 : if err != nil {
187 0 : return nil, errors.Wrapf(err, "pebble: database %q", dirname)
188 0 : }
189 1 : defer func() {
190 1 : if db == nil {
191 0 : manifestMarker.Close()
192 0 : }
193 : }()
194 :
195 : // Atomic markers may leave behind obsolete files if there's a crash
196 : // mid-update. Clean these up if we're not in read-only mode.
197 1 : if !opts.ReadOnly {
198 1 : if err := formatVersionMarker.RemoveObsolete(); err != nil {
199 0 : return nil, err
200 0 : }
201 1 : if err := manifestMarker.RemoveObsolete(); err != nil {
202 0 : return nil, err
203 0 : }
204 : }
205 :
206 1 : if opts.Cache == nil {
207 0 : opts.Cache = cache.New(cacheDefaultSize)
208 1 : } else {
209 1 : opts.Cache.Ref()
210 1 : }
211 :
212 1 : d := &DB{
213 1 : cacheID: opts.Cache.NewID(),
214 1 : dirname: dirname,
215 1 : opts: opts,
216 1 : cmp: opts.Comparer.Compare,
217 1 : equal: opts.Comparer.Equal,
218 1 : merge: opts.Merger.Merge,
219 1 : split: opts.Comparer.Split,
220 1 : abbreviatedKey: opts.Comparer.AbbreviatedKey,
221 1 : largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
222 1 : fileLock: fileLock,
223 1 : dataDir: dataDir,
224 1 : closed: new(atomic.Value),
225 1 : closedCh: make(chan struct{}),
226 1 : }
227 1 : d.mu.versions = &versionSet{}
228 1 : d.diskAvailBytes.Store(math.MaxUint64)
229 1 :
230 1 : defer func() {
231 1 : // If an error or panic occurs during open, attempt to release the manually
232 1 : // allocated memory resources. Note that rather than look for an error, we
233 1 : // look for the return of a nil DB pointer.
234 1 : if r := recover(); db == nil {
235 0 : // Release our references to the Cache. Note that both the DB and the
236 0 : // tableCache have a reference. When we release the reference to
237 0 : // the tableCache, and if there are no other references to
238 0 : // the tableCache, then the tableCache will also release its
239 0 : // reference to the cache.
240 0 : opts.Cache.Unref()
241 0 :
242 0 : if d.tableCache != nil {
243 0 : _ = d.tableCache.close()
244 0 : }
245 :
246 0 : for _, mem := range d.mu.mem.queue {
247 0 : switch t := mem.flushable.(type) {
248 0 : case *memTable:
249 0 : manual.Free(t.arenaBuf)
250 0 : t.arenaBuf = nil
251 : }
252 : }
253 0 : if d.cleanupManager != nil {
254 0 : d.cleanupManager.Close()
255 0 : }
256 0 : if d.objProvider != nil {
257 0 : d.objProvider.Close()
258 0 : }
259 0 : if r != nil {
260 0 : panic(r)
261 : }
262 : }
263 : }()
264 :
265 1 : d.commit = newCommitPipeline(commitEnv{
266 1 : logSeqNum: &d.mu.versions.logSeqNum,
267 1 : visibleSeqNum: &d.mu.versions.visibleSeqNum,
268 1 : apply: d.commitApply,
269 1 : write: d.commitWrite,
270 1 : })
271 1 : d.mu.nextJobID = 1
272 1 : d.mu.mem.nextSize = opts.MemTableSize
273 1 : if d.mu.mem.nextSize > initialMemTableSize {
274 1 : d.mu.mem.nextSize = initialMemTableSize
275 1 : }
276 1 : d.mu.compact.cond.L = &d.mu.Mutex
277 1 : d.mu.compact.inProgress = make(map[*compaction]struct{})
278 1 : d.mu.compact.noOngoingFlushStartTime = time.Now()
279 1 : d.mu.snapshots.init()
280 1 : // logSeqNum is the next sequence number that will be assigned.
281 1 : // Start assigning sequence numbers from base.SeqNumStart to leave
282 1 : // room for reserved sequence numbers (see comments around
283 1 : // SeqNumStart).
284 1 : d.mu.versions.logSeqNum.Store(base.SeqNumStart)
285 1 : d.mu.formatVers.vers.Store(uint64(formatVersion))
286 1 : d.mu.formatVers.marker = formatVersionMarker
287 1 :
288 1 : d.timeNow = time.Now
289 1 : d.openedAt = d.timeNow()
290 1 :
291 1 : d.mu.Lock()
292 1 : defer d.mu.Unlock()
293 1 :
294 1 : jobID := d.newJobIDLocked()
295 1 :
296 1 : providerSettings := objstorageprovider.Settings{
297 1 : Logger: opts.Logger,
298 1 : FS: opts.FS,
299 1 : FSDirName: dirname,
300 1 : FSDirInitialListing: ls,
301 1 : FSCleaner: opts.Cleaner,
302 1 : NoSyncOnClose: opts.NoSyncOnClose,
303 1 : BytesPerSync: opts.BytesPerSync,
304 1 : }
305 1 : providerSettings.Local.ReadaheadConfig = opts.Local.ReadaheadConfig
306 1 : providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
307 1 : providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
308 1 : providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
309 1 : providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
310 1 :
311 1 : d.objProvider, err = objstorageprovider.Open(providerSettings)
312 1 : if err != nil {
313 0 : return nil, err
314 0 : }
315 :
316 1 : if !manifestExists {
317 1 : // DB does not exist.
318 1 : if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
319 0 : return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
320 0 : }
321 :
322 : // Create the DB.
323 1 : if err := d.mu.versions.create(
324 1 : jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
325 0 : return nil, err
326 0 : }
327 1 : } else {
328 1 : if opts.ErrorIfExists {
329 0 : return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
330 0 : }
331 : // Load the version set.
332 1 : if err := d.mu.versions.load(
333 1 : dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
334 0 : return nil, err
335 0 : }
336 1 : if opts.ErrorIfNotPristine {
337 0 : liveFileNums := make(map[base.DiskFileNum]struct{})
338 0 : d.mu.versions.addLiveFileNums(liveFileNums)
339 0 : if len(liveFileNums) != 0 {
340 0 : return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
341 0 : }
342 : }
343 : }
344 :
345 : // In read-only mode, we replay directly into the mutable memtable but never
346 : // flush it. We need to delay creation of the memtable until we know the
347 : // sequence number of the first batch that will be inserted.
348 1 : if !d.opts.ReadOnly {
349 1 : var entry *flushableEntry
350 1 : d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
351 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
352 1 : }
353 :
354 1 : d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
355 1 : Buckets: FsyncLatencyBuckets,
356 1 : })
357 1 : walOpts := wal.Options{
358 1 : Primary: wal.Dir{FS: opts.FS, Dirname: walDirname},
359 1 : Secondary: wal.Dir{},
360 1 : MinUnflushedWALNum: wal.NumWAL(d.mu.versions.minUnflushedLogNum),
361 1 : MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
362 1 : NoSyncOnClose: opts.NoSyncOnClose,
363 1 : BytesPerSync: opts.WALBytesPerSync,
364 1 : PreallocateSize: d.walPreallocateSize,
365 1 : MinSyncInterval: opts.WALMinSyncInterval,
366 1 : FsyncLatency: d.mu.log.metrics.fsyncLatency,
367 1 : QueueSemChan: d.commit.logSyncQSem,
368 1 : Logger: opts.Logger,
369 1 : EventListener: walEventListenerAdaptor{l: opts.EventListener},
370 1 : }
371 1 : if opts.WALFailover != nil {
372 1 : walOpts.Secondary = opts.WALFailover.Secondary
373 1 : walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
374 1 : walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
375 1 : Buckets: FsyncLatencyBuckets,
376 1 : })
377 1 : }
378 1 : walDirs := append(walOpts.Dirs(), opts.WALRecoveryDirs...)
379 1 : wals, err := wal.Scan(walDirs...)
380 1 : if err != nil {
381 0 : return nil, err
382 0 : }
383 1 : walManager, err := wal.Init(walOpts, wals)
384 1 : if err != nil {
385 0 : return nil, err
386 0 : }
387 1 : defer func() {
388 1 : if db == nil {
389 0 : walManager.Close()
390 0 : }
391 : }()
392 :
393 1 : d.mu.log.manager = walManager
394 1 :
395 1 : d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
396 1 :
397 1 : if manifestExists && !opts.DisableConsistencyCheck {
398 1 : curVersion := d.mu.versions.currentVersion()
399 1 : if err := checkConsistency(curVersion, dirname, d.objProvider); err != nil {
400 0 : return nil, err
401 0 : }
402 : }
403 :
404 1 : tableCacheSize := TableCacheSize(opts.MaxOpenFiles)
405 1 : d.tableCache = newTableCacheContainer(
406 1 : opts.TableCache, d.cacheID, d.objProvider, d.opts, tableCacheSize,
407 1 : &sstable.CategoryStatsCollector{})
408 1 : d.newIters = d.tableCache.newIters
409 1 : d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
410 1 :
411 1 : d.mu.annotators.totalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
412 0 : return true
413 0 : })
414 1 : d.mu.annotators.remoteSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
415 0 : meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
416 0 : if err != nil {
417 0 : return false
418 0 : }
419 0 : return meta.IsRemote()
420 : })
421 1 : d.mu.annotators.externalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
422 0 : meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
423 0 : if err != nil {
424 0 : return false
425 0 : }
426 0 : return meta.IsRemote() && meta.Remote.CleanupMethod == objstorage.SharedNoCleanup
427 : })
428 :
429 1 : var previousOptionsFileNum base.DiskFileNum
430 1 : var previousOptionsFilename string
431 1 : for _, filename := range ls {
432 1 : ft, fn, ok := base.ParseFilename(opts.FS, filename)
433 1 : if !ok {
434 1 : continue
435 : }
436 :
437 : // Don't reuse any obsolete file numbers to avoid modifying an
438 : // ingested sstable's original external file.
439 1 : d.mu.versions.markFileNumUsed(fn)
440 1 :
441 1 : switch ft {
442 0 : case fileTypeLog:
443 : // Ignore.
444 1 : case fileTypeOptions:
445 1 : if previousOptionsFileNum < fn {
446 1 : previousOptionsFileNum = fn
447 1 : previousOptionsFilename = filename
448 1 : }
449 0 : case fileTypeTemp, fileTypeOldTemp:
450 0 : if !d.opts.ReadOnly {
451 0 : // Some codepaths write to a temporary file and then
452 0 : // rename it to its final location when complete. A
453 0 : // temp file is leftover if a process exits before the
454 0 : // rename. Remove it.
455 0 : err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
456 0 : if err != nil {
457 0 : return nil, err
458 0 : }
459 : }
460 : }
461 : }
462 1 : if n := len(wals); n > 0 {
463 1 : // Don't reuse any obsolete file numbers to avoid modifying an
464 1 : // ingested sstable's original external file.
465 1 : d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
466 1 : }
467 :
468 : // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
469 : // objProvider. This avoids FileNum collisions with obsolete sstables.
470 1 : objects := d.objProvider.List()
471 1 : for _, obj := range objects {
472 1 : d.mu.versions.markFileNumUsed(obj.DiskFileNum)
473 1 : }
474 :
475 : // Validate the most-recent OPTIONS file, if there is one.
476 1 : if previousOptionsFilename != "" {
477 1 : path := opts.FS.PathJoin(dirname, previousOptionsFilename)
478 1 : previousOptions, err := readOptionsFile(opts, path)
479 1 : if err != nil {
480 0 : return nil, err
481 0 : }
482 1 : if err := opts.CheckCompatibility(previousOptions); err != nil {
483 0 : return nil, err
484 0 : }
485 : }
486 :
487 : // Replay any newer log files than the ones named in the manifest.
488 1 : var replayWALs wal.Logs
489 1 : for i, w := range wals {
490 1 : if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
491 1 : replayWALs = wals[i:]
492 1 : break
493 : }
494 : }
495 1 : var ve versionEdit
496 1 : var toFlush flushableList
497 1 : for i, lf := range replayWALs {
498 1 : // WALs other than the last one would have been closed cleanly.
499 1 : //
500 1 : // Note: we used to never require strict WAL tails when reading from older
501 1 : // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
502 1 : // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
503 1 : // compatible Pebble format is newer and guarantees a clean EOF.
504 1 : strictWALTail := i < len(replayWALs)-1
505 1 : flush, maxSeqNum, err := d.replayWAL(jobID, &ve, lf, strictWALTail)
506 1 : if err != nil {
507 0 : return nil, err
508 0 : }
509 1 : toFlush = append(toFlush, flush...)
510 1 : if d.mu.versions.logSeqNum.Load() < maxSeqNum {
511 1 : d.mu.versions.logSeqNum.Store(maxSeqNum)
512 1 : }
513 : }
514 1 : d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
515 1 :
516 1 : if !d.opts.ReadOnly {
517 1 : // Create an empty .log file.
518 1 : newLogNum := d.mu.versions.getNextDiskFileNum()
519 1 :
520 1 : // This logic is slightly different than RocksDB's. Specifically, RocksDB
521 1 : // sets MinUnflushedLogNum to max-recovered-log-num + 1. We set it to the
522 1 : // newLogNum. There should be no difference in using either value.
523 1 : ve.MinUnflushedLogNum = newLogNum
524 1 :
525 1 : // Create the manifest with the updated MinUnflushedLogNum before
526 1 : // creating the new log file. If we created the log file first, a
527 1 : // crash before the manifest is synced could leave two WALs with
528 1 : // unclean tails.
529 1 : d.mu.versions.logLock()
530 1 : if err := d.mu.versions.logAndApply(jobID, &ve, newFileMetrics(ve.NewFiles), false /* forceRotation */, func() []compactionInfo {
531 1 : return nil
532 1 : }); err != nil {
533 0 : return nil, err
534 0 : }
535 :
536 1 : for _, entry := range toFlush {
537 1 : entry.readerUnrefLocked(true)
538 1 : }
539 :
540 1 : d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
541 1 : if err != nil {
542 0 : return nil, err
543 0 : }
544 :
545 : // This isn't strictly necessary as we don't use the log number for
546 : // memtables being flushed, only for the next unflushed memtable.
547 1 : d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
548 : }
549 1 : d.updateReadStateLocked(d.opts.DebugCheck)
550 1 :
551 1 : if !d.opts.ReadOnly {
552 1 : // If the Options specify a format major version higher than the
553 1 : // loaded database's, upgrade it. If this is a new database, this
554 1 : // code path also performs an initial upgrade from the starting
555 1 : // implicit MinSupported version.
556 1 : //
557 1 : // We ratchet the version this far into Open so that migrations have a read
558 1 : // state available. Note that this also results in creating/updating the
559 1 : // format version marker file.
560 1 : if opts.FormatMajorVersion > d.FormatMajorVersion() {
561 1 : if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
562 0 : return nil, err
563 0 : }
564 1 : } else if noFormatVersionMarker {
565 1 : // We are creating a new store. Create the format version marker file.
566 1 : if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
567 0 : return nil, err
568 0 : }
569 : }
570 :
571 : // Write the current options to disk.
572 1 : d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
573 1 : tmpPath := base.MakeFilepath(opts.FS, dirname, fileTypeTemp, d.optionsFileNum)
574 1 : optionsPath := base.MakeFilepath(opts.FS, dirname, fileTypeOptions, d.optionsFileNum)
575 1 :
576 1 : // Write them to a temporary file first, in case we crash before
577 1 : // we're done. A corrupt options file prevents opening the
578 1 : // database.
579 1 : optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
580 1 : if err != nil {
581 0 : return nil, err
582 0 : }
583 1 : serializedOpts := []byte(opts.String())
584 1 : if _, err := optionsFile.Write(serializedOpts); err != nil {
585 0 : return nil, errors.CombineErrors(err, optionsFile.Close())
586 0 : }
587 1 : d.optionsFileSize = uint64(len(serializedOpts))
588 1 : if err := optionsFile.Sync(); err != nil {
589 0 : return nil, errors.CombineErrors(err, optionsFile.Close())
590 0 : }
591 1 : if err := optionsFile.Close(); err != nil {
592 0 : return nil, err
593 0 : }
594 : // Atomically rename to the OPTIONS-XXXXXX path. This rename is
595 : // guaranteed to be atomic because the destination path does not
596 : // exist.
597 1 : if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
598 0 : return nil, err
599 0 : }
600 1 : if err := d.dataDir.Sync(); err != nil {
601 0 : return nil, err
602 0 : }
603 : }
604 :
605 1 : if !d.opts.ReadOnly {
606 1 : d.scanObsoleteFiles(ls)
607 1 : d.deleteObsoleteFiles(jobID)
608 1 : }
609 : // Else, nothing is obsolete.
610 :
611 1 : d.mu.tableStats.cond.L = &d.mu.Mutex
612 1 : d.mu.tableValidation.cond.L = &d.mu.Mutex
613 1 : if !d.opts.ReadOnly {
614 1 : d.maybeCollectTableStatsLocked()
615 1 : }
616 1 : d.calculateDiskAvailableBytes()
617 1 :
618 1 : d.maybeScheduleFlush()
619 1 : d.maybeScheduleCompaction()
620 1 :
621 1 : // Note: this is a no-op if invariants are disabled or race is enabled.
622 1 : //
623 1 : // Setting a finalizer on *DB causes *DB to never be reclaimed and the
624 1 : // finalizer to never be run. The problem is due to this limitation of
625 1 : // finalizers mention in the SetFinalizer docs:
626 1 : //
627 1 : // If a cyclic structure includes a block with a finalizer, that cycle is
628 1 : // not guaranteed to be garbage collected and the finalizer is not
629 1 : // guaranteed to run, because there is no ordering that respects the
630 1 : // dependencies.
631 1 : //
632 1 : // DB has cycles with several of its internal structures: readState,
633 1 : // newIters, tableCache, versions, etc. Each of these individually causes a
634 1 : // cycle and prevents the finalizer from being run. But we can work around this
635 1 : // finalizer limitation by setting a finalizer on another object that is
636 1 : // tied to the lifetime of DB: the DB.closed atomic.Value.
637 1 : dPtr := fmt.Sprintf("%p", d)
638 1 : invariants.SetFinalizer(d.closed, func(obj interface{}) {
639 0 : v := obj.(*atomic.Value)
640 0 : if err := v.Load(); err == nil {
641 0 : fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
642 0 : os.Exit(1)
643 0 : }
644 : })
645 :
646 1 : return d, nil
647 : }
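A minimal, hedged example of calling Open from client code, assuming an in-memory VFS; the directory name and option values are illustrative.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// Open (creating if necessary) a store on an in-memory filesystem. Open
	// clones the Options, applies defaults, locks the directory, replays any
	// WALs, and writes a fresh OPTIONS file, as described above.
	db, err := pebble.Open("demo", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Set([]byte("hello"), []byte("world"), pebble.Sync); err != nil {
		log.Fatal(err)
	}
}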
648 :
649 : // prepareAndOpenDirs opens the directories for the store (and creates them if
650 : // necessary).
651 : //
652 : // Returns an error if ReadOnly is set and the directories don't exist.
653 : func prepareAndOpenDirs(
654 : dirname string, opts *Options,
655 1 : ) (walDirname string, dataDir vfs.File, err error) {
656 1 : walDirname = opts.WALDir
657 1 : if opts.WALDir == "" {
658 1 : walDirname = dirname
659 1 : }
660 :
661 : // Create directories if needed.
662 1 : if !opts.ReadOnly {
663 1 : f, err := mkdirAllAndSyncParents(opts.FS, dirname)
664 1 : if err != nil {
665 0 : return "", nil, err
666 0 : }
667 1 : f.Close()
668 1 : if walDirname != dirname {
669 1 : f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
670 1 : if err != nil {
671 0 : return "", nil, err
672 0 : }
673 1 : f.Close()
674 : }
675 1 : if opts.WALFailover != nil {
676 1 : secondary := opts.WALFailover.Secondary
677 1 : f, err := mkdirAllAndSyncParents(secondary.FS, secondary.Dirname)
678 1 : if err != nil {
679 0 : return "", nil, err
680 0 : }
681 1 : f.Close()
682 : }
683 : }
684 :
685 1 : dataDir, err = opts.FS.OpenDir(dirname)
686 1 : if err != nil {
687 0 : if opts.ReadOnly && oserror.IsNotExist(err) {
688 0 : return "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
689 0 : }
690 0 : return "", nil, err
691 : }
692 1 : if opts.ReadOnly && walDirname != dirname {
693 0 : // Check that the wal dir exists.
694 0 : walDir, err := opts.FS.OpenDir(walDirname)
695 0 : if err != nil {
696 0 : dataDir.Close()
697 0 : return "", nil, err
698 0 : }
699 0 : walDir.Close()
700 : }
701 :
702 1 : return walDirname, dataDir, nil
703 : }
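As a hedged illustration of the directory handling above: leaving Options.WALDir empty keeps the WALs beside the sstables, while setting it makes prepareAndOpenDirs create and open a separate WAL directory (in read-write mode). The paths below are illustrative.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	fs := vfs.NewMem()
	// Store sstables under "data" and WALs under "wal"; both directories are
	// created during Open because ReadOnly is false.
	db, err := pebble.Open("data", &pebble.Options{FS: fs, WALDir: "wal"})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}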
704 :
705 : // GetVersion returns the engine version string from the latest options
706 : // file present in dir. Used to check what Pebble or RocksDB version was last
707 : // used to write to the database stored in this directory. An empty string is
708 : // returned if no valid OPTIONS file with a version key was found.
709 0 : func GetVersion(dir string, fs vfs.FS) (string, error) {
710 0 : ls, err := fs.List(dir)
711 0 : if err != nil {
712 0 : return "", err
713 0 : }
714 0 : var version string
715 0 : lastOptionsSeen := base.DiskFileNum(0)
716 0 : for _, filename := range ls {
717 0 : ft, fn, ok := base.ParseFilename(fs, filename)
718 0 : if !ok {
719 0 : continue
720 : }
721 0 : switch ft {
722 0 : case fileTypeOptions:
723 0 : // If this file has a higher number than the last options file
724 0 : // processed, reset version. This is because rocksdb often
725 0 : // writes multiple options files without deleting previous ones.
726 0 : // Otherwise, skip parsing this options file.
727 0 : if fn > lastOptionsSeen {
728 0 : version = ""
729 0 : lastOptionsSeen = fn
730 0 : } else {
731 0 : continue
732 : }
733 0 : f, err := fs.Open(fs.PathJoin(dir, filename))
734 0 : if err != nil {
735 0 : return "", err
736 0 : }
737 0 : data, err := io.ReadAll(f)
738 0 : f.Close()
739 0 :
740 0 : if err != nil {
741 0 : return "", err
742 0 : }
743 0 : err = parseOptions(string(data), func(section, key, value string) error {
744 0 : switch {
745 0 : case section == "Version":
746 0 : switch key {
747 0 : case "pebble_version":
748 0 : version = value
749 0 : case "rocksdb_version":
750 0 : version = fmt.Sprintf("rocksdb v%s", value)
751 : }
752 : }
753 0 : return nil
754 : })
755 0 : if err != nil {
756 0 : return "", err
757 0 : }
758 : }
759 : }
760 0 : return version, nil
761 : }
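A hedged usage sketch for GetVersion; the path is illustrative, and an empty result means no OPTIONS file carried a version key.

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// Report which Pebble (or RocksDB) version last wrote the store, based on
	// the newest OPTIONS file in the directory.
	version, err := pebble.GetVersion("/path/to/db", vfs.Default)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("engine version: %q\n", version)
}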
762 :
763 : // replayWAL replays the edits in the specified WAL. If the DB is in read
764 : // only mode, then the WALs are replayed into memtables and not flushed. If
765 : // the DB is not in read only mode, then the contents of the WAL are
766 : // guaranteed to be flushed. Note that this flushing is very important for
767 : // guaranteeing durability: the application may have had a number of pending
768 : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
769 : // happened but the corresponding data may now be readable from the WAL (while
770 : // sitting in write-back caches in the kernel or the storage device). By
771 : // reading the WAL (including the non-fsynced data) and then flushing all
772 : // these changes (flush does fsyncs), we are able to guarantee that the
773 : // initial state of the DB is durable.
774 : //
775 : // The toFlush return value is a list of flushables associated with the WAL
776 : // being replayed which will be flushed. Once the version edit has been applied
777 : // to the manifest, it is up to the caller of replayWAL to unreference the
778 : // toFlush flushables returned by replayWAL.
779 : //
780 : // d.mu must be held when calling this, but the mutex may be dropped and
781 : // re-acquired during the course of this method.
782 : func (d *DB) replayWAL(
783 : jobID JobID, ve *versionEdit, ll wal.LogicalLog, strictWALTail bool,
784 1 : ) (toFlush flushableList, maxSeqNum base.SeqNum, err error) {
785 1 : rr := ll.OpenForRead()
786 1 : defer rr.Close()
787 1 : var (
788 1 : b Batch
789 1 : buf bytes.Buffer
790 1 : mem *memTable
791 1 : entry *flushableEntry
792 1 : offset wal.Offset
793 1 : lastFlushOffset int64
794 1 : keysReplayed int64 // number of keys replayed
795 1 : batchesReplayed int64 // number of batches replayed
796 1 : )
797 1 :
798 1 : // TODO(jackson): This function is interspersed with panics, in addition to
799 1 : // corruption error propagation. Audit them to ensure we're truly only
800 1 : // panicking where the error points to Pebble bug and not user or
801 1 : // hardware-induced corruption.
802 1 :
803 1 : if d.opts.ReadOnly {
804 0 : // In read-only mode, we replay directly into the mutable memtable which will
805 0 : // never be flushed.
806 0 : mem = d.mu.mem.mutable
807 0 : if mem != nil {
808 0 : entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
809 0 : }
810 : }
811 :
812 : // Flushes the current memtable, if not nil.
813 1 : flushMem := func() {
814 1 : if mem == nil {
815 1 : return
816 1 : }
817 1 : var logSize uint64
818 1 : mergedOffset := offset.Physical + offset.PreviousFilesBytes
819 1 : if mergedOffset >= lastFlushOffset {
820 1 : logSize = uint64(mergedOffset - lastFlushOffset)
821 1 : }
822 : // Else, this was the initial memtable in the read-only case which must have
823 : // been empty, but we need to flush it since we don't want to add to it later.
824 1 : lastFlushOffset = mergedOffset
825 1 : entry.logSize = logSize
826 1 : if !d.opts.ReadOnly {
827 1 : toFlush = append(toFlush, entry)
828 1 : }
829 1 : mem, entry = nil, nil
830 : }
831 : // Creates a new memtable if there is no current memtable.
832 1 : ensureMem := func(seqNum base.SeqNum) {
833 1 : if mem != nil {
834 1 : return
835 1 : }
836 1 : mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
837 1 : if d.opts.ReadOnly {
838 0 : d.mu.mem.mutable = mem
839 0 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
840 0 : }
841 : }
842 :
843 : // updateVE is used to update ve with information about new files created
844 : // during the flush of any flushable not of type ingestedFlushable. For the
845 : // flushable of type ingestedFlushable we use custom handling below.
846 1 : updateVE := func() error {
847 1 : // TODO(bananabrick): See if we can use the actual base level here,
848 1 : // instead of using 1.
849 1 : c, err := newFlush(d.opts, d.mu.versions.currentVersion(),
850 1 : 1 /* base level */, toFlush, d.timeNow())
851 1 : if err != nil {
852 0 : return err
853 0 : }
854 1 : newVE, _, err := d.runCompaction(jobID, c)
855 1 : if err != nil {
856 0 : return errors.Wrapf(err, "running compaction during WAL replay")
857 0 : }
858 1 : ve.NewFiles = append(ve.NewFiles, newVE.NewFiles...)
859 1 : return nil
860 : }
861 1 : defer func() {
862 1 : if err != nil {
863 0 : err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
864 0 : }
865 : }()
866 :
867 1 : for {
868 1 : var r io.Reader
869 1 : var err error
870 1 : r, offset, err = rr.NextRecord()
871 1 : if err == nil {
872 1 : _, err = io.Copy(&buf, r)
873 1 : }
874 1 : if err != nil {
875 1 : // It is common to encounter a zeroed or invalid chunk due to WAL
876 1 : // preallocation and WAL recycling. We need to distinguish these
877 1 : // errors from EOF in order to recognize that the record was
878 1 : // truncated and to avoid replaying subsequent WALs, but want
879 1 : // to otherwise treat them like EOF.
880 1 : if err == io.EOF {
881 1 : break
882 0 : } else if record.IsInvalidRecord(err) && !strictWALTail {
883 0 : break
884 : }
885 0 : return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
886 : }
887 :
888 1 : if buf.Len() < batchrepr.HeaderLen {
889 0 : return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
890 0 : errors.Safe(base.DiskFileNum(ll.Num)), offset)
891 0 : }
892 :
893 1 : if d.opts.ErrorIfNotPristine {
894 0 : return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
895 0 : }
896 :
897 : // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
898 : // which is used below.
899 1 : b = Batch{}
900 1 : b.db = d
901 1 : b.SetRepr(buf.Bytes())
902 1 : seqNum := b.SeqNum()
903 1 : maxSeqNum = seqNum + base.SeqNum(b.Count())
904 1 : keysReplayed += int64(b.Count())
905 1 : batchesReplayed++
906 1 : {
907 1 : br := b.Reader()
908 1 : if kind, encodedFileNum, _, ok, err := br.Next(); err != nil {
909 0 : return nil, 0, err
910 1 : } else if ok && kind == InternalKeyKindIngestSST {
911 1 : fileNums := make([]base.DiskFileNum, 0, b.Count())
912 1 : addFileNum := func(encodedFileNum []byte) {
913 1 : fileNum, n := binary.Uvarint(encodedFileNum)
914 1 : if n <= 0 {
915 0 : panic("pebble: ingest sstable file num is invalid.")
916 : }
917 1 : fileNums = append(fileNums, base.DiskFileNum(fileNum))
918 : }
919 1 : addFileNum(encodedFileNum)
920 1 :
921 1 : for i := 1; i < int(b.Count()); i++ {
922 1 : kind, encodedFileNum, _, ok, err := br.Next()
923 1 : if err != nil {
924 0 : return nil, 0, err
925 0 : }
926 1 : if kind != InternalKeyKindIngestSST {
927 0 : panic("pebble: invalid batch key kind.")
928 : }
929 1 : if !ok {
930 0 : panic("pebble: invalid batch count.")
931 : }
932 1 : addFileNum(encodedFileNum)
933 : }
934 :
935 1 : if _, _, _, ok, err := br.Next(); err != nil {
936 0 : return nil, 0, err
937 1 : } else if ok {
938 0 : panic("pebble: invalid number of entries in batch.")
939 : }
940 :
941 1 : meta := make([]*fileMetadata, len(fileNums))
942 1 : for i, n := range fileNums {
943 1 : var readable objstorage.Readable
944 1 : objMeta, err := d.objProvider.Lookup(fileTypeTable, n)
945 1 : if err != nil {
946 0 : return nil, 0, errors.Wrap(err, "pebble: error when looking up ingested SSTs")
947 0 : }
948 1 : if objMeta.IsRemote() {
949 0 : readable, err = d.objProvider.OpenForReading(context.TODO(), fileTypeTable, n, objstorage.OpenOptions{MustExist: true})
950 0 : if err != nil {
951 0 : return nil, 0, errors.Wrap(err, "pebble: error when opening flushable ingest files")
952 0 : }
953 1 : } else {
954 1 : path := base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, n)
955 1 : f, err := d.opts.FS.Open(path)
956 1 : if err != nil {
957 0 : return nil, 0, err
958 0 : }
959 :
960 1 : readable, err = sstable.NewSimpleReadable(f)
961 1 : if err != nil {
962 0 : return nil, 0, err
963 0 : }
964 : }
965 : // NB: ingestLoad1 will close readable.
966 1 : meta[i], err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(), readable, d.cacheID, base.PhysicalTableFileNum(n))
967 1 : if err != nil {
968 0 : return nil, 0, errors.Wrap(err, "pebble: error when loading flushable ingest files")
969 0 : }
970 : }
971 :
972 1 : if uint32(len(meta)) != b.Count() {
973 0 : panic("pebble: couldn't load all files in WAL entry.")
974 : }
975 :
976 1 : entry, err = d.newIngestedFlushableEntry(meta, seqNum, base.DiskFileNum(ll.Num), KeyRange{})
977 1 : if err != nil {
978 0 : return nil, 0, err
979 0 : }
980 :
981 1 : if d.opts.ReadOnly {
982 0 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
983 0 : // We added the IngestSST flushable to the queue. But there
984 0 : // must be at least one WAL entry waiting to be replayed. We
985 0 : // have to ensure this newer WAL entry isn't replayed into
986 0 : // the current value of d.mu.mem.mutable because the current
987 0 : // mutable memtable exists before this flushable entry in
988 0 : // the memtable queue. To ensure this, we just need to unset
989 0 : // d.mu.mem.mutable. When a newer WAL is replayed, we will
990 0 : // set d.mu.mem.mutable to a newer value.
991 0 : d.mu.mem.mutable = nil
992 1 : } else {
993 1 : toFlush = append(toFlush, entry)
994 1 : // During WAL replay, the LSM only has L0, hence the
995 1 : // baseLevel is 1. For simplicity, we place the ingested
996 1 : // files in L0 here instead of finding their target levels;
997 1 : // computing target levels would complicate the replay code.
998 1 : // It is expected that WAL replay should be
999 1 : // rare, and that flushables of type ingestedFlushable
1000 1 : // should also be rare. So, placing the ingested files in L0
1001 1 : // is alright.
1002 1 : //
1003 1 : // TODO(bananabrick): Maybe refactor this function to allow
1004 1 : // us to easily place ingested files in levels as low as
1005 1 : // possible during WAL replay. It would require breaking up
1006 1 : // the application of ve to the manifest into chunks and is
1007 1 : // not pretty w/o a refactor to this function and how it's
1008 1 : // used.
1009 1 : c, err := newFlush(
1010 1 : d.opts, d.mu.versions.currentVersion(),
1011 1 : 1, /* base level */
1012 1 : []*flushableEntry{entry},
1013 1 : d.timeNow(),
1014 1 : )
1015 1 : if err != nil {
1016 0 : return nil, 0, err
1017 0 : }
1018 1 : for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
1019 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file.FileMetadata})
1020 1 : }
1021 : }
1022 1 : return toFlush, maxSeqNum, nil
1023 : }
1024 : }
1025 :
1026 1 : if b.memTableSize >= uint64(d.largeBatchThreshold) {
1027 0 : flushMem()
1028 0 : // Make a copy of the data slice since it is currently owned by buf and will
1029 0 : // be reused in the next iteration.
1030 0 : b.data = slices.Clone(b.data)
1031 0 : b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
1032 0 : if err != nil {
1033 0 : return nil, 0, err
1034 0 : }
1035 0 : entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
1036 0 : // Disable memory accounting by adding a reader ref that will never be
1037 0 : // removed.
1038 0 : entry.readerRefs.Add(1)
1039 0 : if d.opts.ReadOnly {
1040 0 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
1041 0 : // We added the flushable batch to the flushable queue.
1042 0 : // But there must be at least one WAL entry waiting to be
1043 0 : // replayed. We have to ensure this newer WAL entry isn't
1044 0 : // replayed into the current value of d.mu.mem.mutable because
1045 0 : // the current mutable memtable exists before this flushable
1046 0 : // entry in the memtable queue. To ensure this, we just need to
1047 0 : // unset d.mu.mem.mutable. When a newer WAL is replayed, we will
1048 0 : // set d.mu.mem.mutable to a newer value.
1049 0 : d.mu.mem.mutable = nil
1050 0 : } else {
1051 0 : toFlush = append(toFlush, entry)
1052 0 : }
1053 1 : } else {
1054 1 : ensureMem(seqNum)
1055 1 : if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
1056 0 : return nil, 0, err
1057 0 : }
1058 : // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
1059 : // batch may not initially fit, but will eventually fit (since it is smaller than
1060 : // largeBatchThreshold).
1061 1 : for err == arenaskl.ErrArenaFull {
1062 1 : flushMem()
1063 1 : ensureMem(seqNum)
1064 1 : err = mem.prepare(&b)
1065 1 : if err != nil && err != arenaskl.ErrArenaFull {
1066 0 : return nil, 0, err
1067 0 : }
1068 : }
1069 1 : if err = mem.apply(&b, seqNum); err != nil {
1070 0 : return nil, 0, err
1071 0 : }
1072 1 : mem.writerUnref()
1073 : }
1074 1 : buf.Reset()
1075 : }
1076 :
1077 1 : d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
1078 1 : jobID, base.DiskFileNum(ll.Num).String(), offset, keysReplayed, batchesReplayed)
1079 1 : flushMem()
1080 1 :
1081 1 : // mem is nil here.
1082 1 : if !d.opts.ReadOnly && batchesReplayed > 0 {
1083 1 : err = updateVE()
1084 1 : if err != nil {
1085 0 : return nil, 0, err
1086 0 : }
1087 : }
1088 1 : return toFlush, maxSeqNum, err
1089 : }
1090 :
1091 1 : func readOptionsFile(opts *Options, path string) (string, error) {
1092 1 : f, err := opts.FS.Open(path)
1093 1 : if err != nil {
1094 0 : return "", err
1095 0 : }
1096 1 : defer f.Close()
1097 1 :
1098 1 : data, err := io.ReadAll(f)
1099 1 : if err != nil {
1100 0 : return "", err
1101 0 : }
1102 1 : return string(data), nil
1103 : }
1104 :
1105 : // DBDesc briefly describes high-level state about a database.
1106 : type DBDesc struct {
1107 : // Exists is true if an existing database was found.
1108 : Exists bool
1109 : // FormatMajorVersion indicates the database's current format
1110 : // version.
1111 : FormatMajorVersion FormatMajorVersion
1112 : // ManifestFilename is the filename of the current active manifest,
1113 : // if the database exists.
1114 : ManifestFilename string
1115 : // OptionsFilename is the filename of the most recent OPTIONS file, if it
1116 : // exists.
1117 : OptionsFilename string
1118 : }
1119 :
1120 : // String implements fmt.Stringer.
1121 0 : func (d *DBDesc) String() string {
1122 0 : if !d.Exists {
1123 0 : return "uninitialized"
1124 0 : }
1125 0 : var buf bytes.Buffer
1126 0 : fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
1127 0 : fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
1128 0 : fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
1129 0 : return buf.String()
1130 : }
1131 :
1132 : // Peek looks for an existing database in dirname on the provided FS. It
1133 : // returns a brief description of the database. Peek is read-only and
1134 : // does not open the database.
1135 0 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
1136 0 : ls, err := fs.List(dirname)
1137 0 : if err != nil {
1138 0 : return nil, err
1139 0 : }
1140 :
1141 0 : vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
1142 0 : if err != nil {
1143 0 : return nil, err
1144 0 : }
1145 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1146 : // PeekMarker variant that avoids opening the directory.
1147 0 : if err := versMarker.Close(); err != nil {
1148 0 : return nil, err
1149 0 : }
1150 :
1151 : // Find the currently active manifest, if there is one.
1152 0 : manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
1153 0 : if err != nil {
1154 0 : return nil, err
1155 0 : }
1156 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1157 : // PeekMarker variant that avoids opening the directory.
1158 0 : if err := manifestMarker.Close(); err != nil {
1159 0 : return nil, err
1160 0 : }
1161 :
1162 0 : desc := &DBDesc{
1163 0 : Exists: exists,
1164 0 : FormatMajorVersion: vers,
1165 0 : }
1166 0 :
1167 0 : // Find the OPTIONS file with the highest file number within the list of
1168 0 : // directory entries.
1169 0 : var previousOptionsFileNum base.DiskFileNum
1170 0 : for _, filename := range ls {
1171 0 : ft, fn, ok := base.ParseFilename(fs, filename)
1172 0 : if !ok || ft != fileTypeOptions || fn < previousOptionsFileNum {
1173 0 : continue
1174 : }
1175 0 : previousOptionsFileNum = fn
1176 0 : desc.OptionsFilename = fs.PathJoin(dirname, filename)
1177 : }
1178 :
1179 0 : if exists {
1180 0 : desc.ManifestFilename = base.MakeFilepath(fs, dirname, fileTypeManifest, manifestFileNum)
1181 0 : }
1182 0 : return desc, nil
1183 : }
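A hedged example of Peek, which inspects a directory without acquiring the lock or replaying WALs; the path is illustrative.

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	desc, err := pebble.Peek("/path/to/db", vfs.Default)
	if err != nil {
		log.Fatal(err)
	}
	// DBDesc implements fmt.Stringer, so it can also be printed directly.
	fmt.Println(desc.Exists, desc.FormatMajorVersion, desc.ManifestFilename)
}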
1184 :
1185 : // LockDirectory acquires the database directory lock in the named directory,
1186 : // preventing another process from opening the database. LockDirectory returns a
1187 : // handle to the held lock that may be passed to Open through Options.Lock to
1188 : // subsequently open the database, skipping lock acquisition during Open.
1189 : //
1190 : // LockDirectory may be used to expand the critical section protected by the
1191 : // database lock to include setup before the call to Open.
1192 1 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
1193 1 : fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, fileTypeLock, base.DiskFileNum(0)))
1194 1 : if err != nil {
1195 0 : return nil, err
1196 0 : }
1197 1 : l := &Lock{dirname: dirname, fileLock: fileLock}
1198 1 : l.refs.Store(1)
1199 1 : invariants.SetFinalizer(l, func(obj interface{}) {
1200 1 : if refs := obj.(*Lock).refs.Load(); refs > 0 {
1201 0 : panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
1202 : }
1203 : })
1204 1 : return l, nil
1205 : }
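A hedged sketch of expanding the critical section with LockDirectory: acquire the lock, do exclusive setup, pass it to Open via Options.Lock, and only Close it after the DB is closed. The directory name is illustrative.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	fs := vfs.NewMem()
	if err := fs.MkdirAll("demo", 0755); err != nil {
		log.Fatal(err)
	}
	lock, err := pebble.LockDirectory("demo", fs)
	if err != nil {
		log.Fatal(err)
	}
	// Deferred calls run last-in-first-out: db.Close runs before lock.Close,
	// matching the requirement that the lock outlive the DB.
	defer lock.Close()

	// ... setup work that must happen while holding the lock ...

	db, err := pebble.Open("demo", &pebble.Options{FS: fs, Lock: lock})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}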
1206 :
1207 : // Lock represents a file lock on a directory. It may be passed to Open through
1208 : // Options.Lock to elide lock acquisition during Open.
1209 : type Lock struct {
1210 : dirname string
1211 : fileLock io.Closer
1212 : // refs is a count of the number of handles on the lock. refs must be 0, 1
1213 : // or 2.
1214 : //
1215 : // When acquired by the client and passed to Open, refs = 1 and the Open
1216 : // call increments it to 2. When the database is closed, it's decremented to
1217 : // 1. Finally, when the original caller calls Close on the Lock, it's
1218 : // decremented to zero and the underlying file lock is released.
1219 : //
1220 : // When Open acquires the file lock, refs remains at 1 until the database is
1221 : // closed.
1222 : refs atomic.Int32
1223 : }
1224 :
1225 0 : func (l *Lock) refForOpen() error {
1226 0 : // During Open, when a user passed in a lock, the reference count must be
1227 0 : // exactly 1. If it's zero, the lock is no longer held and is invalid. If
1228 0 : // it's 2, the lock is already in use by another database within the
1229 0 : // process.
1230 0 : if !l.refs.CompareAndSwap(1, 2) {
1231 0 : return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
1232 0 : }
1233 0 : return nil
1234 : }
1235 :
1236 : // Close releases the lock, permitting another process to lock and open the
1237 : // database. Close must not be called until after a database using the Lock has
1238 : // been closed.
1239 1 : func (l *Lock) Close() error {
1240 1 : if l.refs.Add(-1) > 0 {
1241 0 : return nil
1242 0 : }
1243 1 : defer func() { l.fileLock = nil }()
1244 1 : return l.fileLock.Close()
1245 : }
1246 :
1247 0 : func (l *Lock) pathMatches(dirname string) error {
1248 0 : if dirname == l.dirname {
1249 0 : return nil
1250 0 : }
1251 : // Check for relative paths, symlinks, etc. This isn't ideal because we're
1252 : // circumventing the vfs.FS interface here.
1253 : //
1254 : // TODO(jackson): We could add support for retrieving file inodes through Stat
1255 : // calls in the VFS interface on platforms where it's available and use that
1256 : // to differentiate.
1257 0 : dirStat, err1 := os.Stat(dirname)
1258 0 : lockDirStat, err2 := os.Stat(l.dirname)
1259 0 : if err1 == nil && err2 == nil && os.SameFile(dirStat, lockDirStat) {
1260 0 : return nil
1261 0 : }
1262 0 : return errors.Join(
1263 0 : errors.Newf("pebble: opts.Lock acquired in %q not %q", l.dirname, dirname),
1264 0 : err1, err2)
1265 : }
1266 :
1267 : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
1268 : // does not exist.
1269 : //
1270 : // Note that errors can be wrapped with more details; use errors.Is().
1271 : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
1272 :
1273 : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
1274 : // already exists.
1275 : //
1276 : // Note that errors can be wrapped with more details; use errors.Is().
1277 : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
1278 :
1279 : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
1280 : // already exists and is not pristine.
1281 : //
1282 : // Note that errors can be wrapped with more details; use errors.Is().
1283 : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
1284 :
1285 : // IsCorruptionError returns true if the given error indicates database
1286 : // corruption.
1287 0 : func IsCorruptionError(err error) bool {
1288 0 : return errors.Is(err, base.ErrCorruption)
1289 0 : }
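A hedged sketch of checking these sentinel errors with errors.Is, as the comments above advise, together with IsCorruptionError; the directory and options are illustrative.

package main

import (
	"errors"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// ErrorIfNotExists makes Open fail instead of creating a new store.
	_, err := pebble.Open("missing", &pebble.Options{
		FS:               vfs.NewMem(),
		ErrorIfNotExists: true,
	})
	switch {
	case errors.Is(err, pebble.ErrDBDoesNotExist):
		log.Println("no store at this path")
	case pebble.IsCorruptionError(err):
		log.Println("store is corrupt")
	case err != nil:
		log.Println("open failed:", err)
	}
}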
1290 :
1291 1 : func checkConsistency(v *manifest.Version, dirname string, objProvider objstorage.Provider) error {
1292 1 : var errs []error
1293 1 : dedup := make(map[base.DiskFileNum]struct{})
1294 1 : for level, files := range v.Levels {
1295 1 : iter := files.Iter()
1296 1 : for f := iter.First(); f != nil; f = iter.Next() {
1297 1 : backingState := f.FileBacking
1298 1 : if _, ok := dedup[backingState.DiskFileNum]; ok {
1299 1 : continue
1300 : }
1301 1 : dedup[backingState.DiskFileNum] = struct{}{}
1302 1 : fileNum := backingState.DiskFileNum
1303 1 : fileSize := backingState.Size
1304 1 : // We skip over remote objects; those are instead checked asynchronously
1305 1 : // by the table stats loading job.
1306 1 : meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
1307 1 : var size int64
1308 1 : if err == nil {
1309 1 : if meta.IsRemote() {
1310 1 : continue
1311 : }
1312 1 : size, err = objProvider.Size(meta)
1313 : }
1314 1 : if err != nil {
1315 0 : errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
1316 0 : continue
1317 : }
1318 :
1319 1 : if size != int64(fileSize) {
1320 0 : errs = append(errs, errors.Errorf(
1321 0 : "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
1322 0 : errors.Safe(level), fileNum, objProvider.Path(meta),
1323 0 : errors.Safe(size), errors.Safe(fileSize)))
1324 0 : continue
1325 : }
1326 : }
1327 : }
1328 1 : return errors.Join(errs...)
1329 : }
1330 :
1331 : type walEventListenerAdaptor struct {
1332 : l *EventListener
1333 : }
1334 :
1335 1 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
1336 1 : // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
1337 1 : // is insufficient to infer whether primary or secondary.
1338 1 : wci := WALCreateInfo{
1339 1 : JobID: ci.JobID,
1340 1 : Path: ci.Path,
1341 1 : FileNum: base.DiskFileNum(ci.Num),
1342 1 : RecycledFileNum: ci.RecycledFileNum,
1343 1 : Err: ci.Err,
1344 1 : }
1345 1 : l.l.WALCreated(wci)
1346 1 : }
|