Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "math"
14 : "os"
15 : "slices"
16 : "sync/atomic"
17 : "time"
18 :
19 : "github.com/cockroachdb/errors"
20 : "github.com/cockroachdb/errors/oserror"
21 : "github.com/cockroachdb/pebble/batchrepr"
22 : "github.com/cockroachdb/pebble/internal/arenaskl"
23 : "github.com/cockroachdb/pebble/internal/base"
24 : "github.com/cockroachdb/pebble/internal/cache"
25 : "github.com/cockroachdb/pebble/internal/constants"
26 : "github.com/cockroachdb/pebble/internal/invariants"
27 : "github.com/cockroachdb/pebble/internal/manifest"
28 : "github.com/cockroachdb/pebble/internal/manual"
29 : "github.com/cockroachdb/pebble/objstorage"
30 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
31 : "github.com/cockroachdb/pebble/objstorage/remote"
32 : "github.com/cockroachdb/pebble/record"
33 : "github.com/cockroachdb/pebble/sstable"
34 : "github.com/cockroachdb/pebble/vfs"
35 : "github.com/cockroachdb/pebble/wal"
36 : "github.com/prometheus/client_golang/prometheus"
37 : )
38 :
39 : const (
40 : initialMemTableSize = 256 << 10 // 256 KB
41 :
42 : // The max batch size is limited by the uint32 offsets stored in
43 : // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
44 : //
45 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
46 : // end of an allocation fits in uint32.
47 : //
48 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
49 : // 2GB).
50 : maxBatchSize = constants.MaxUint32OrInt
51 :
52 : // The max memtable size is limited by the uint32 offsets stored in
53 : // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
54 : //
55 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
56 : // end of an allocation fits in uint32.
57 : //
58 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
59 : // 2GB).
60 : maxMemTableSize = constants.MaxUint32OrInt
61 : )
62 :
63 : // TableCacheSize can be used to determine the table
64 : // cache size for a single db, given the maximum number of
65 : // open files that can be devoted to the table cache used
66 : // only by that db.
67 2 : func TableCacheSize(maxOpenFiles int) int {
68 2 : tableCacheSize := maxOpenFiles - numNonTableCacheFiles
69 2 : if tableCacheSize < minTableCacheSize {
70 1 : tableCacheSize = minTableCacheSize
71 1 : }
72 2 : return tableCacheSize
73 : }
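A minimal sketch (not part of this file) of how a caller might use TableCacheSize to derive a cache size from its own open-file budget; the 1000-file budget is illustrative and the pebble import is assumed:

    // Illustrative only: derive a table cache size from an open-file budget,
    // leaving headroom for files not served by the table cache.
    opts := &pebble.Options{MaxOpenFiles: 1000}
    tableCacheFiles := pebble.TableCacheSize(opts.MaxOpenFiles)
    _ = tableCacheFiles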
74 :
75 : // Open opens a DB whose files live in the given directory.
76 2 : func Open(dirname string, opts *Options) (db *DB, err error) {
77 2 : // Make a copy of the options so that we don't mutate the passed in options.
78 2 : opts = opts.Clone()
79 2 : opts = opts.EnsureDefaults()
80 2 : if err := opts.Validate(); err != nil {
81 0 : return nil, err
82 0 : }
83 2 : if opts.LoggerAndTracer == nil {
84 2 : opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
85 2 : } else {
86 1 : opts.Logger = opts.LoggerAndTracer
87 1 : }
88 :
89 : // In all error cases, we return db = nil; this is used by various
90 : // deferred cleanups.
91 :
92 : // Open the database and WAL directories first.
93 2 : walDirname, dataDir, err := prepareAndOpenDirs(dirname, opts)
94 2 : if err != nil {
95 1 : return nil, errors.Wrapf(err, "error opening database at %q", dirname)
96 1 : }
97 2 : defer func() {
98 2 : if db == nil {
99 1 : dataDir.Close()
100 1 : }
101 : }()
102 :
103 : // Lock the database directory.
104 2 : var fileLock *Lock
105 2 : if opts.Lock != nil {
106 1 : // The caller already acquired the database lock. Ensure that the
107 1 : // directory matches.
108 1 : if err := opts.Lock.pathMatches(dirname); err != nil {
109 0 : return nil, err
110 0 : }
111 1 : if err := opts.Lock.refForOpen(); err != nil {
112 1 : return nil, err
113 1 : }
114 1 : fileLock = opts.Lock
115 2 : } else {
116 2 : fileLock, err = LockDirectory(dirname, opts.FS)
117 2 : if err != nil {
118 1 : return nil, err
119 1 : }
120 : }
121 2 : defer func() {
122 2 : if db == nil {
123 1 : fileLock.Close()
124 1 : }
125 : }()
126 :
127 : // List the directory contents. This also happens to include WAL log files, if
128 : // they are in the same dir, but we will ignore those below. The provider is
129 : // also given this list, but it ignores non-sstable files.
130 2 : ls, err := opts.FS.List(dirname)
131 2 : if err != nil {
132 1 : return nil, err
133 1 : }
134 :
135 : // Establish the format major version.
136 2 : formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
137 2 : if err != nil {
138 1 : return nil, err
139 1 : }
140 2 : defer func() {
141 2 : if db == nil {
142 1 : formatVersionMarker.Close()
143 1 : }
144 : }()
145 :
146 2 : noFormatVersionMarker := formatVersion == FormatDefault
147 2 : if noFormatVersionMarker {
148 2 : // We will initialize the store at the minimum possible format, then upgrade
149 2 : // the format to the desired one. This helps test the format upgrade code.
150 2 : formatVersion = FormatMinSupported
151 2 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
152 2 : formatVersion = FormatMinForSharedObjects
153 2 : }
154 : // There is no format version marker file. There are three cases:
155 : // - we are trying to open an existing store that was created at
156 : // FormatMostCompatible (the only one without a version marker file)
157 : // - we are creating a new store;
158 : // - we are retrying a failed creation.
159 : //
160 : // To error in the first case, we set ErrorIfNotPristine.
161 2 : opts.ErrorIfNotPristine = true
162 2 : defer func() {
163 2 : if err != nil && errors.Is(err, ErrDBNotPristine) {
164 0 : // We must be trying to open an existing store at FormatMostCompatible.
165 0 : // Correct the error in this case with a clearer message.
166 0 : err = errors.Newf(
167 0 : "pebble: database %q written in format major version 1 which is no longer supported",
168 0 : dirname)
169 0 : }
170 : }()
171 2 : } else {
172 2 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone && formatVersion < FormatMinForSharedObjects {
173 0 : return nil, errors.Newf(
174 0 : "pebble: database %q configured with shared objects but written in too old format major version %d",
175 0 : dirname, formatVersion)
176 0 : }
177 : }
178 :
179 : // Find the currently active manifest, if there is one.
180 2 : manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
181 2 : if err != nil {
182 1 : return nil, errors.Wrapf(err, "pebble: database %q", dirname)
183 1 : }
184 2 : defer func() {
185 2 : if db == nil {
186 1 : manifestMarker.Close()
187 1 : }
188 : }()
189 :
190 : // Atomic markers may leave behind obsolete files if there's a crash
191 : // mid-update. Clean these up if we're not in read-only mode.
192 2 : if !opts.ReadOnly {
193 2 : if err := formatVersionMarker.RemoveObsolete(); err != nil {
194 0 : return nil, err
195 0 : }
196 2 : if err := manifestMarker.RemoveObsolete(); err != nil {
197 0 : return nil, err
198 0 : }
199 : }
200 :
201 2 : if opts.Cache == nil {
202 1 : opts.Cache = cache.New(cacheDefaultSize)
203 2 : } else {
204 2 : opts.Cache.Ref()
205 2 : }
206 :
207 2 : d := &DB{
208 2 : cacheID: opts.Cache.NewID(),
209 2 : dirname: dirname,
210 2 : opts: opts,
211 2 : cmp: opts.Comparer.Compare,
212 2 : equal: opts.Comparer.Equal,
213 2 : merge: opts.Merger.Merge,
214 2 : split: opts.Comparer.Split,
215 2 : abbreviatedKey: opts.Comparer.AbbreviatedKey,
216 2 : largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
217 2 : fileLock: fileLock,
218 2 : dataDir: dataDir,
219 2 : closed: new(atomic.Value),
220 2 : closedCh: make(chan struct{}),
221 2 : }
222 2 : d.mu.versions = &versionSet{}
223 2 : d.diskAvailBytes.Store(math.MaxUint64)
224 2 :
225 2 : defer func() {
226 2 : // If an error or panic occurs during open, attempt to release the manually
227 2 : // allocated memory resources. Note that rather than look for an error, we
228 2 : // look for the return of a nil DB pointer.
229 2 : if r := recover(); db == nil {
230 1 : // Release our references to the Cache. Note that both the DB and
231 1 : // tableCache have a reference. When we release the reference to
232 1 : // the tableCache, and if there are no other references to
233 1 : // the tableCache, then the tableCache will also release its
234 1 : // reference to the cache.
235 1 : opts.Cache.Unref()
236 1 :
237 1 : if d.tableCache != nil {
238 1 : _ = d.tableCache.close()
239 1 : }
240 :
241 1 : for _, mem := range d.mu.mem.queue {
242 1 : switch t := mem.flushable.(type) {
243 1 : case *memTable:
244 1 : manual.Free(t.arenaBuf)
245 1 : t.arenaBuf = nil
246 : }
247 : }
248 1 : if d.cleanupManager != nil {
249 1 : d.cleanupManager.Close()
250 1 : }
251 1 : if d.objProvider != nil {
252 1 : d.objProvider.Close()
253 1 : }
254 1 : if r != nil {
255 1 : panic(r)
256 : }
257 : }
258 : }()
259 :
260 2 : d.commit = newCommitPipeline(commitEnv{
261 2 : logSeqNum: &d.mu.versions.logSeqNum,
262 2 : visibleSeqNum: &d.mu.versions.visibleSeqNum,
263 2 : apply: d.commitApply,
264 2 : write: d.commitWrite,
265 2 : })
266 2 : d.mu.nextJobID = 1
267 2 : d.mu.mem.nextSize = opts.MemTableSize
268 2 : if d.mu.mem.nextSize > initialMemTableSize {
269 2 : d.mu.mem.nextSize = initialMemTableSize
270 2 : }
271 2 : d.mu.compact.cond.L = &d.mu.Mutex
272 2 : d.mu.compact.inProgress = make(map[*compaction]struct{})
273 2 : d.mu.compact.noOngoingFlushStartTime = time.Now()
274 2 : d.mu.snapshots.init()
275 2 : // logSeqNum is the next sequence number that will be assigned.
276 2 : // Start assigning sequence numbers from base.SeqNumStart to leave
277 2 : // room for reserved sequence numbers (see comments around
278 2 : // SeqNumStart).
279 2 : d.mu.versions.logSeqNum.Store(base.SeqNumStart)
280 2 : d.mu.formatVers.vers.Store(uint64(formatVersion))
281 2 : d.mu.formatVers.marker = formatVersionMarker
282 2 :
283 2 : d.timeNow = time.Now
284 2 : d.openedAt = d.timeNow()
285 2 :
286 2 : d.mu.Lock()
287 2 : defer d.mu.Unlock()
288 2 :
289 2 : jobID := d.newJobIDLocked()
290 2 :
291 2 : providerSettings := objstorageprovider.Settings{
292 2 : Logger: opts.Logger,
293 2 : FS: opts.FS,
294 2 : FSDirName: dirname,
295 2 : FSDirInitialListing: ls,
296 2 : FSCleaner: opts.Cleaner,
297 2 : NoSyncOnClose: opts.NoSyncOnClose,
298 2 : BytesPerSync: opts.BytesPerSync,
299 2 : }
300 2 : providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
301 2 : providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
302 2 : providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
303 2 : providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
304 2 :
305 2 : d.objProvider, err = objstorageprovider.Open(providerSettings)
306 2 : if err != nil {
307 1 : return nil, err
308 1 : }
309 :
310 2 : if !manifestExists {
311 2 : // DB does not exist.
312 2 : if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
313 1 : return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
314 1 : }
315 :
316 : // Create the DB.
317 2 : if err := d.mu.versions.create(
318 2 : jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
319 1 : return nil, err
320 1 : }
321 2 : } else {
322 2 : if opts.ErrorIfExists {
323 1 : return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
324 1 : }
325 : // Load the version set.
326 2 : if err := d.mu.versions.load(
327 2 : dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
328 1 : return nil, err
329 1 : }
330 2 : if opts.ErrorIfNotPristine {
331 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
332 1 : d.mu.versions.addLiveFileNums(liveFileNums)
333 1 : if len(liveFileNums) != 0 {
334 1 : return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
335 1 : }
336 : }
337 : }
338 :
339 : // In read-only mode, we replay directly into the mutable memtable but never
340 : // flush it. We need to delay creation of the memtable until we know the
341 : // sequence number of the first batch that will be inserted.
342 2 : if !d.opts.ReadOnly {
343 2 : var entry *flushableEntry
344 2 : d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
345 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
346 2 : }
347 :
348 2 : d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
349 2 : Buckets: FsyncLatencyBuckets,
350 2 : })
351 2 : walOpts := wal.Options{
352 2 : Primary: wal.Dir{FS: opts.FS, Dirname: walDirname},
353 2 : Secondary: wal.Dir{},
354 2 : MinUnflushedWALNum: wal.NumWAL(d.mu.versions.minUnflushedLogNum),
355 2 : MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
356 2 : NoSyncOnClose: opts.NoSyncOnClose,
357 2 : BytesPerSync: opts.WALBytesPerSync,
358 2 : PreallocateSize: d.walPreallocateSize,
359 2 : MinSyncInterval: opts.WALMinSyncInterval,
360 2 : FsyncLatency: d.mu.log.metrics.fsyncLatency,
361 2 : QueueSemChan: d.commit.logSyncQSem,
362 2 : Logger: opts.Logger,
363 2 : EventListener: walEventListenerAdaptor{l: opts.EventListener},
364 2 : }
365 2 : if opts.WALFailover != nil {
366 2 : walOpts.Secondary = opts.WALFailover.Secondary
367 2 : walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
368 2 : walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
369 2 : Buckets: FsyncLatencyBuckets,
370 2 : })
371 2 : }
372 2 : walDirs := append(walOpts.Dirs(), opts.WALRecoveryDirs...)
373 2 : wals, err := wal.Scan(walDirs...)
374 2 : if err != nil {
375 1 : return nil, err
376 1 : }
377 2 : walManager, err := wal.Init(walOpts, wals)
378 2 : if err != nil {
379 1 : return nil, err
380 1 : }
381 2 : defer func() {
382 2 : if db == nil {
383 1 : walManager.Close()
384 1 : }
385 : }()
386 :
387 2 : d.mu.log.manager = walManager
388 2 :
389 2 : d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
390 2 :
391 2 : if manifestExists && !opts.DisableConsistencyCheck {
392 2 : curVersion := d.mu.versions.currentVersion()
393 2 : if err := checkConsistency(curVersion, dirname, d.objProvider); err != nil {
394 1 : return nil, err
395 1 : }
396 : }
397 :
398 2 : tableCacheSize := TableCacheSize(opts.MaxOpenFiles)
399 2 : d.tableCache = newTableCacheContainer(
400 2 : opts.TableCache, d.cacheID, d.objProvider, d.opts, tableCacheSize,
401 2 : &sstable.CategoryStatsCollector{})
402 2 : d.newIters = d.tableCache.newIters
403 2 : d.tableNewRangeKeyIter = tableNewRangeKeyIter(context.TODO(), d.newIters)
404 2 :
405 2 : var previousOptionsFileNum base.DiskFileNum
406 2 : var previousOptionsFilename string
407 2 : for _, filename := range ls {
408 2 : ft, fn, ok := base.ParseFilename(opts.FS, filename)
409 2 : if !ok {
410 2 : continue
411 : }
412 :
413 : // Don't reuse any obsolete file numbers to avoid modifying an
414 : // ingested sstable's original external file.
415 2 : d.mu.versions.markFileNumUsed(fn)
416 2 :
417 2 : switch ft {
418 0 : case fileTypeLog:
419 : // Ignore.
420 2 : case fileTypeOptions:
421 2 : if previousOptionsFileNum < fn {
422 2 : previousOptionsFileNum = fn
423 2 : previousOptionsFilename = filename
424 2 : }
425 1 : case fileTypeTemp, fileTypeOldTemp:
426 1 : if !d.opts.ReadOnly {
427 1 : // Some codepaths write to a temporary file and then
428 1 : // rename it to its final location when complete. A
429 1 : // temp file is leftover if a process exits before the
430 1 : // rename. Remove it.
431 1 : err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
432 1 : if err != nil {
433 0 : return nil, err
434 0 : }
435 : }
436 : }
437 : }
438 2 : if n := len(wals); n > 0 {
439 2 : // Don't reuse any obsolete file numbers to avoid modifying an
440 2 : // ingested sstable's original external file.
441 2 : d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
442 2 : }
443 :
444 : // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
445 : // objProvider. This avoids FileNum collisions with obsolete sstables.
446 2 : objects := d.objProvider.List()
447 2 : for _, obj := range objects {
448 2 : d.mu.versions.markFileNumUsed(obj.DiskFileNum)
449 2 : }
450 :
451 : // Validate the most-recent OPTIONS file, if there is one.
452 2 : if previousOptionsFilename != "" {
453 2 : path := opts.FS.PathJoin(dirname, previousOptionsFilename)
454 2 : previousOptions, err := readOptionsFile(opts, path)
455 2 : if err != nil {
456 0 : return nil, err
457 0 : }
458 2 : if err := opts.CheckCompatibility(previousOptions); err != nil {
459 1 : return nil, err
460 1 : }
461 : }
462 :
463 : // Replay any newer log files than the ones named in the manifest.
464 2 : var replayWALs wal.Logs
465 2 : for i, w := range wals {
466 2 : if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
467 2 : replayWALs = wals[i:]
468 2 : break
469 : }
470 : }
471 2 : var ve versionEdit
472 2 : var toFlush flushableList
473 2 : for i, lf := range replayWALs {
474 2 : // WALs other than the last one would have been closed cleanly.
475 2 : //
476 2 : // Note: we used to never require strict WAL tails when reading from older
477 2 : // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
478 2 : // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
479 2 : // compatible Pebble format is newer and guarantees a clean EOF.
480 2 : strictWALTail := i < len(replayWALs)-1
481 2 : flush, maxSeqNum, err := d.replayWAL(jobID, &ve, lf, strictWALTail)
482 2 : if err != nil {
483 1 : return nil, err
484 1 : }
485 2 : toFlush = append(toFlush, flush...)
486 2 : if d.mu.versions.logSeqNum.Load() < maxSeqNum {
487 2 : d.mu.versions.logSeqNum.Store(maxSeqNum)
488 2 : }
489 : }
490 2 : d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
491 2 :
492 2 : if !d.opts.ReadOnly {
493 2 : // Create an empty .log file.
494 2 : newLogNum := d.mu.versions.getNextDiskFileNum()
495 2 :
496 2 : // This logic is slightly different than RocksDB's. Specifically, RocksDB
497 2 : // sets MinUnflushedLogNum to max-recovered-log-num + 1. We set it to the
498 2 : // newLogNum. There should be no difference in using either value.
499 2 : ve.MinUnflushedLogNum = newLogNum
500 2 :
501 2 : // Create the manifest with the updated MinUnflushedLogNum before
502 2 : // creating the new log file. If we created the log file first, a
503 2 : // crash before the manifest is synced could leave two WALs with
504 2 : // unclean tails.
505 2 : d.mu.versions.logLock()
506 2 : if err := d.mu.versions.logAndApply(jobID, &ve, newFileMetrics(ve.NewFiles), false /* forceRotation */, func() []compactionInfo {
507 2 : return nil
508 2 : }); err != nil {
509 0 : return nil, err
510 0 : }
511 :
512 2 : for _, entry := range toFlush {
513 2 : entry.readerUnrefLocked(true)
514 2 : }
515 :
516 2 : d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
517 2 : if err != nil {
518 1 : return nil, err
519 1 : }
520 :
521 : // This isn't strictly necessary as we don't use the log number for
522 : // memtables being flushed, only for the next unflushed memtable.
523 2 : d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
524 : }
525 2 : d.updateReadStateLocked(d.opts.DebugCheck)
526 2 :
527 2 : if !d.opts.ReadOnly {
528 2 : // If the Options specify a format major version higher than the
529 2 : // loaded database's, upgrade it. If this is a new database, this
530 2 : // code path also performs an initial upgrade from the starting
531 2 : // implicit MinSupported version.
532 2 : //
533 2 : // We ratchet the version this far into Open so that migrations have a read
534 2 : // state available. Note that this also results in creating/updating the
535 2 : // format version marker file.
536 2 : if opts.FormatMajorVersion > d.FormatMajorVersion() {
537 2 : if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
538 0 : return nil, err
539 0 : }
540 2 : } else if noFormatVersionMarker {
541 2 : // We are creating a new store. Create the format version marker file.
542 2 : if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
543 1 : return nil, err
544 1 : }
545 : }
546 :
547 : // Write the current options to disk.
548 2 : d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
549 2 : tmpPath := base.MakeFilepath(opts.FS, dirname, fileTypeTemp, d.optionsFileNum)
550 2 : optionsPath := base.MakeFilepath(opts.FS, dirname, fileTypeOptions, d.optionsFileNum)
551 2 :
552 2 : // Write them to a temporary file first, in case we crash before
553 2 : // we're done. A corrupt options file prevents opening the
554 2 : // database.
555 2 : optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
556 2 : if err != nil {
557 1 : return nil, err
558 1 : }
559 2 : serializedOpts := []byte(opts.String())
560 2 : if _, err := optionsFile.Write(serializedOpts); err != nil {
561 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
562 1 : }
563 2 : d.optionsFileSize = uint64(len(serializedOpts))
564 2 : if err := optionsFile.Sync(); err != nil {
565 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
566 1 : }
567 2 : if err := optionsFile.Close(); err != nil {
568 0 : return nil, err
569 0 : }
570 : // Atomically rename to the OPTIONS-XXXXXX path. This rename is
571 : // guaranteed to be atomic because the destination path does not
572 : // exist.
573 2 : if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
574 1 : return nil, err
575 1 : }
576 2 : if err := d.dataDir.Sync(); err != nil {
577 1 : return nil, err
578 1 : }
579 : }
580 :
581 2 : if !d.opts.ReadOnly {
582 2 : d.scanObsoleteFiles(ls)
583 2 : d.deleteObsoleteFiles(jobID)
584 2 : }
585 : // Else, nothing is obsolete.
586 :
587 2 : d.mu.tableStats.cond.L = &d.mu.Mutex
588 2 : d.mu.tableValidation.cond.L = &d.mu.Mutex
589 2 : if !d.opts.ReadOnly {
590 2 : d.maybeCollectTableStatsLocked()
591 2 : }
592 2 : d.calculateDiskAvailableBytes()
593 2 :
594 2 : d.maybeScheduleFlush()
595 2 : d.maybeScheduleCompaction()
596 2 :
597 2 : // Note: this is a no-op if invariants are disabled or race is enabled.
598 2 : //
599 2 : // Setting a finalizer on *DB causes *DB to never be reclaimed and the
600 2 : // finalizer to never be run. The problem is due to this limitation of
601 2 : // finalizers mentioned in the SetFinalizer docs:
602 2 : //
603 2 : // If a cyclic structure includes a block with a finalizer, that cycle is
604 2 : // not guaranteed to be garbage collected and the finalizer is not
605 2 : // guaranteed to run, because there is no ordering that respects the
606 2 : // dependencies.
607 2 : //
608 2 : // DB has cycles with several of its internal structures: readState,
609 2 : // newIters, tableCache, versions, etc. Each of these individually causes a
610 2 : // cycle and prevents the finalizer from being run. But we can work around this
611 2 : // finalizer limitation by setting a finalizer on another object that is
612 2 : // tied to the lifetime of DB: the DB.closed atomic.Value.
613 2 : dPtr := fmt.Sprintf("%p", d)
614 2 : invariants.SetFinalizer(d.closed, func(obj interface{}) {
615 0 : v := obj.(*atomic.Value)
616 0 : if err := v.Load(); err == nil {
617 0 : fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
618 0 : os.Exit(1)
619 0 : }
620 : })
621 :
622 2 : return d, nil
623 : }
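A minimal usage sketch of Open from a client program (not part of this file); the "demo-db" path is illustrative and all unspecified options rely on the defaults applied by Open:

    package main

    import (
    	"log"

    	"github.com/cockroachdb/pebble"
    	"github.com/cockroachdb/pebble/vfs"
    )

    func main() {
    	// Open (or create) a store in the demo-db directory on the default
    	// filesystem. Open replays any WALs left behind by a previous run.
    	db, err := pebble.Open("demo-db", &pebble.Options{FS: vfs.Default})
    	if err != nil {
    		log.Fatal(err)
    	}
    	// Durably write a key, read it back, then close the store.
    	if err := db.Set([]byte("hello"), []byte("world"), pebble.Sync); err != nil {
    		log.Fatal(err)
    	}
    	value, closer, err := db.Get([]byte("hello"))
    	if err != nil {
    		log.Fatal(err)
    	}
    	log.Printf("hello=%s", value)
    	closer.Close()
    	if err := db.Close(); err != nil {
    		log.Fatal(err)
    	}
    }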
624 :
625 : // prepareAndOpenDirs opens the directories for the store (and creates them if
626 : // necessary).
627 : //
628 : // Returns an error if ReadOnly is set and the directories don't exist.
629 : func prepareAndOpenDirs(
630 : dirname string, opts *Options,
631 2 : ) (walDirname string, dataDir vfs.File, err error) {
632 2 : walDirname = opts.WALDir
633 2 : if opts.WALDir == "" {
634 2 : walDirname = dirname
635 2 : }
636 :
637 : // Create directories if needed.
638 2 : if !opts.ReadOnly {
639 2 : f, err := mkdirAllAndSyncParents(opts.FS, dirname)
640 2 : if err != nil {
641 1 : return "", nil, err
642 1 : }
643 2 : f.Close()
644 2 : if walDirname != dirname {
645 2 : f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
646 2 : if err != nil {
647 0 : return "", nil, err
648 0 : }
649 2 : f.Close()
650 : }
651 2 : if opts.WALFailover != nil {
652 2 : secondary := opts.WALFailover.Secondary
653 2 : f, err := mkdirAllAndSyncParents(secondary.FS, secondary.Dirname)
654 2 : if err != nil {
655 0 : return "", nil, err
656 0 : }
657 2 : f.Close()
658 : }
659 : }
660 :
661 2 : dataDir, err = opts.FS.OpenDir(dirname)
662 2 : if err != nil {
663 1 : if opts.ReadOnly && oserror.IsNotExist(err) {
664 1 : return "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
665 1 : }
666 1 : return "", nil, err
667 : }
668 2 : if opts.ReadOnly && walDirname != dirname {
669 1 : // Check that the wal dir exists.
670 1 : walDir, err := opts.FS.OpenDir(walDirname)
671 1 : if err != nil {
672 1 : dataDir.Close()
673 1 : return "", nil, err
674 1 : }
675 1 : walDir.Close()
676 : }
677 :
678 2 : return walDirname, dataDir, nil
679 : }
680 :
681 : // GetVersion returns the engine version string from the latest options
682 : // file present in dir. Used to check what Pebble or RocksDB version was last
683 : // used to write to the database stored in this directory. An empty string is
684 : // returned if no valid OPTIONS file with a version key was found.
685 1 : func GetVersion(dir string, fs vfs.FS) (string, error) {
686 1 : ls, err := fs.List(dir)
687 1 : if err != nil {
688 0 : return "", err
689 0 : }
690 1 : var version string
691 1 : lastOptionsSeen := base.DiskFileNum(0)
692 1 : for _, filename := range ls {
693 1 : ft, fn, ok := base.ParseFilename(fs, filename)
694 1 : if !ok {
695 1 : continue
696 : }
697 1 : switch ft {
698 1 : case fileTypeOptions:
699 1 : // If this file has a higher number than the last options file
700 1 : // processed, reset version. This is because rocksdb often
701 1 : // writes multiple options files without deleting previous ones.
702 1 : // Otherwise, skip parsing this options file.
703 1 : if fn > lastOptionsSeen {
704 1 : version = ""
705 1 : lastOptionsSeen = fn
706 1 : } else {
707 0 : continue
708 : }
709 1 : f, err := fs.Open(fs.PathJoin(dir, filename))
710 1 : if err != nil {
711 0 : return "", err
712 0 : }
713 1 : data, err := io.ReadAll(f)
714 1 : f.Close()
715 1 :
716 1 : if err != nil {
717 0 : return "", err
718 0 : }
719 1 : err = parseOptions(string(data), func(section, key, value string) error {
720 1 : switch {
721 1 : case section == "Version":
722 1 : switch key {
723 1 : case "pebble_version":
724 1 : version = value
725 1 : case "rocksdb_version":
726 1 : version = fmt.Sprintf("rocksdb v%s", value)
727 : }
728 : }
729 1 : return nil
730 : })
731 1 : if err != nil {
732 0 : return "", err
733 0 : }
734 : }
735 : }
736 1 : return version, nil
737 : }
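A small sketch (not part of this file) of using GetVersion to report which engine last wrote a directory; the helper name is hypothetical and the fmt, pebble, and vfs imports are assumed:

    // reportEngineVersion is a hypothetical helper that logs the version string
    // recorded in the newest OPTIONS file, if any, without opening the store.
    func reportEngineVersion(dir string) error {
    	version, err := pebble.GetVersion(dir, vfs.Default)
    	if err != nil {
    		return err
    	}
    	if version == "" {
    		fmt.Println("no OPTIONS file with a version key found")
    		return nil
    	}
    	fmt.Printf("directory %q last written by %s\n", dir, version)
    	return nil
    }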
738 :
739 : // replayWAL replays the edits in the specified WAL. If the DB is in read
740 : // only mode, then the WALs are replayed into memtables and not flushed. If
741 : // the DB is not in read only mode, then the contents of the WAL are
742 : // guaranteed to be flushed. Note that this flushing is very important for
743 : // guaranteeing durability: the application may have had a number of pending
744 : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
745 : // happened but the corresponding data may now be readable from the WAL (while
746 : // sitting in write-back caches in the kernel or the storage device). By
747 : // reading the WAL (including the non-fsynced data) and then flushing all
748 : // these changes (flush does fsyncs), we are able to guarantee that the
749 : // initial state of the DB is durable.
750 : //
751 : // The toFlush return value is a list of flushables associated with the WAL
752 : // being replayed which will be flushed. Once the version edit has been applied
753 : // to the manifest, it is up to the caller of replayWAL to unreference the
754 : // toFlush flushables returned by replayWAL.
755 : //
756 : // d.mu must be held when calling this, but the mutex may be dropped and
757 : // re-acquired during the course of this method.
758 : func (d *DB) replayWAL(
759 : jobID JobID, ve *versionEdit, ll wal.LogicalLog, strictWALTail bool,
760 2 : ) (toFlush flushableList, maxSeqNum uint64, err error) {
761 2 : rr := ll.OpenForRead()
762 2 : defer rr.Close()
763 2 : var (
764 2 : b Batch
765 2 : buf bytes.Buffer
766 2 : mem *memTable
767 2 : entry *flushableEntry
768 2 : offset int64 // byte offset in rr
769 2 : lastFlushOffset int64
770 2 : keysReplayed int64 // number of keys replayed
771 2 : batchesReplayed int64 // number of batches replayed
772 2 : )
773 2 :
774 2 : // TODO(jackson): This function is interspersed with panics, in addition to
775 2 : // corruption error propagation. Audit them to ensure we're truly only
776 2 : // panicking where the error points to Pebble bug and not user or
777 2 : // hardware-induced corruption.
778 2 :
779 2 : if d.opts.ReadOnly {
780 1 : // In read-only mode, we replay directly into the mutable memtable which will
781 1 : // never be flushed.
782 1 : mem = d.mu.mem.mutable
783 1 : if mem != nil {
784 1 : entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
785 1 : }
786 : }
787 :
788 : // Flushes the current memtable, if not nil.
789 2 : flushMem := func() {
790 2 : if mem == nil {
791 2 : return
792 2 : }
793 2 : var logSize uint64
794 2 : if offset >= lastFlushOffset {
795 2 : logSize = uint64(offset - lastFlushOffset)
796 2 : }
797 : // Else, this was the initial memtable in the read-only case which must have
798 : // been empty, but we need to flush it since we don't want to add to it later.
799 2 : lastFlushOffset = offset
800 2 : entry.logSize = logSize
801 2 : if !d.opts.ReadOnly {
802 2 : toFlush = append(toFlush, entry)
803 2 : }
804 2 : mem, entry = nil, nil
805 : }
806 : // Creates a new memtable if there is no current memtable.
807 2 : ensureMem := func(seqNum uint64) {
808 2 : if mem != nil {
809 2 : return
810 2 : }
811 2 : mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
812 2 : if d.opts.ReadOnly {
813 1 : d.mu.mem.mutable = mem
814 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
815 1 : }
816 : }
817 :
818 : // updateVE is used to update ve with information about new files created
819 : // during the flush of any flushable not of type ingestedFlushable. For the
820 : // flushable of type ingestedFlushable we use custom handling below.
821 2 : updateVE := func() error {
822 2 : // TODO(bananabrick): See if we can use the actual base level here,
823 2 : // instead of using 1.
824 2 : c, err := newFlush(d.opts, d.mu.versions.currentVersion(),
825 2 : 1 /* base level */, toFlush, d.timeNow())
826 2 : if err != nil {
827 0 : return err
828 0 : }
829 2 : newVE, _, err := d.runCompaction(jobID, c)
830 2 : if err != nil {
831 0 : return errors.Wrapf(err, "running compaction during WAL replay")
832 0 : }
833 2 : ve.NewFiles = append(ve.NewFiles, newVE.NewFiles...)
834 2 : return nil
835 : }
836 2 : defer func() {
837 2 : if err != nil {
838 1 : err = errors.WithDetailf(err, "replaying wal %d, offset %d", ll.Num, offset)
839 1 : }
840 : }()
841 :
842 2 : for {
843 2 : r, offset, err := rr.NextRecord()
844 2 : if err == nil {
845 2 : _, err = io.Copy(&buf, r)
846 2 : }
847 2 : if err != nil {
848 2 : // It is common to encounter a zeroed or invalid chunk due to WAL
849 2 : // preallocation and WAL recycling. We need to distinguish these
850 2 : // errors from EOF in order to recognize that the record was
851 2 : // truncated and to avoid replaying subsequent WALs, but want
852 2 : // to otherwise treat them like EOF.
853 2 : if err == io.EOF {
854 2 : break
855 1 : } else if record.IsInvalidRecord(err) && !strictWALTail {
856 1 : break
857 : }
858 1 : return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
859 : }
860 :
861 2 : if buf.Len() < batchrepr.HeaderLen {
862 0 : return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
863 0 : errors.Safe(base.DiskFileNum(ll.Num)), offset)
864 0 : }
865 :
866 2 : if d.opts.ErrorIfNotPristine {
867 1 : return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
868 1 : }
869 :
870 : // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
871 : // which is used below.
872 2 : b = Batch{}
873 2 : b.db = d
874 2 : b.SetRepr(buf.Bytes())
875 2 : seqNum := b.SeqNum()
876 2 : maxSeqNum = seqNum + uint64(b.Count())
877 2 : keysReplayed += int64(b.Count())
878 2 : batchesReplayed++
879 2 : {
880 2 : br := b.Reader()
881 2 : if kind, encodedFileNum, _, ok, err := br.Next(); err != nil {
882 0 : return nil, 0, err
883 2 : } else if ok && kind == InternalKeyKindIngestSST {
884 2 : fileNums := make([]base.DiskFileNum, 0, b.Count())
885 2 : addFileNum := func(encodedFileNum []byte) {
886 2 : fileNum, n := binary.Uvarint(encodedFileNum)
887 2 : if n <= 0 {
888 0 : panic("pebble: ingest sstable file num is invalid.")
889 : }
890 2 : fileNums = append(fileNums, base.DiskFileNum(fileNum))
891 : }
892 2 : addFileNum(encodedFileNum)
893 2 :
894 2 : for i := 1; i < int(b.Count()); i++ {
895 2 : kind, encodedFileNum, _, ok, err := br.Next()
896 2 : if err != nil {
897 0 : return nil, 0, err
898 0 : }
899 2 : if kind != InternalKeyKindIngestSST {
900 0 : panic("pebble: invalid batch key kind.")
901 : }
902 2 : if !ok {
903 0 : panic("pebble: invalid batch count.")
904 : }
905 2 : addFileNum(encodedFileNum)
906 : }
907 :
908 2 : if _, _, _, ok, err := br.Next(); err != nil {
909 0 : return nil, 0, err
910 2 : } else if ok {
911 0 : panic("pebble: invalid number of entries in batch.")
912 : }
913 :
914 2 : meta := make([]*fileMetadata, len(fileNums))
915 2 : for i, n := range fileNums {
916 2 : var readable objstorage.Readable
917 2 : objMeta, err := d.objProvider.Lookup(fileTypeTable, n)
918 2 : if err != nil {
919 0 : return nil, 0, errors.Wrap(err, "pebble: error when looking up ingested SSTs")
920 0 : }
921 2 : if objMeta.IsRemote() {
922 1 : readable, err = d.objProvider.OpenForReading(context.TODO(), fileTypeTable, n, objstorage.OpenOptions{MustExist: true})
923 1 : if err != nil {
924 0 : return nil, 0, errors.Wrap(err, "pebble: error when opening flushable ingest files")
925 0 : }
926 2 : } else {
927 2 : path := base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, n)
928 2 : f, err := d.opts.FS.Open(path)
929 2 : if err != nil {
930 0 : return nil, 0, err
931 0 : }
932 :
933 2 : readable, err = sstable.NewSimpleReadable(f)
934 2 : if err != nil {
935 0 : return nil, 0, err
936 0 : }
937 : }
938 : // NB: ingestLoad1 will close readable.
939 2 : meta[i], err = ingestLoad1(d.opts, d.FormatMajorVersion(), readable, d.cacheID, base.PhysicalTableFileNum(n))
940 2 : if err != nil {
941 0 : return nil, 0, errors.Wrap(err, "pebble: error when loading flushable ingest files")
942 0 : }
943 : }
944 :
945 2 : if uint32(len(meta)) != b.Count() {
946 0 : panic("pebble: couldn't load all files in WAL entry.")
947 : }
948 :
949 2 : entry, err = d.newIngestedFlushableEntry(meta, seqNum, base.DiskFileNum(ll.Num), KeyRange{})
950 2 : if err != nil {
951 0 : return nil, 0, err
952 0 : }
953 :
954 2 : if d.opts.ReadOnly {
955 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
956 1 : // We added the IngestSST flushable to the queue. But there
957 1 : // must be at least one WAL entry waiting to be replayed. We
958 1 : // have to ensure this newer WAL entry isn't replayed into
959 1 : // the current value of d.mu.mem.mutable because the current
960 1 : // mutable memtable exists before this flushable entry in
961 1 : // the memtable queue. To ensure this, we just need to unset
962 1 : // d.mu.mem.mutable. When a newer WAL is replayed, we will
963 1 : // set d.mu.mem.mutable to a newer value.
964 1 : d.mu.mem.mutable = nil
965 2 : } else {
966 2 : toFlush = append(toFlush, entry)
967 2 : // During WAL replay, the lsm only has L0, hence, the
968 2 : // baseLevel is 1. For the sake of simplicity, we place the
969 2 : // ingested files in L0 here, instead of finding their
970 2 : // target levels. It is
971 2 : // expected that WAL replay should be
972 2 : // rare, and that flushables of type ingestedFlushable
973 2 : // should also be rare. So, placing the ingested files in L0
974 2 : // is alright.
975 2 : //
976 2 : // TODO(bananabrick): Maybe refactor this function to allow
977 2 : // us to easily place ingested files in levels as low as
978 2 : // possible during WAL replay. It would require breaking up
979 2 : // the application of ve to the manifest into chunks and is
980 2 : // not pretty w/o a refactor to this function and how it's
981 2 : // used.
982 2 : c, err := newFlush(
983 2 : d.opts, d.mu.versions.currentVersion(),
984 2 : 1, /* base level */
985 2 : []*flushableEntry{entry},
986 2 : d.timeNow(),
987 2 : )
988 2 : if err != nil {
989 0 : return nil, 0, err
990 0 : }
991 2 : for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
992 2 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file.FileMetadata})
993 2 : }
994 : }
995 2 : return toFlush, maxSeqNum, nil
996 : }
997 : }
998 :
999 2 : if b.memTableSize >= uint64(d.largeBatchThreshold) {
1000 1 : flushMem()
1001 1 : // Make a copy of the data slice since it is currently owned by buf and will
1002 1 : // be reused in the next iteration.
1003 1 : b.data = slices.Clone(b.data)
1004 1 : b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
1005 1 : if err != nil {
1006 0 : return nil, 0, err
1007 0 : }
1008 1 : entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
1009 1 : // Disable memory accounting by adding a reader ref that will never be
1010 1 : // removed.
1011 1 : entry.readerRefs.Add(1)
1012 1 : if d.opts.ReadOnly {
1013 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
1014 1 : // We added the flushable batch to the flushable queue.
1015 1 : // But there must be at least one WAL entry waiting to be
1016 1 : // replayed. We have to ensure this newer WAL entry isn't
1017 1 : // replayed into the current value of d.mu.mem.mutable because
1018 1 : // the current mutable memtable exists before this flushable
1019 1 : // entry in the memtable queue. To ensure this, we just need to
1020 1 : // unset d.mu.mem.mutable. When a newer WAL is replayed, we will
1021 1 : // set d.mu.mem.mutable to a newer value.
1022 1 : d.mu.mem.mutable = nil
1023 1 : } else {
1024 1 : toFlush = append(toFlush, entry)
1025 1 : }
1026 2 : } else {
1027 2 : ensureMem(seqNum)
1028 2 : if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
1029 0 : return nil, 0, err
1030 0 : }
1031 : // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
1032 : // batch may not initially fit, but will eventually fit (since it is smaller than
1033 : // largeBatchThreshold).
1034 2 : for err == arenaskl.ErrArenaFull {
1035 2 : flushMem()
1036 2 : ensureMem(seqNum)
1037 2 : err = mem.prepare(&b)
1038 2 : if err != nil && err != arenaskl.ErrArenaFull {
1039 0 : return nil, 0, err
1040 0 : }
1041 : }
1042 2 : if err = mem.apply(&b, seqNum); err != nil {
1043 0 : return nil, 0, err
1044 0 : }
1045 2 : mem.writerUnref()
1046 : }
1047 2 : buf.Reset()
1048 : }
1049 :
1050 2 : d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %d; replayed %d keys in %d batches",
1051 2 : jobID, base.DiskFileNum(ll.Num).String(), offset, keysReplayed, batchesReplayed)
1052 2 : flushMem()
1053 2 :
1054 2 : // mem is nil here.
1055 2 : if !d.opts.ReadOnly && batchesReplayed > 0 {
1056 2 : err = updateVE()
1057 2 : if err != nil {
1058 0 : return nil, 0, err
1059 0 : }
1060 : }
1061 2 : return toFlush, maxSeqNum, err
1062 : }
1063 :
1064 2 : func readOptionsFile(opts *Options, path string) (string, error) {
1065 2 : f, err := opts.FS.Open(path)
1066 2 : if err != nil {
1067 0 : return "", err
1068 0 : }
1069 2 : defer f.Close()
1070 2 :
1071 2 : data, err := io.ReadAll(f)
1072 2 : if err != nil {
1073 0 : return "", err
1074 0 : }
1075 2 : return string(data), nil
1076 : }
1077 :
1078 : // DBDesc briefly describes high-level state about a database.
1079 : type DBDesc struct {
1080 : // Exists is true if an existing database was found.
1081 : Exists bool
1082 : // FormatMajorVersion indicates the database's current format
1083 : // version.
1084 : FormatMajorVersion FormatMajorVersion
1085 : // ManifestFilename is the filename of the current active manifest,
1086 : // if the database exists.
1087 : ManifestFilename string
1088 : // OptionsFilename is the filename of the most recent OPTIONS file, if it
1089 : // exists.
1090 : OptionsFilename string
1091 : }
1092 :
1093 : // String implements fmt.Stringer.
1094 1 : func (d *DBDesc) String() string {
1095 1 : if !d.Exists {
1096 1 : return "uninitialized"
1097 1 : }
1098 1 : var buf bytes.Buffer
1099 1 : fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
1100 1 : fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
1101 1 : fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
1102 1 : return buf.String()
1103 : }
1104 :
1105 : // Peek looks for an existing database in dirname on the provided FS. It
1106 : // returns a brief description of the database. Peek is read-only and
1107 : // does not open the database.
1108 1 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
1109 1 : ls, err := fs.List(dirname)
1110 1 : if err != nil {
1111 1 : return nil, err
1112 1 : }
1113 :
1114 1 : vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
1115 1 : if err != nil {
1116 0 : return nil, err
1117 0 : }
1118 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1119 : // PeekMarker variant that avoids opening the directory.
1120 1 : if err := versMarker.Close(); err != nil {
1121 0 : return nil, err
1122 0 : }
1123 :
1124 : // Find the currently active manifest, if there is one.
1125 1 : manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
1126 1 : if err != nil {
1127 0 : return nil, err
1128 0 : }
1129 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1130 : // PeekMarker variant that avoids opening the directory.
1131 1 : if err := manifestMarker.Close(); err != nil {
1132 0 : return nil, err
1133 0 : }
1134 :
1135 1 : desc := &DBDesc{
1136 1 : Exists: exists,
1137 1 : FormatMajorVersion: vers,
1138 1 : }
1139 1 :
1140 1 : // Find the OPTIONS file with the highest file number within the list of
1141 1 : // directory entries.
1142 1 : var previousOptionsFileNum base.DiskFileNum
1143 1 : for _, filename := range ls {
1144 1 : ft, fn, ok := base.ParseFilename(fs, filename)
1145 1 : if !ok || ft != fileTypeOptions || fn < previousOptionsFileNum {
1146 1 : continue
1147 : }
1148 1 : previousOptionsFileNum = fn
1149 1 : desc.OptionsFilename = fs.PathJoin(dirname, filename)
1150 : }
1151 :
1152 1 : if exists {
1153 1 : desc.ManifestFilename = base.MakeFilepath(fs, dirname, fileTypeManifest, manifestFileNum)
1154 1 : }
1155 1 : return desc, nil
1156 : }
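A small sketch (not part of this file) of using Peek to inspect a directory without opening it; the helper name is hypothetical and the fmt, pebble, and vfs imports are assumed:

    // describeStore is a hypothetical helper that prints what Peek finds.
    func describeStore(dir string) error {
    	desc, err := pebble.Peek(dir, vfs.Default)
    	if err != nil {
    		return err
    	}
    	if !desc.Exists {
    		fmt.Println("no database found")
    		return nil
    	}
    	fmt.Printf("format %s, manifest %s\n", desc.FormatMajorVersion, desc.ManifestFilename)
    	return nil
    }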
1157 :
1158 : // LockDirectory acquires the database directory lock in the named directory,
1159 : // preventing another process from opening the database. LockDirectory returns a
1160 : // handle to the held lock that may be passed to Open through Options.Lock to
1161 : // subsequently open the database, skipping lock acquisition during Open.
1162 : //
1163 : // LockDirectory may be used to expand the critical section protected by the
1164 : // database lock to include setup before the call to Open.
1165 2 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
1166 2 : fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, fileTypeLock, base.DiskFileNum(0)))
1167 2 : if err != nil {
1168 1 : return nil, err
1169 1 : }
1170 2 : l := &Lock{dirname: dirname, fileLock: fileLock}
1171 2 : l.refs.Store(1)
1172 2 : invariants.SetFinalizer(l, func(obj interface{}) {
1173 2 : if refs := obj.(*Lock).refs.Load(); refs > 0 {
1174 0 : panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
1175 : }
1176 : })
1177 2 : return l, nil
1178 : }
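A sketch (not part of this file) of the workflow described above: acquire the directory lock, perform setup, then pass the lock to Open; the helper name and setup step are illustrative and the pebble and vfs imports are assumed:

    // openWithPreacquiredLock is a hypothetical helper that holds the database
    // directory lock across some setup work before opening the store.
    func openWithPreacquiredLock(dirname string) (*pebble.DB, *pebble.Lock, error) {
    	lock, err := pebble.LockDirectory(dirname, vfs.Default)
    	if err != nil {
    		return nil, nil, err
    	}
    	// ... setup that must happen while no other process can open the store ...
    	db, err := pebble.Open(dirname, &pebble.Options{FS: vfs.Default, Lock: lock})
    	if err != nil {
    		lock.Close() // drop our reference; the store will not be opened
    		return nil, nil, err
    	}
    	// The caller must call db.Close() and then lock.Close(), in that order.
    	return db, lock, nil
    }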
1179 :
1180 : // Lock represents a file lock on a directory. It may be passed to Open through
1181 : // Options.Lock to elide lock acquisition during Open.
1182 : type Lock struct {
1183 : dirname string
1184 : fileLock io.Closer
1185 : // refs is a count of the number of handles on the lock. refs must be 0, 1
1186 : // or 2.
1187 : //
1188 : // When acquired by the client and passed to Open, refs = 1 and the Open
1189 : // call increments it to 2. When the database is closed, it's decremented to
1190 : // 1. Finally when the original caller, calls Close on the Lock, it's
1191 : // drecemented to zero and the underlying file lock is released.
1192 : //
1193 : // When Open acquires the file lock, refs remains at 1 until the database is
1194 : // closed.
1195 : refs atomic.Int32
1196 : }
1197 :
1198 1 : func (l *Lock) refForOpen() error {
1199 1 : // During Open, when a user passed in a lock, the reference count must be
1200 1 : // exactly 1. If it's zero, the lock is no longer held and is invalid. If
1201 1 : // it's 2, the lock is already in use by another database within the
1202 1 : // process.
1203 1 : if !l.refs.CompareAndSwap(1, 2) {
1204 1 : return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
1205 1 : }
1206 1 : return nil
1207 : }
1208 :
1209 : // Close releases the lock, permitting another process to lock and open the
1210 : // database. Close must not be called until after a database using the Lock has
1211 : // been closed.
1212 2 : func (l *Lock) Close() error {
1213 2 : if l.refs.Add(-1) > 0 {
1214 1 : return nil
1215 1 : }
1216 2 : defer func() { l.fileLock = nil }()
1217 2 : return l.fileLock.Close()
1218 : }
1219 :
1220 1 : func (l *Lock) pathMatches(dirname string) error {
1221 1 : if dirname == l.dirname {
1222 1 : return nil
1223 1 : }
1224 : // Check for relative paths, symlinks, etc. This isn't ideal because we're
1225 : // circumventing the vfs.FS interface here.
1226 : //
1227 : // TODO(jackson): We could add support for retrieving file inodes through Stat
1228 : // calls in the VFS interface on platforms where it's available and use that
1229 : // to differentiate.
1230 1 : dirStat, err1 := os.Stat(dirname)
1231 1 : lockDirStat, err2 := os.Stat(l.dirname)
1232 1 : if err1 == nil && err2 == nil && os.SameFile(dirStat, lockDirStat) {
1233 1 : return nil
1234 1 : }
1235 0 : return errors.Join(
1236 0 : errors.Newf("pebble: opts.Lock acquired in %q not %q", l.dirname, dirname),
1237 0 : err1, err2)
1238 : }
1239 :
1240 : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
1241 : // does not exist.
1242 : //
1243 : // Note that errors can be wrapped with more details; use errors.Is().
1244 : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
1245 :
1246 : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
1247 : // already exists.
1248 : //
1249 : // Note that errors can be wrapped with more details; use errors.Is().
1250 : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
1251 :
1252 : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
1253 : // already exists and is not pristine.
1254 : //
1255 : // Note that errors can be wrapped with more details; use errors.Is().
1256 : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
1257 :
1258 : // IsCorruptionError returns true if the given error indicates database
1259 : // corruption.
1260 1 : func IsCorruptionError(err error) bool {
1261 1 : return errors.Is(err, base.ErrCorruption)
1262 1 : }
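A brief sketch (not part of this file) of checking an Open failure for corruption; the helper name and directory path are illustrative and the log, pebble, and vfs imports are assumed:

    // mustOpen is a hypothetical helper that treats corruption as fatal.
    func mustOpen(dirname string) *pebble.DB {
    	db, err := pebble.Open(dirname, &pebble.Options{FS: vfs.Default})
    	if pebble.IsCorruptionError(err) {
    		log.Fatalf("store at %q is corrupt: %v", dirname, err)
    	} else if err != nil {
    		log.Fatal(err)
    	}
    	return db
    }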
1263 :
1264 2 : func checkConsistency(v *manifest.Version, dirname string, objProvider objstorage.Provider) error {
1265 2 : var errs []error
1266 2 : dedup := make(map[base.DiskFileNum]struct{})
1267 2 : for level, files := range v.Levels {
1268 2 : iter := files.Iter()
1269 2 : for f := iter.First(); f != nil; f = iter.Next() {
1270 2 : backingState := f.FileBacking
1271 2 : if _, ok := dedup[backingState.DiskFileNum]; ok {
1272 2 : continue
1273 : }
1274 2 : dedup[backingState.DiskFileNum] = struct{}{}
1275 2 : fileNum := backingState.DiskFileNum
1276 2 : fileSize := backingState.Size
1277 2 : // We skip over remote objects; those are instead checked asynchronously
1278 2 : // by the table stats loading job.
1279 2 : meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
1280 2 : var size int64
1281 2 : if err == nil {
1282 2 : if meta.IsRemote() {
1283 2 : continue
1284 : }
1285 2 : size, err = objProvider.Size(meta)
1286 : }
1287 2 : if err != nil {
1288 1 : errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
1289 1 : continue
1290 : }
1291 :
1292 2 : if size != int64(fileSize) {
1293 0 : errs = append(errs, errors.Errorf(
1294 0 : "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
1295 0 : errors.Safe(level), fileNum, objProvider.Path(meta),
1296 0 : errors.Safe(size), errors.Safe(fileSize)))
1297 0 : continue
1298 : }
1299 : }
1300 : }
1301 2 : return errors.Join(errs...)
1302 : }
1303 :
1304 : type walEventListenerAdaptor struct {
1305 : l *EventListener
1306 : }
1307 :
1308 2 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
1309 2 : // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
1310 2 : // is insufficient to infer whether primary or secondary.
1311 2 : wci := WALCreateInfo{
1312 2 : JobID: ci.JobID,
1313 2 : Path: ci.Path,
1314 2 : FileNum: base.DiskFileNum(ci.Num),
1315 2 : RecycledFileNum: ci.RecycledFileNum,
1316 2 : Err: ci.Err,
1317 2 : }
1318 2 : l.l.WALCreated(wci)
1319 2 : }