Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "math"
14 : "os"
15 : "slices"
16 : "sync/atomic"
17 : "time"
18 :
19 : "github.com/cockroachdb/errors"
20 : "github.com/cockroachdb/errors/oserror"
21 : "github.com/cockroachdb/pebble/batchrepr"
22 : "github.com/cockroachdb/pebble/internal/arenaskl"
23 : "github.com/cockroachdb/pebble/internal/base"
24 : "github.com/cockroachdb/pebble/internal/cache"
25 : "github.com/cockroachdb/pebble/internal/constants"
26 : "github.com/cockroachdb/pebble/internal/invariants"
27 : "github.com/cockroachdb/pebble/internal/manifest"
28 : "github.com/cockroachdb/pebble/internal/manual"
29 : "github.com/cockroachdb/pebble/objstorage"
30 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
31 : "github.com/cockroachdb/pebble/objstorage/remote"
32 : "github.com/cockroachdb/pebble/record"
33 : "github.com/cockroachdb/pebble/sstable"
34 : "github.com/cockroachdb/pebble/vfs"
35 : "github.com/cockroachdb/pebble/wal"
36 : "github.com/prometheus/client_golang/prometheus"
37 : )
38 :
39 : const (
40 : initialMemTableSize = 256 << 10 // 256 KB
41 :
42 : // The max batch size is limited by the uint32 offsets stored in
43 : // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
44 : //
45 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
46 : // end of an allocation fits in uint32.
47 : //
48 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
49 : // 2GB).
50 : maxBatchSize = constants.MaxUint32OrInt
51 :
52 : // The max memtable size is limited by the uint32 offsets stored in
53 : // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
54 : //
55 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
56 : // end of an allocation fits in uint32.
57 : //
58 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
59 : // 2GB).
60 : maxMemTableSize = constants.MaxUint32OrInt
61 : )
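// For illustration (a hedged note, assuming constants.MaxUint32OrInt resolves
// to math.MaxUint32 on 64-bit platforms and to math.MaxInt32 on 32-bit
// platforms, as the comments above describe):
//
//	64-bit: maxBatchSize == maxMemTableSize == math.MaxUint32 // just under 4 GB
//	32-bit: maxBatchSize == maxMemTableSize == math.MaxInt32  // just under 2 GB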
62 :
63 : // TableCacheSize can be used to determine the table
64 : // cache size for a single db, given the maximum number of
65 : // open files that can be used by a table cache serving
66 : // only that single db.
67 2 : func TableCacheSize(maxOpenFiles int) int {
68 2 : tableCacheSize := maxOpenFiles - numNonTableCacheFiles
69 2 : if tableCacheSize < minTableCacheSize {
70 1 : tableCacheSize = minTableCacheSize
71 1 : }
72 2 : return tableCacheSize
73 : }
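// A hedged usage sketch (the MaxOpenFiles value below is an arbitrary example,
// not a recommendation):
//
//	maxOpenFiles := 1000
//	size := TableCacheSize(maxOpenFiles)
//	// size is maxOpenFiles minus the descriptors reserved for non-table
//	// files, floored at minTableCacheSize.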
74 :
75 : // Open opens a DB whose files live in the given directory.
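//
// A minimal, hedged usage sketch (the path, options, and key/value below are
// placeholders, not recommendations):
//
//	db, err := pebble.Open("/tmp/demo", &pebble.Options{})
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer db.Close()
//	if err := db.Set([]byte("key"), []byte("value"), pebble.Sync); err != nil {
//		log.Fatal(err)
//	}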
76 2 : func Open(dirname string, opts *Options) (db *DB, err error) {
77 2 : // Make a copy of the options so that we don't mutate the passed in options.
78 2 : opts = opts.Clone()
79 2 : opts = opts.EnsureDefaults()
80 2 : if err := opts.Validate(); err != nil {
81 0 : return nil, err
82 0 : }
83 2 : if opts.LoggerAndTracer == nil {
84 2 : opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
85 2 : } else {
86 1 : opts.Logger = opts.LoggerAndTracer
87 1 : }
88 :
89 2 : if invariants.Sometimes(5) {
90 2 : assertComparer := base.MakeAssertComparer(*opts.Comparer)
91 2 : opts.Comparer = &assertComparer
92 2 : }
93 :
94 : // In all error cases, we return db = nil; this is used by various
95 : // deferred cleanups.
96 :
97 : // Open the database and WAL directories first.
98 2 : walDirname, dataDir, err := prepareAndOpenDirs(dirname, opts)
99 2 : if err != nil {
100 1 : return nil, errors.Wrapf(err, "error opening database at %q", dirname)
101 1 : }
102 2 : defer func() {
103 2 : if db == nil {
104 1 : dataDir.Close()
105 1 : }
106 : }()
107 :
108 : // Lock the database directory.
109 2 : var fileLock *Lock
110 2 : if opts.Lock != nil {
111 1 : // The caller already acquired the database lock. Ensure that the
112 1 : // directory matches.
113 1 : if err := opts.Lock.pathMatches(dirname); err != nil {
114 0 : return nil, err
115 0 : }
116 1 : if err := opts.Lock.refForOpen(); err != nil {
117 1 : return nil, err
118 1 : }
119 1 : fileLock = opts.Lock
120 2 : } else {
121 2 : fileLock, err = LockDirectory(dirname, opts.FS)
122 2 : if err != nil {
123 1 : return nil, err
124 1 : }
125 : }
126 2 : defer func() {
127 2 : if db == nil {
128 1 : fileLock.Close()
129 1 : }
130 : }()
131 :
132 : // List the directory contents. This also happens to include WAL log files, if
133 : // they are in the same dir, but we will ignore those below. The provider is
134 : // also given this list, but it ignores non-sstable files.
135 2 : ls, err := opts.FS.List(dirname)
136 2 : if err != nil {
137 1 : return nil, err
138 1 : }
139 :
140 : // Establish the format major version.
141 2 : formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
142 2 : if err != nil {
143 1 : return nil, err
144 1 : }
145 2 : defer func() {
146 2 : if db == nil {
147 1 : formatVersionMarker.Close()
148 1 : }
149 : }()
150 :
151 2 : noFormatVersionMarker := formatVersion == FormatDefault
152 2 : if noFormatVersionMarker {
153 2 : // We will initialize the store at the minimum possible format, then upgrade
154 2 : // the format to the desired one. This helps test the format upgrade code.
155 2 : formatVersion = FormatMinSupported
156 2 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
157 2 : formatVersion = FormatMinForSharedObjects
158 2 : }
159 : // There is no format version marker file. There are three cases:
160 : // - we are trying to open an existing store that was created at
161 : // FormatMostCompatible (the only one without a version marker file)
162 : // - we are creating a new store;
163 : // - we are retrying a failed creation.
164 : //
165 : // To error in the first case, we set ErrorIfNotPristine.
166 2 : opts.ErrorIfNotPristine = true
167 2 : defer func() {
168 2 : if err != nil && errors.Is(err, ErrDBNotPristine) {
169 0 : // We must be trying to open an existing store at FormatMostCompatible.
170 0 : // Correct the error in this case to report the unsupported format version.
171 0 : err = errors.Newf(
172 0 : "pebble: database %q written in format major version 1 which is no longer supported",
173 0 : dirname)
174 0 : }
175 : }()
176 2 : } else {
177 2 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone && formatVersion < FormatMinForSharedObjects {
178 0 : return nil, errors.Newf(
179 0 : "pebble: database %q configured with shared objects but written in too old format major version %d",
180 0 : dirname, formatVersion)
181 0 : }
182 : }
183 :
184 : // Find the currently active manifest, if there is one.
185 2 : manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
186 2 : if err != nil {
187 1 : return nil, errors.Wrapf(err, "pebble: database %q", dirname)
188 1 : }
189 2 : defer func() {
190 2 : if db == nil {
191 1 : manifestMarker.Close()
192 1 : }
193 : }()
194 :
195 : // Atomic markers may leave behind obsolete files if there's a crash
196 : // mid-update. Clean these up if we're not in read-only mode.
197 2 : if !opts.ReadOnly {
198 2 : if err := formatVersionMarker.RemoveObsolete(); err != nil {
199 0 : return nil, err
200 0 : }
201 2 : if err := manifestMarker.RemoveObsolete(); err != nil {
202 0 : return nil, err
203 0 : }
204 : }
205 :
206 2 : if opts.Cache == nil {
207 1 : opts.Cache = cache.New(cacheDefaultSize)
208 2 : } else {
209 2 : opts.Cache.Ref()
210 2 : }
211 :
212 2 : d := &DB{
213 2 : cacheID: opts.Cache.NewID(),
214 2 : dirname: dirname,
215 2 : opts: opts,
216 2 : cmp: opts.Comparer.Compare,
217 2 : equal: opts.Comparer.Equal,
218 2 : merge: opts.Merger.Merge,
219 2 : split: opts.Comparer.Split,
220 2 : abbreviatedKey: opts.Comparer.AbbreviatedKey,
221 2 : largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
222 2 : fileLock: fileLock,
223 2 : dataDir: dataDir,
224 2 : closed: new(atomic.Value),
225 2 : closedCh: make(chan struct{}),
226 2 : }
227 2 : d.mu.versions = &versionSet{}
228 2 : d.diskAvailBytes.Store(math.MaxUint64)
229 2 :
230 2 : defer func() {
231 2 : // If an error or panic occurs during open, attempt to release the manually
232 2 : // allocated memory resources. Note that rather than look for an error, we
233 2 : // look for the return of a nil DB pointer.
234 2 : if r := recover(); db == nil {
235 1 : // If there's an unused, recycled memtable, we need to release its memory.
236 1 : if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
237 1 : d.freeMemTable(obsoleteMemTable)
238 1 : }
239 :
240 : // Release our references to the Cache. Note that both the DB and the
241 : // tableCache have a reference. When we release the reference to
242 : // the tableCache, and if there are no other references to
243 : // the tableCache, then the tableCache will also release its
244 : // reference to the cache.
245 1 : opts.Cache.Unref()
246 1 :
247 1 : if d.tableCache != nil {
248 1 : _ = d.tableCache.close()
249 1 : }
250 :
251 1 : for _, mem := range d.mu.mem.queue {
252 1 : switch t := mem.flushable.(type) {
253 1 : case *memTable:
254 1 : manual.Free(manual.MemTable, t.arenaBuf)
255 1 : t.arenaBuf = nil
256 : }
257 : }
258 1 : if d.cleanupManager != nil {
259 1 : d.cleanupManager.Close()
260 1 : }
261 1 : if d.objProvider != nil {
262 1 : d.objProvider.Close()
263 1 : }
264 1 : if r != nil {
265 1 : panic(r)
266 : }
267 : }
268 : }()
269 :
270 2 : d.commit = newCommitPipeline(commitEnv{
271 2 : logSeqNum: &d.mu.versions.logSeqNum,
272 2 : visibleSeqNum: &d.mu.versions.visibleSeqNum,
273 2 : apply: d.commitApply,
274 2 : write: d.commitWrite,
275 2 : })
276 2 : d.mu.nextJobID = 1
277 2 : d.mu.mem.nextSize = opts.MemTableSize
278 2 : if d.mu.mem.nextSize > initialMemTableSize {
279 2 : d.mu.mem.nextSize = initialMemTableSize
280 2 : }
281 2 : d.mu.compact.cond.L = &d.mu.Mutex
282 2 : d.mu.compact.inProgress = make(map[*compaction]struct{})
283 2 : d.mu.compact.noOngoingFlushStartTime = time.Now()
284 2 : d.mu.snapshots.init()
285 2 : // logSeqNum is the next sequence number that will be assigned.
286 2 : // Start assigning sequence numbers from base.SeqNumStart to leave
287 2 : // room for reserved sequence numbers (see comments around
288 2 : // SeqNumStart).
289 2 : d.mu.versions.logSeqNum.Store(base.SeqNumStart)
290 2 : d.mu.formatVers.vers.Store(uint64(formatVersion))
291 2 : d.mu.formatVers.marker = formatVersionMarker
292 2 :
293 2 : d.timeNow = time.Now
294 2 : d.openedAt = d.timeNow()
295 2 :
296 2 : d.mu.Lock()
297 2 : defer d.mu.Unlock()
298 2 :
299 2 : jobID := d.newJobIDLocked()
300 2 :
301 2 : providerSettings := objstorageprovider.Settings{
302 2 : Logger: opts.Logger,
303 2 : FS: opts.FS,
304 2 : FSDirName: dirname,
305 2 : FSDirInitialListing: ls,
306 2 : FSCleaner: opts.Cleaner,
307 2 : NoSyncOnClose: opts.NoSyncOnClose,
308 2 : BytesPerSync: opts.BytesPerSync,
309 2 : }
310 2 : providerSettings.Local.ReadaheadConfig = opts.Local.ReadaheadConfig
311 2 : providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
312 2 : providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
313 2 : providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
314 2 : providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
315 2 :
316 2 : d.objProvider, err = objstorageprovider.Open(providerSettings)
317 2 : if err != nil {
318 1 : return nil, err
319 1 : }
320 :
321 2 : if !manifestExists {
322 2 : // DB does not exist.
323 2 : if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
324 1 : return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
325 1 : }
326 :
327 : // Create the DB.
328 2 : if err := d.mu.versions.create(
329 2 : jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
330 1 : return nil, err
331 1 : }
332 2 : } else {
333 2 : if opts.ErrorIfExists {
334 1 : return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
335 1 : }
336 : // Load the version set.
337 2 : if err := d.mu.versions.load(
338 2 : dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
339 1 : return nil, err
340 1 : }
341 2 : if opts.ErrorIfNotPristine {
342 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
343 1 : d.mu.versions.addLiveFileNums(liveFileNums)
344 1 : if len(liveFileNums) != 0 {
345 1 : return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
346 1 : }
347 : }
348 : }
349 :
350 : // In read-only mode, we replay directly into the mutable memtable but never
351 : // flush it. We need to delay creation of the memtable until we know the
352 : // sequence number of the first batch that will be inserted.
353 2 : if !d.opts.ReadOnly {
354 2 : var entry *flushableEntry
355 2 : d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
356 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
357 2 : }
358 :
359 2 : d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
360 2 : Buckets: FsyncLatencyBuckets,
361 2 : })
362 2 : walOpts := wal.Options{
363 2 : Primary: wal.Dir{FS: opts.FS, Dirname: walDirname},
364 2 : Secondary: wal.Dir{},
365 2 : MinUnflushedWALNum: wal.NumWAL(d.mu.versions.minUnflushedLogNum),
366 2 : MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
367 2 : NoSyncOnClose: opts.NoSyncOnClose,
368 2 : BytesPerSync: opts.WALBytesPerSync,
369 2 : PreallocateSize: d.walPreallocateSize,
370 2 : MinSyncInterval: opts.WALMinSyncInterval,
371 2 : FsyncLatency: d.mu.log.metrics.fsyncLatency,
372 2 : QueueSemChan: d.commit.logSyncQSem,
373 2 : Logger: opts.Logger,
374 2 : EventListener: walEventListenerAdaptor{l: opts.EventListener},
375 2 : }
376 2 : if opts.WALFailover != nil {
377 2 : walOpts.Secondary = opts.WALFailover.Secondary
378 2 : walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
379 2 : walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
380 2 : Buckets: FsyncLatencyBuckets,
381 2 : })
382 2 : }
383 2 : walDirs := append(walOpts.Dirs(), opts.WALRecoveryDirs...)
384 2 : wals, err := wal.Scan(walDirs...)
385 2 : if err != nil {
386 1 : return nil, err
387 1 : }
388 2 : walManager, err := wal.Init(walOpts, wals)
389 2 : if err != nil {
390 1 : return nil, err
391 1 : }
392 2 : defer func() {
393 2 : if db == nil {
394 1 : walManager.Close()
395 1 : }
396 : }()
397 :
398 2 : d.mu.log.manager = walManager
399 2 :
400 2 : d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
401 2 :
402 2 : if manifestExists && !opts.DisableConsistencyCheck {
403 2 : curVersion := d.mu.versions.currentVersion()
404 2 : if err := checkConsistency(curVersion, dirname, d.objProvider); err != nil {
405 1 : return nil, err
406 1 : }
407 : }
408 :
409 2 : tableCacheSize := TableCacheSize(opts.MaxOpenFiles)
410 2 : d.tableCache = newTableCacheContainer(
411 2 : opts.TableCache, d.cacheID, d.objProvider, d.opts, tableCacheSize,
412 2 : &sstable.CategoryStatsCollector{})
413 2 : d.newIters = d.tableCache.newIters
414 2 : d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
415 2 :
416 2 : d.mu.annotators.totalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
417 1 : return true
418 1 : })
419 2 : d.mu.annotators.remoteSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
420 1 : meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
421 1 : if err != nil {
422 0 : return false
423 0 : }
424 1 : return meta.IsRemote()
425 : })
426 2 : d.mu.annotators.externalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
427 1 : meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
428 1 : if err != nil {
429 0 : return false
430 0 : }
431 1 : return meta.IsRemote() && meta.Remote.CleanupMethod == objstorage.SharedNoCleanup
432 : })
433 :
434 2 : var previousOptionsFileNum base.DiskFileNum
435 2 : var previousOptionsFilename string
436 2 : for _, filename := range ls {
437 2 : ft, fn, ok := base.ParseFilename(opts.FS, filename)
438 2 : if !ok {
439 2 : continue
440 : }
441 :
442 : // Don't reuse any obsolete file numbers to avoid modifying an
443 : // ingested sstable's original external file.
444 2 : d.mu.versions.markFileNumUsed(fn)
445 2 :
446 2 : switch ft {
447 0 : case fileTypeLog:
448 : // Ignore.
449 2 : case fileTypeOptions:
450 2 : if previousOptionsFileNum < fn {
451 2 : previousOptionsFileNum = fn
452 2 : previousOptionsFilename = filename
453 2 : }
454 1 : case fileTypeTemp, fileTypeOldTemp:
455 1 : if !d.opts.ReadOnly {
456 1 : // Some codepaths write to a temporary file and then
457 1 : // rename it to its final location when complete. A
458 1 : // temp file is leftover if a process exits before the
459 1 : // rename. Remove it.
460 1 : err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
461 1 : if err != nil {
462 0 : return nil, err
463 0 : }
464 : }
465 : }
466 : }
467 2 : if n := len(wals); n > 0 {
468 2 : // Don't reuse any obsolete file numbers to avoid modifying an
469 2 : // ingested sstable's original external file.
470 2 : d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
471 2 : }
472 :
473 : // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
474 : // objProvider. This avoids FileNum collisions with obsolete sstables.
475 2 : objects := d.objProvider.List()
476 2 : for _, obj := range objects {
477 2 : d.mu.versions.markFileNumUsed(obj.DiskFileNum)
478 2 : }
479 :
480 : // Validate the most-recent OPTIONS file, if there is one.
481 2 : if previousOptionsFilename != "" {
482 2 : path := opts.FS.PathJoin(dirname, previousOptionsFilename)
483 2 : previousOptions, err := readOptionsFile(opts, path)
484 2 : if err != nil {
485 0 : return nil, err
486 0 : }
487 2 : if err := opts.CheckCompatibility(previousOptions); err != nil {
488 1 : return nil, err
489 1 : }
490 : }
491 :
492 : // Replay any log files newer than the ones named in the manifest.
493 2 : var replayWALs wal.Logs
494 2 : for i, w := range wals {
495 2 : if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
496 2 : replayWALs = wals[i:]
497 2 : break
498 : }
499 : }
500 2 : var flushableIngests []*ingestedFlushable
501 2 : for i, lf := range replayWALs {
502 2 : // WALs other than the last one would have been closed cleanly.
503 2 : //
504 2 : // Note: we used to never require strict WAL tails when reading from older
505 2 : // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
506 2 : // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
507 2 : // compatible Pebble format is newer and guarantees a clean EOF.
508 2 : strictWALTail := i < len(replayWALs)-1
509 2 : fi, maxSeqNum, err := d.replayWAL(jobID, lf, strictWALTail)
510 2 : if err != nil {
511 1 : return nil, err
512 1 : }
513 2 : if len(fi) > 0 {
514 2 : flushableIngests = append(flushableIngests, fi...)
515 2 : }
516 2 : if d.mu.versions.logSeqNum.Load() < maxSeqNum {
517 2 : d.mu.versions.logSeqNum.Store(maxSeqNum)
518 2 : }
519 : }
520 2 : if d.mu.mem.mutable == nil {
521 2 : // Recreate the mutable memtable if replayWAL got rid of it.
522 2 : var entry *flushableEntry
523 2 : d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
524 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
525 2 : }
526 2 : d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
527 2 :
528 2 : if !d.opts.ReadOnly {
529 2 : d.maybeScheduleFlush()
530 2 : for d.mu.compact.flushing {
531 2 : d.mu.compact.cond.Wait()
532 2 : }
533 :
534 : // Create an empty .log file for the mutable memtable.
535 2 : newLogNum := d.mu.versions.getNextDiskFileNum()
536 2 : d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
537 2 : if err != nil {
538 1 : return nil, err
539 1 : }
540 :
541 : // This isn't strictly necessary as we don't use the log number for
542 : // memtables being flushed, only for the next unflushed memtable.
543 2 : d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
544 : }
545 2 : d.updateReadStateLocked(d.opts.DebugCheck)
546 2 :
547 2 : if !d.opts.ReadOnly {
548 2 : // If the Options specify a format major version higher than the
549 2 : // loaded database's, upgrade it. If this is a new database, this
550 2 : // code path also performs an initial upgrade from the starting
551 2 : // implicit MinSupported version.
552 2 : //
553 2 : // We ratchet the version this far into Open so that migrations have a read
554 2 : // state available. Note that this also results in creating/updating the
555 2 : // format version marker file.
556 2 : if opts.FormatMajorVersion > d.FormatMajorVersion() {
557 2 : if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
558 0 : return nil, err
559 0 : }
560 2 : } else if noFormatVersionMarker {
561 2 : // We are creating a new store. Create the format version marker file.
562 2 : if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
563 1 : return nil, err
564 1 : }
565 : }
566 :
567 : // Write the current options to disk.
568 2 : d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
569 2 : tmpPath := base.MakeFilepath(opts.FS, dirname, fileTypeTemp, d.optionsFileNum)
570 2 : optionsPath := base.MakeFilepath(opts.FS, dirname, fileTypeOptions, d.optionsFileNum)
571 2 :
572 2 : // Write them to a temporary file first, in case we crash before
573 2 : // we're done. A corrupt options file prevents opening the
574 2 : // database.
575 2 : optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
576 2 : if err != nil {
577 1 : return nil, err
578 1 : }
579 2 : serializedOpts := []byte(opts.String())
580 2 : if _, err := optionsFile.Write(serializedOpts); err != nil {
581 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
582 1 : }
583 2 : d.optionsFileSize = uint64(len(serializedOpts))
584 2 : if err := optionsFile.Sync(); err != nil {
585 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
586 1 : }
587 2 : if err := optionsFile.Close(); err != nil {
588 0 : return nil, err
589 0 : }
590 : // Atomically rename to the OPTIONS-XXXXXX path. This rename is
591 : // guaranteed to be atomic because the destination path does not
592 : // exist.
593 2 : if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
594 1 : return nil, err
595 1 : }
596 2 : if err := d.dataDir.Sync(); err != nil {
597 1 : return nil, err
598 1 : }
599 : }
600 :
601 2 : if !d.opts.ReadOnly {
602 2 : // Get a fresh list of files, in case some of the earlier flushes/compactions
603 2 : // have deleted some files.
604 2 : ls, err := opts.FS.List(dirname)
605 2 : if err != nil {
606 1 : return nil, err
607 1 : }
608 2 : d.scanObsoleteFiles(ls, flushableIngests)
609 2 : d.deleteObsoleteFiles(jobID)
610 : }
611 : // Else, nothing is obsolete.
612 :
613 2 : d.mu.tableStats.cond.L = &d.mu.Mutex
614 2 : d.mu.tableValidation.cond.L = &d.mu.Mutex
615 2 : if !d.opts.ReadOnly {
616 2 : d.maybeCollectTableStatsLocked()
617 2 : }
618 2 : d.calculateDiskAvailableBytes()
619 2 :
620 2 : d.maybeScheduleFlush()
621 2 : d.maybeScheduleCompaction()
622 2 :
623 2 : // Note: this is a no-op if invariants are disabled or race is enabled.
624 2 : //
625 2 : // Setting a finalizer on *DB causes *DB to never be reclaimed and the
626 2 : // finalizer to never be run. The problem is due to this limitation of
627 2 : // finalizers mentioned in the SetFinalizer docs:
628 2 : //
629 2 : // If a cyclic structure includes a block with a finalizer, that cycle is
630 2 : // not guaranteed to be garbage collected and the finalizer is not
631 2 : // guaranteed to run, because there is no ordering that respects the
632 2 : // dependencies.
633 2 : //
634 2 : // DB has cycles with several of its internal structures: readState,
635 2 : // newIters, tableCache, versions, etc. Each of these individually causes a
636 2 : // cycle and prevents the finalizer from being run. But we can work around
637 2 : // this finalizer limitation by setting a finalizer on another object that is
638 2 : // tied to the lifetime of DB: the DB.closed atomic.Value.
639 2 : dPtr := fmt.Sprintf("%p", d)
640 2 : invariants.SetFinalizer(d.closed, func(obj interface{}) {
641 0 : v := obj.(*atomic.Value)
642 0 : if err := v.Load(); err == nil {
643 0 : fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
644 0 : os.Exit(1)
645 0 : }
646 : })
647 :
648 2 : return d, nil
649 : }
650 :
651 : // prepareAndOpenDirs opens the directories for the store (and creates them if
652 : // necessary).
653 : //
654 : // Returns an error if ReadOnly is set and the directories don't exist.
655 : func prepareAndOpenDirs(
656 : dirname string, opts *Options,
657 2 : ) (walDirname string, dataDir vfs.File, err error) {
658 2 : walDirname = opts.WALDir
659 2 : if opts.WALDir == "" {
660 2 : walDirname = dirname
661 2 : }
662 :
663 : // Create directories if needed.
664 2 : if !opts.ReadOnly {
665 2 : f, err := mkdirAllAndSyncParents(opts.FS, dirname)
666 2 : if err != nil {
667 1 : return "", nil, err
668 1 : }
669 2 : f.Close()
670 2 : if walDirname != dirname {
671 2 : f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
672 2 : if err != nil {
673 0 : return "", nil, err
674 0 : }
675 2 : f.Close()
676 : }
677 2 : if opts.WALFailover != nil {
678 2 : secondary := opts.WALFailover.Secondary
679 2 : f, err := mkdirAllAndSyncParents(secondary.FS, secondary.Dirname)
680 2 : if err != nil {
681 0 : return "", nil, err
682 0 : }
683 2 : f.Close()
684 : }
685 : }
686 :
687 2 : dataDir, err = opts.FS.OpenDir(dirname)
688 2 : if err != nil {
689 1 : if opts.ReadOnly && oserror.IsNotExist(err) {
690 1 : return "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
691 1 : }
692 1 : return "", nil, err
693 : }
694 2 : if opts.ReadOnly && walDirname != dirname {
695 1 : // Check that the wal dir exists.
696 1 : walDir, err := opts.FS.OpenDir(walDirname)
697 1 : if err != nil {
698 1 : dataDir.Close()
699 1 : return "", nil, err
700 1 : }
701 1 : walDir.Close()
702 : }
703 :
704 2 : return walDirname, dataDir, nil
705 : }
706 :
707 : // GetVersion returns the engine version string from the latest options
708 : // file present in dir. Used to check what Pebble or RocksDB version was last
709 : // used to write to the database stored in this directory. An empty string is
710 : // returned if no valid OPTIONS file with a version key was found.
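//
// A hedged usage sketch (the directory path is a placeholder):
//
//	version, err := pebble.GetVersion("/data/db", vfs.Default)
//	if err != nil {
//		return err
//	}
//	if strings.HasPrefix(version, "rocksdb") {
//		// The store was last written to by RocksDB.
//	}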
711 1 : func GetVersion(dir string, fs vfs.FS) (string, error) {
712 1 : ls, err := fs.List(dir)
713 1 : if err != nil {
714 0 : return "", err
715 0 : }
716 1 : var version string
717 1 : lastOptionsSeen := base.DiskFileNum(0)
718 1 : for _, filename := range ls {
719 1 : ft, fn, ok := base.ParseFilename(fs, filename)
720 1 : if !ok {
721 1 : continue
722 : }
723 1 : switch ft {
724 1 : case fileTypeOptions:
725 1 : // If this file has a higher number than the last options file
726 1 : // processed, reset version. This is because rocksdb often
727 1 : // writes multiple options files without deleting previous ones.
728 1 : // Otherwise, skip parsing this options file.
729 1 : if fn > lastOptionsSeen {
730 1 : version = ""
731 1 : lastOptionsSeen = fn
732 1 : } else {
733 0 : continue
734 : }
735 1 : f, err := fs.Open(fs.PathJoin(dir, filename))
736 1 : if err != nil {
737 0 : return "", err
738 0 : }
739 1 : data, err := io.ReadAll(f)
740 1 : f.Close()
741 1 :
742 1 : if err != nil {
743 0 : return "", err
744 0 : }
745 1 : err = parseOptions(string(data), func(section, key, value string) error {
746 1 : switch {
747 1 : case section == "Version":
748 1 : switch key {
749 1 : case "pebble_version":
750 1 : version = value
751 1 : case "rocksdb_version":
752 1 : version = fmt.Sprintf("rocksdb v%s", value)
753 : }
754 : }
755 1 : return nil
756 : })
757 1 : if err != nil {
758 0 : return "", err
759 0 : }
760 : }
761 : }
762 1 : return version, nil
763 : }
764 :
765 : func (d *DB) replayIngestedFlushable(
766 : b *Batch, logNum base.DiskFileNum,
767 2 : ) (entry *flushableEntry, err error) {
768 2 : br := b.Reader()
769 2 : seqNum := b.SeqNum()
770 2 :
771 2 : fileNums := make([]base.DiskFileNum, 0, b.Count())
772 2 : var exciseSpan KeyRange
773 2 : addFileNum := func(encodedFileNum []byte) {
774 2 : fileNum, n := binary.Uvarint(encodedFileNum)
775 2 : if n <= 0 {
776 0 : panic("pebble: ingest sstable file num is invalid")
777 : }
778 2 : fileNums = append(fileNums, base.DiskFileNum(fileNum))
779 : }
780 :
781 2 : for i := 0; i < int(b.Count()); i++ {
782 2 : kind, key, val, ok, err := br.Next()
783 2 : if err != nil {
784 0 : return nil, err
785 0 : }
786 2 : if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise {
787 0 : panic("pebble: invalid batch key kind")
788 : }
789 2 : if !ok {
790 0 : panic("pebble: invalid batch count")
791 : }
792 2 : if kind == base.InternalKeyKindExcise {
793 1 : if exciseSpan.Valid() {
794 0 : panic("pebble: multiple excise spans in a single batch")
795 : }
796 1 : exciseSpan.Start = slices.Clone(key)
797 1 : exciseSpan.End = slices.Clone(val)
798 1 : continue
799 : }
800 2 : addFileNum(key)
801 : }
802 :
803 2 : if _, _, _, ok, err := br.Next(); err != nil {
804 0 : return nil, err
805 2 : } else if ok {
806 0 : panic("pebble: invalid number of entries in batch")
807 : }
808 :
809 2 : meta := make([]*fileMetadata, len(fileNums))
810 2 : for i, n := range fileNums {
811 2 : readable, err := d.objProvider.OpenForReading(context.TODO(), fileTypeTable, n, objstorage.OpenOptions{MustExist: true})
812 2 : if err != nil {
813 0 : return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files")
814 0 : }
815 : // NB: ingestLoad1 will close readable.
816 2 : meta[i], err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(), readable, d.cacheID, base.PhysicalTableFileNum(n))
817 2 : if err != nil {
818 0 : return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
819 0 : }
820 : }
821 :
822 2 : numFiles := len(meta)
823 2 : if exciseSpan.Valid() {
824 1 : numFiles++
825 1 : }
826 2 : if uint32(numFiles) != b.Count() {
827 0 : panic("pebble: couldn't load all files in WAL entry")
828 : }
829 :
830 2 : return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
831 : }
832 :
833 : // replayWAL replays the edits in the specified WAL. If the DB is in
834 : // read-only mode, then the WALs are replayed into memtables and not flushed.
835 : // If the DB is not in read-only mode, then the contents of the WAL are
836 : // guaranteed to be flushed when a flush is scheduled after this method is run.
837 : // Note that this flushing is very important for guaranteeing durability:
838 : // the application may have had a number of pending
839 : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
840 : // happened but the corresponding data may now be readable from the WAL (while
841 : // sitting in write-back caches in the kernel or the storage device). By
842 : // reading the WAL (including the non-fsynced data) and then flushing all
843 : // these changes (flush does fsyncs), we are able to guarantee that the
844 : // initial state of the DB is durable.
845 : //
846 : // This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable and replays
847 : // WALs into the flushable queue. Flushing of the queue is expected to be handled
848 : // by callers. It returns the list of flushable ingests (but not memtables) that were replayed.
849 : //
850 : // d.mu must be held when calling this, but the mutex may be dropped and
851 : // re-acquired during the course of this method.
852 : func (d *DB) replayWAL(
853 : jobID JobID, ll wal.LogicalLog, strictWALTail bool,
854 2 : ) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
855 2 : rr := ll.OpenForRead()
856 2 : defer rr.Close()
857 2 : var (
858 2 : b Batch
859 2 : buf bytes.Buffer
860 2 : mem *memTable
861 2 : entry *flushableEntry
862 2 : offset wal.Offset
863 2 : lastFlushOffset int64
864 2 : keysReplayed int64 // number of keys replayed
865 2 : batchesReplayed int64 // number of batches replayed
866 2 : )
867 2 :
868 2 : // TODO(jackson): This function is interspersed with panics, in addition to
869 2 : // corruption error propagation. Audit them to ensure we're truly only
870 2 : // panicking where the error points to Pebble bug and not user or
871 2 : // hardware-induced corruption.
872 2 :
873 2 : // "Flushes" (ie. closes off) the current memtable, if not nil.
874 2 : flushMem := func() {
875 2 : if mem == nil {
876 2 : return
877 2 : }
878 2 : mem.writerUnref()
879 2 : if d.mu.mem.mutable == mem {
880 2 : d.mu.mem.mutable = nil
881 2 : }
882 2 : entry.flushForced = !d.opts.ReadOnly
883 2 : var logSize uint64
884 2 : mergedOffset := offset.Physical + offset.PreviousFilesBytes
885 2 : if mergedOffset >= lastFlushOffset {
886 2 : logSize = uint64(mergedOffset - lastFlushOffset)
887 2 : }
888 : // Else, this was the initial memtable in the read-only case which must have
889 : // been empty, but we need to flush it since we don't want to add to it later.
890 2 : lastFlushOffset = mergedOffset
891 2 : entry.logSize = logSize
892 2 : mem, entry = nil, nil
893 : }
894 :
895 2 : mem = d.mu.mem.mutable
896 2 : if mem != nil {
897 2 : entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
898 2 : if !d.opts.ReadOnly {
899 2 : flushMem()
900 2 : }
901 : }
902 :
903 : // Creates a new memtable if there is no current memtable.
904 2 : ensureMem := func(seqNum base.SeqNum) {
905 2 : if mem != nil {
906 2 : return
907 2 : }
908 2 : mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
909 2 : d.mu.mem.mutable = mem
910 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
911 : }
912 :
913 2 : defer func() {
914 2 : if err != nil {
915 1 : err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
916 1 : }
917 : }()
918 :
919 2 : for {
920 2 : var r io.Reader
921 2 : var err error
922 2 : r, offset, err = rr.NextRecord()
923 2 : if err == nil {
924 2 : _, err = io.Copy(&buf, r)
925 2 : }
926 2 : if err != nil {
927 2 : // It is common to encounter a zeroed or invalid chunk due to WAL
928 2 : // preallocation and WAL recycling. We need to distinguish these
929 2 : // errors from EOF in order to recognize that the record was
930 2 : // truncated and to avoid replaying subsequent WALs, but want
931 2 : // to otherwise treat them like EOF.
932 2 : if err == io.EOF {
933 2 : break
934 1 : } else if record.IsInvalidRecord(err) && !strictWALTail {
935 1 : break
936 : }
937 1 : return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
938 : }
939 :
940 2 : if buf.Len() < batchrepr.HeaderLen {
941 0 : return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
942 0 : errors.Safe(base.DiskFileNum(ll.Num)), offset)
943 0 : }
944 :
945 2 : if d.opts.ErrorIfNotPristine {
946 1 : return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
947 1 : }
948 :
949 : // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
950 : // which is used below.
951 2 : b = Batch{}
952 2 : b.db = d
953 2 : b.SetRepr(buf.Bytes())
954 2 : seqNum := b.SeqNum()
955 2 : maxSeqNum = seqNum + base.SeqNum(b.Count())
956 2 : keysReplayed += int64(b.Count())
957 2 : batchesReplayed++
958 2 : {
959 2 : br := b.Reader()
960 2 : if kind, _, _, ok, err := br.Next(); err != nil {
961 0 : return nil, 0, err
962 2 : } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
963 2 : // We're in the flushable ingests (+ possibly excises) case.
964 2 : //
965 2 : // Ingests require an up-to-date view of the LSM to determine the target
966 2 : // level of ingested sstables, and to accurately compute excises. Instead of
967 2 : // doing an ingest in this function, we just enqueue a flushable ingest
968 2 : // in the flushables queue and run a regular flush.
969 2 : flushMem()
970 2 : // mem is nil here.
971 2 : entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
972 2 : if err != nil {
973 0 : return nil, 0, err
974 0 : }
975 2 : fi := entry.flushable.(*ingestedFlushable)
976 2 : flushableIngests = append(flushableIngests, fi)
977 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
978 2 : // A flushable ingest is always followed by a WAL rotation.
979 2 : break
980 : }
981 : }
982 :
983 2 : if b.memTableSize >= uint64(d.largeBatchThreshold) {
984 2 : flushMem()
985 2 : // Make a copy of the data slice since it is currently owned by buf and will
986 2 : // be reused in the next iteration.
987 2 : b.data = slices.Clone(b.data)
988 2 : b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
989 2 : if err != nil {
990 0 : return nil, 0, err
991 0 : }
992 2 : entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
993 2 : // Disable memory accounting by adding a reader ref that will never be
994 2 : // removed.
995 2 : entry.readerRefs.Add(1)
996 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
997 2 : } else {
998 2 : ensureMem(seqNum)
999 2 : if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
1000 0 : return nil, 0, err
1001 0 : }
1002 : // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
1003 : // batch may not initially fit, but will eventually fit (since it is smaller than
1004 : // largeBatchThreshold).
1005 2 : for err == arenaskl.ErrArenaFull {
1006 2 : flushMem()
1007 2 : ensureMem(seqNum)
1008 2 : err = mem.prepare(&b)
1009 2 : if err != nil && err != arenaskl.ErrArenaFull {
1010 0 : return nil, 0, err
1011 0 : }
1012 : }
1013 2 : if err = mem.apply(&b, seqNum); err != nil {
1014 0 : return nil, 0, err
1015 0 : }
1016 2 : mem.writerUnref()
1017 : }
1018 2 : buf.Reset()
1019 : }
1020 :
1021 2 : d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
1022 2 : jobID, base.DiskFileNum(ll.Num).String(), offset, keysReplayed, batchesReplayed)
1023 2 : if !d.opts.ReadOnly {
1024 2 : flushMem()
1025 2 : }
1026 :
1027 : // mem is nil here, if !ReadOnly.
1028 2 : return flushableIngests, maxSeqNum, err
1029 : }
1030 :
1031 2 : func readOptionsFile(opts *Options, path string) (string, error) {
1032 2 : f, err := opts.FS.Open(path)
1033 2 : if err != nil {
1034 0 : return "", err
1035 0 : }
1036 2 : defer f.Close()
1037 2 :
1038 2 : data, err := io.ReadAll(f)
1039 2 : if err != nil {
1040 0 : return "", err
1041 0 : }
1042 2 : return string(data), nil
1043 : }
1044 :
1045 : // DBDesc briefly describes high-level state about a database.
1046 : type DBDesc struct {
1047 : // Exists is true if an existing database was found.
1048 : Exists bool
1049 : // FormatMajorVersion indicates the database's current format
1050 : // version.
1051 : FormatMajorVersion FormatMajorVersion
1052 : // ManifestFilename is the filename of the current active manifest,
1053 : // if the database exists.
1054 : ManifestFilename string
1055 : // OptionsFilename is the filename of the most recent OPTIONS file, if it
1056 : // exists.
1057 : OptionsFilename string
1058 : }
1059 :
1060 : // String implements fmt.Stringer.
1061 1 : func (d *DBDesc) String() string {
1062 1 : if !d.Exists {
1063 1 : return "uninitialized"
1064 1 : }
1065 1 : var buf bytes.Buffer
1066 1 : fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
1067 1 : fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
1068 1 : fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
1069 1 : return buf.String()
1070 : }
1071 :
1072 : // Peek looks for an existing database in dirname on the provided FS. It
1073 : // returns a brief description of the database. Peek is read-only and
1074 : // does not open the database.
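//
// A hedged usage sketch (the directory path is a placeholder):
//
//	desc, err := pebble.Peek("/data/db", vfs.Default)
//	if err != nil {
//		return err
//	}
//	if desc.Exists {
//		fmt.Println(desc.FormatMajorVersion, desc.ManifestFilename)
//	}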
1075 1 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
1076 1 : ls, err := fs.List(dirname)
1077 1 : if err != nil {
1078 1 : return nil, err
1079 1 : }
1080 :
1081 1 : vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
1082 1 : if err != nil {
1083 0 : return nil, err
1084 0 : }
1085 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1086 : // PeekMarker variant that avoids opening the directory.
1087 1 : if err := versMarker.Close(); err != nil {
1088 0 : return nil, err
1089 0 : }
1090 :
1091 : // Find the currently active manifest, if there is one.
1092 1 : manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
1093 1 : if err != nil {
1094 0 : return nil, err
1095 0 : }
1096 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1097 : // PeekMarker variant that avoids opening the directory.
1098 1 : if err := manifestMarker.Close(); err != nil {
1099 0 : return nil, err
1100 0 : }
1101 :
1102 1 : desc := &DBDesc{
1103 1 : Exists: exists,
1104 1 : FormatMajorVersion: vers,
1105 1 : }
1106 1 :
1107 1 : // Find the OPTIONS file with the highest file number within the list of
1108 1 : // directory entries.
1109 1 : var previousOptionsFileNum base.DiskFileNum
1110 1 : for _, filename := range ls {
1111 1 : ft, fn, ok := base.ParseFilename(fs, filename)
1112 1 : if !ok || ft != fileTypeOptions || fn < previousOptionsFileNum {
1113 1 : continue
1114 : }
1115 1 : previousOptionsFileNum = fn
1116 1 : desc.OptionsFilename = fs.PathJoin(dirname, filename)
1117 : }
1118 :
1119 1 : if exists {
1120 1 : desc.ManifestFilename = base.MakeFilepath(fs, dirname, fileTypeManifest, manifestFileNum)
1121 1 : }
1122 1 : return desc, nil
1123 : }
1124 :
1125 : // LockDirectory acquires the database directory lock in the named directory,
1126 : // preventing another process from opening the database. LockDirectory returns a
1127 : // handle to the held lock that may be passed to Open through Options.Lock to
1128 : // subsequently open the database, skipping lock acquisition during Open.
1129 : //
1130 : // LockDirectory may be used to expand the critical section protected by the
1131 : // database lock to include setup before the call to Open.
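//
// A hedged usage sketch (the directory path is a placeholder):
//
//	lock, err := pebble.LockDirectory("/data/db", vfs.Default)
//	if err != nil {
//		return err
//	}
//	// ... setup that must happen while the directory is locked ...
//	db, err := pebble.Open("/data/db", &pebble.Options{Lock: lock})
//	if err != nil {
//		lock.Close()
//		return err
//	}
//	// Close the DB before releasing the Lock handle.
//	defer lock.Close()
//	defer db.Close()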
1132 2 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
1133 2 : fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, fileTypeLock, base.DiskFileNum(0)))
1134 2 : if err != nil {
1135 1 : return nil, err
1136 1 : }
1137 2 : l := &Lock{dirname: dirname, fileLock: fileLock}
1138 2 : l.refs.Store(1)
1139 2 : invariants.SetFinalizer(l, func(obj interface{}) {
1140 2 : if refs := obj.(*Lock).refs.Load(); refs > 0 {
1141 0 : panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
1142 : }
1143 : })
1144 2 : return l, nil
1145 : }
1146 :
1147 : // Lock represents a file lock on a directory. It may be passed to Open through
1148 : // Options.Lock to elide lock acquisition during Open.
1149 : type Lock struct {
1150 : dirname string
1151 : fileLock io.Closer
1152 : // refs is a count of the number of handles on the lock. refs must be 0, 1
1153 : // or 2.
1154 : //
1155 : // When acquired by the client and passed to Open, refs = 1 and the Open
1156 : // call increments it to 2. When the database is closed, it's decremented to
1157 : // 1. Finally, when the original caller calls Close on the Lock, it's
1158 : // decremented to zero and the underlying file lock is released.
1159 : //
1160 : // When Open acquires the file lock, refs remains at 1 until the database is
1161 : // closed.
1162 : refs atomic.Int32
1163 : }
1164 :
1165 1 : func (l *Lock) refForOpen() error {
1166 1 : // During Open, when a user passed in a lock, the reference count must be
1167 1 : // exactly 1. If it's zero, the lock is no longer held and is invalid. If
1168 1 : // it's 2, the lock is already in use by another database within the
1169 1 : // process.
1170 1 : if !l.refs.CompareAndSwap(1, 2) {
1171 1 : return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
1172 1 : }
1173 1 : return nil
1174 : }
1175 :
1176 : // Close releases the lock, permitting another process to lock and open the
1177 : // database. Close must not be called until after a database using the Lock has
1178 : // been closed.
1179 2 : func (l *Lock) Close() error {
1180 2 : if l.refs.Add(-1) > 0 {
1181 1 : return nil
1182 1 : }
1183 2 : defer func() { l.fileLock = nil }()
1184 2 : return l.fileLock.Close()
1185 : }
1186 :
1187 1 : func (l *Lock) pathMatches(dirname string) error {
1188 1 : if dirname == l.dirname {
1189 1 : return nil
1190 1 : }
1191 : // Check for relative paths, symlinks, etc. This isn't ideal because we're
1192 : // circumventing the vfs.FS interface here.
1193 : //
1194 : // TODO(jackson): We could add support for retrieving file inodes through Stat
1195 : // calls in the VFS interface on platforms where it's available and use that
1196 : // to differentiate.
1197 1 : dirStat, err1 := os.Stat(dirname)
1198 1 : lockDirStat, err2 := os.Stat(l.dirname)
1199 1 : if err1 == nil && err2 == nil && os.SameFile(dirStat, lockDirStat) {
1200 1 : return nil
1201 1 : }
1202 0 : return errors.Join(
1203 0 : errors.Newf("pebble: opts.Lock acquired in %q not %q", l.dirname, dirname),
1204 0 : err1, err2)
1205 : }
1206 :
1207 : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
1208 : // does not exist.
1209 : //
1210 : // Note that errors can be wrapped with more details; use errors.Is().
1211 : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
1212 :
1213 : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
1214 : // already exists.
1215 : //
1216 : // Note that errors can be wrapped with more details; use errors.Is().
1217 : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
1218 :
1219 : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
1220 : // already exists and is not pristine.
1221 : //
1222 : // Note that errors can be wrapped with more details; use errors.Is().
1223 : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
1224 :
1225 : // IsCorruptionError returns true if the given error indicates database
1226 : // corruption.
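//
// A hedged usage sketch (db and key are placeholders):
//
//	if _, closer, err := db.Get(key); err != nil {
//		if pebble.IsCorruptionError(err) {
//			// Treat as data corruption rather than a transient failure.
//		}
//	} else {
//		closer.Close()
//	}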
1227 1 : func IsCorruptionError(err error) bool {
1228 1 : return errors.Is(err, base.ErrCorruption)
1229 1 : }
1230 :
1231 2 : func checkConsistency(v *manifest.Version, dirname string, objProvider objstorage.Provider) error {
1232 2 : var errs []error
1233 2 : dedup := make(map[base.DiskFileNum]struct{})
1234 2 : for level, files := range v.Levels {
1235 2 : iter := files.Iter()
1236 2 : for f := iter.First(); f != nil; f = iter.Next() {
1237 2 : backingState := f.FileBacking
1238 2 : if _, ok := dedup[backingState.DiskFileNum]; ok {
1239 2 : continue
1240 : }
1241 2 : dedup[backingState.DiskFileNum] = struct{}{}
1242 2 : fileNum := backingState.DiskFileNum
1243 2 : fileSize := backingState.Size
1244 2 : // We skip over remote objects; those are instead checked asynchronously
1245 2 : // by the table stats loading job.
1246 2 : meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
1247 2 : var size int64
1248 2 : if err == nil {
1249 2 : if meta.IsRemote() {
1250 2 : continue
1251 : }
1252 2 : size, err = objProvider.Size(meta)
1253 : }
1254 2 : if err != nil {
1255 1 : errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
1256 1 : continue
1257 : }
1258 :
1259 2 : if size != int64(fileSize) {
1260 0 : errs = append(errs, errors.Errorf(
1261 0 : "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
1262 0 : errors.Safe(level), fileNum, objProvider.Path(meta),
1263 0 : errors.Safe(size), errors.Safe(fileSize)))
1264 0 : continue
1265 : }
1266 : }
1267 : }
1268 2 : return errors.Join(errs...)
1269 : }
1270 :
1271 : type walEventListenerAdaptor struct {
1272 : l *EventListener
1273 : }
1274 :
1275 2 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
1276 2 : // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
1277 2 : // is insufficient to infer whether primary or secondary.
1278 2 : wci := WALCreateInfo{
1279 2 : JobID: ci.JobID,
1280 2 : Path: ci.Path,
1281 2 : FileNum: base.DiskFileNum(ci.Num),
1282 2 : RecycledFileNum: ci.RecycledFileNum,
1283 2 : Err: ci.Err,
1284 2 : }
1285 2 : l.l.WALCreated(wci)
1286 2 : }
|