Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "math"
15 : "os"
16 : "slices"
17 : "sync/atomic"
18 : "time"
19 :
20 : "github.com/cockroachdb/errors"
21 : "github.com/cockroachdb/errors/oserror"
22 : "github.com/cockroachdb/pebble/internal/arenaskl"
23 : "github.com/cockroachdb/pebble/internal/base"
24 : "github.com/cockroachdb/pebble/internal/cache"
25 : "github.com/cockroachdb/pebble/internal/constants"
26 : "github.com/cockroachdb/pebble/internal/invariants"
27 : "github.com/cockroachdb/pebble/internal/manifest"
28 : "github.com/cockroachdb/pebble/internal/manual"
29 : "github.com/cockroachdb/pebble/objstorage"
30 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
31 : "github.com/cockroachdb/pebble/objstorage/remote"
32 : "github.com/cockroachdb/pebble/record"
33 : "github.com/cockroachdb/pebble/sstable"
34 : "github.com/cockroachdb/pebble/vfs"
35 : "github.com/prometheus/client_golang/prometheus"
36 : )
37 :
38 : const (
39 : initialMemTableSize = 256 << 10 // 256 KB
40 :
41 : // The max batch size is limited by the uint32 offsets stored in
42 : // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
43 : //
44 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
45 : // end of an allocation fits in uint32.
46 : //
47 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
48 : // 2GB).
49 : maxBatchSize = constants.MaxUint32OrInt
50 :
51 : // The max memtable size is limited by the uint32 offsets stored in
52 : // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
53 : //
54 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
55 : // end of an allocation fits in uint32.
56 : //
57 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
58 : // 2GB).
59 : maxMemTableSize = constants.MaxUint32OrInt
60 : )
61 :
62 : // TableCacheSize can be used to determine the table cache size for a
63 : // single db, given the maximum number of open files that may be used
64 : // by a table cache which serves only that single db (i.e. one not
65 : // shared with other dbs).
66 2 : func TableCacheSize(maxOpenFiles int) int {
67 2 : tableCacheSize := maxOpenFiles - numNonTableCacheFiles
68 2 : if tableCacheSize < minTableCacheSize {
69 1 : tableCacheSize = minTableCacheSize
70 1 : }
71 2 : return tableCacheSize
72 : }
73 :
74 : // Open opens a DB whose files live in the given directory.
75 2 : func Open(dirname string, opts *Options) (db *DB, err error) {
76 2 : // Make a copy of the options so that we don't mutate the passed in options.
77 2 : opts = opts.Clone()
78 2 : opts = opts.EnsureDefaults()
79 2 : if err := opts.Validate(); err != nil {
80 0 : return nil, err
81 0 : }
82 2 : if opts.LoggerAndTracer == nil {
83 2 : opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
84 2 : } else {
85 1 : opts.Logger = opts.LoggerAndTracer
86 1 : }
87 :
88 : // In all error cases, we return db = nil; this is used by various
89 : // deferred cleanups.
90 :
91 : // Open the database and WAL directories first.
92 2 : walDirname, dataDir, walDir, err := prepareAndOpenDirs(dirname, opts)
93 2 : if err != nil {
94 1 : return nil, errors.Wrapf(err, "error opening database at %q", dirname)
95 1 : }
96 2 : defer func() {
97 2 : if db == nil {
98 1 : if walDir != dataDir {
99 0 : walDir.Close()
100 0 : }
101 1 : dataDir.Close()
102 : }
103 : }()
104 :
105 : // Lock the database directory.
106 2 : var fileLock *Lock
107 2 : if opts.Lock != nil {
108 1 : // The caller already acquired the database lock. Ensure that the
109 1 : // directory matches.
110 1 : if dirname != opts.Lock.dirname {
111 0 : return nil, errors.Newf("pebble: opts.Lock acquired in %q not %q", opts.Lock.dirname, dirname)
112 0 : }
113 1 : if err := opts.Lock.refForOpen(); err != nil {
114 1 : return nil, err
115 1 : }
116 1 : fileLock = opts.Lock
117 2 : } else {
118 2 : fileLock, err = LockDirectory(dirname, opts.FS)
119 2 : if err != nil {
120 1 : return nil, err
121 1 : }
122 : }
123 2 : defer func() {
124 2 : if db == nil {
125 1 : fileLock.Close()
126 1 : }
127 : }()
128 :
129 : // Establish the format major version.
130 2 : formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname)
131 2 : if err != nil {
132 1 : return nil, err
133 1 : }
134 2 : defer func() {
135 2 : if db == nil {
136 1 : formatVersionMarker.Close()
137 1 : }
138 : }()
139 :
140 2 : noFormatVersionMarker := formatVersion == FormatDefault
141 2 : if noFormatVersionMarker {
142 2 : // We will initialize the store at the minimum possible format, then upgrade
143 2 : // the format to the desired one. This helps test the format upgrade code.
144 2 : formatVersion = FormatMinSupported
145 2 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
146 2 : formatVersion = FormatMinForSharedObjects
147 2 : }
148 : // There is no format version marker file. There are three cases:
149 : // - we are trying to open an existing store that was created at
150 : // FormatMostCompatible (the only one without a version marker file)
151 : // - we are creating a new store;
152 : // - we are retrying a failed creation.
153 : //
154 : // To error in the first case, we set ErrorIfNotPristine.
155 2 : opts.ErrorIfNotPristine = true
156 2 : defer func() {
157 2 : if err != nil && errors.Is(err, ErrDBNotPristine) {
158 0 : // We must be trying to open an existing store at FormatMostCompatible.
159 0 : // Correct the error in this case: report the unsupported format instead.
160 0 : err = errors.Newf(
161 0 : "pebble: database %q written in format major version 1 which is no longer supported",
162 0 : dirname)
163 0 : }
164 : }()
165 2 : } else {
166 2 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone && formatVersion < FormatMinForSharedObjects {
167 0 : return nil, errors.Newf(
168 0 : "pebble: database %q configured with shared objects but written in too old format major version %d",
169 0 : dirname, formatVersion)
170 0 : }
171 : }
172 :
173 : // Find the currently active manifest, if there is one.
174 2 : manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname)
175 2 : if err != nil {
176 1 : return nil, errors.Wrapf(err, "pebble: database %q", dirname)
177 1 : }
178 2 : defer func() {
179 2 : if db == nil {
180 1 : manifestMarker.Close()
181 1 : }
182 : }()
183 :
184 : // Atomic markers may leave behind obsolete files if there's a crash
185 : // mid-update. Clean these up if we're not in read-only mode.
186 2 : if !opts.ReadOnly {
187 2 : if err := formatVersionMarker.RemoveObsolete(); err != nil {
188 0 : return nil, err
189 0 : }
190 2 : if err := manifestMarker.RemoveObsolete(); err != nil {
191 0 : return nil, err
192 0 : }
193 : }
194 :
195 2 : if opts.Cache == nil {
196 1 : opts.Cache = cache.New(cacheDefaultSize)
197 2 : } else {
198 2 : opts.Cache.Ref()
199 2 : }
200 :
201 2 : d := &DB{
202 2 : cacheID: opts.Cache.NewID(),
203 2 : dirname: dirname,
204 2 : walDirname: walDirname,
205 2 : opts: opts,
206 2 : cmp: opts.Comparer.Compare,
207 2 : equal: opts.equal(),
208 2 : merge: opts.Merger.Merge,
209 2 : split: opts.Comparer.Split,
210 2 : abbreviatedKey: opts.Comparer.AbbreviatedKey,
211 2 : largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
212 2 : fileLock: fileLock,
213 2 : dataDir: dataDir,
214 2 : walDir: walDir,
215 2 : logRecycler: logRecycler{limit: opts.MemTableStopWritesThreshold + 1},
216 2 : closed: new(atomic.Value),
217 2 : closedCh: make(chan struct{}),
218 2 : }
219 2 : d.mu.versions = &versionSet{}
220 2 : d.diskAvailBytes.Store(math.MaxUint64)
221 2 :
222 2 : defer func() {
223 2 : // If an error or panic occurs during open, attempt to release the manually
224 2 : // allocated memory resources. Note that rather than look for an error, we
225 2 : // look for the return of a nil DB pointer.
226 2 : if r := recover(); db == nil {
227 1 : // Release our references to the Cache. Note that both the DB, and
228 1 : // tableCache have a reference. When we release the reference to
229 1 : // the tableCache, and if there are no other references to
230 1 : // the tableCache, then the tableCache will also release its
231 1 : // reference to the cache.
232 1 : opts.Cache.Unref()
233 1 :
234 1 : if d.tableCache != nil {
235 1 : _ = d.tableCache.close()
236 1 : }
237 :
238 1 : for _, mem := range d.mu.mem.queue {
239 1 : switch t := mem.flushable.(type) {
240 1 : case *memTable:
241 1 : manual.Free(t.arenaBuf)
242 1 : t.arenaBuf = nil
243 : }
244 : }
245 1 : if d.cleanupManager != nil {
246 1 : d.cleanupManager.Close()
247 1 : }
248 1 : if d.objProvider != nil {
249 1 : d.objProvider.Close()
250 1 : }
251 1 : if r != nil {
252 1 : panic(r)
253 : }
254 : }
255 : }()
256 :
257 2 : d.commit = newCommitPipeline(commitEnv{
258 2 : logSeqNum: &d.mu.versions.logSeqNum,
259 2 : visibleSeqNum: &d.mu.versions.visibleSeqNum,
260 2 : apply: d.commitApply,
261 2 : write: d.commitWrite,
262 2 : })
263 2 : d.mu.nextJobID = 1
264 2 : d.mu.mem.nextSize = opts.MemTableSize
265 2 : if d.mu.mem.nextSize > initialMemTableSize {
266 2 : d.mu.mem.nextSize = initialMemTableSize
267 2 : }
268 2 : d.mu.compact.cond.L = &d.mu.Mutex
269 2 : d.mu.compact.inProgress = make(map[*compaction]struct{})
270 2 : d.mu.compact.noOngoingFlushStartTime = time.Now()
271 2 : d.mu.snapshots.init()
272 2 : // logSeqNum is the next sequence number that will be assigned.
273 2 : // Start assigning sequence numbers from base.SeqNumStart to leave
274 2 : // room for reserved sequence numbers (see comments around
275 2 : // SeqNumStart).
276 2 : d.mu.versions.logSeqNum.Store(base.SeqNumStart)
277 2 : d.mu.formatVers.vers.Store(uint64(formatVersion))
278 2 : d.mu.formatVers.marker = formatVersionMarker
279 2 :
280 2 : d.timeNow = time.Now
281 2 : d.openedAt = d.timeNow()
282 2 :
283 2 : d.mu.Lock()
284 2 : defer d.mu.Unlock()
285 2 :
286 2 : jobID := d.mu.nextJobID
287 2 : d.mu.nextJobID++
288 2 :
289 2 : if !manifestExists {
290 2 : // DB does not exist.
291 2 : if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
292 1 : return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
293 1 : }
294 :
295 : // Create the DB.
296 2 : if err := d.mu.versions.create(jobID, dirname, opts, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
297 1 : return nil, err
298 1 : }
299 2 : } else {
300 2 : if opts.ErrorIfExists {
301 1 : return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
302 1 : }
303 : // Load the version set.
304 2 : if err := d.mu.versions.load(dirname, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
305 1 : return nil, err
306 1 : }
307 2 : if opts.ErrorIfNotPristine {
308 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
309 1 : d.mu.versions.addLiveFileNums(liveFileNums)
310 1 : if len(liveFileNums) != 0 {
311 1 : return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
312 1 : }
313 : }
314 : }
315 :
316 : // In read-only mode, we replay directly into the mutable memtable but never
317 : // flush it. We need to delay creation of the memtable until we know the
318 : // sequence number of the first batch that will be inserted.
319 2 : if !d.opts.ReadOnly {
320 2 : var entry *flushableEntry
321 2 : d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load())
322 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
323 2 : }
324 :
325 : // List the files in the WAL directory (and the data directory, if separate).
326 2 : ls, err := opts.FS.List(d.walDirname)
327 2 : if err != nil {
328 1 : return nil, err
329 1 : }
330 2 : if d.dirname != d.walDirname {
331 2 : ls2, err := opts.FS.List(d.dirname)
332 2 : if err != nil {
333 0 : return nil, err
334 0 : }
335 2 : ls = append(ls, ls2...)
336 : }
337 2 : providerSettings := objstorageprovider.Settings{
338 2 : Logger: opts.Logger,
339 2 : FS: opts.FS,
340 2 : FSDirName: dirname,
341 2 : FSDirInitialListing: ls,
342 2 : FSCleaner: opts.Cleaner,
343 2 : NoSyncOnClose: opts.NoSyncOnClose,
344 2 : BytesPerSync: opts.BytesPerSync,
345 2 : }
346 2 : providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
347 2 : providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
348 2 : providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
349 2 : providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
350 2 :
351 2 : d.objProvider, err = objstorageprovider.Open(providerSettings)
352 2 : if err != nil {
353 1 : return nil, err
354 1 : }
355 :
356 2 : d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
357 2 :
358 2 : if manifestExists {
359 2 : curVersion := d.mu.versions.currentVersion()
360 2 : if err := checkConsistency(curVersion, dirname, d.objProvider); err != nil {
361 0 : return nil, err
362 0 : }
363 : }
364 :
365 2 : tableCacheSize := TableCacheSize(opts.MaxOpenFiles)
366 2 : d.tableCache = newTableCacheContainer(
367 2 : opts.TableCache, d.cacheID, d.objProvider, d.opts, tableCacheSize,
368 2 : &sstable.CategoryStatsCollector{})
369 2 : d.newIters = d.tableCache.newIters
370 2 : d.tableNewRangeKeyIter = d.tableCache.newRangeKeyIter
371 2 :
372 2 : // Replay any log files newer than the ones named in the manifest.
373 2 : type fileNumAndName struct {
374 2 : num base.DiskFileNum
375 2 : name string
376 2 : }
377 2 : var logFiles []fileNumAndName
378 2 : var previousOptionsFileNum FileNum
379 2 : var previousOptionsFilename string
380 2 : for _, filename := range ls {
381 2 : ft, fn, ok := base.ParseFilename(opts.FS, filename)
382 2 : if !ok {
383 2 : continue
384 : }
385 :
386 : // Don't reuse any obsolete file numbers to avoid modifying an
387 : // ingested sstable's original external file.
388 2 : if d.mu.versions.nextFileNum <= uint64(fn.FileNum()) {
389 2 : d.mu.versions.nextFileNum = uint64(fn.FileNum()) + 1
390 2 : }
391 :
392 2 : switch ft {
393 2 : case fileTypeLog:
394 2 : if fn >= d.mu.versions.minUnflushedLogNum {
395 2 : logFiles = append(logFiles, fileNumAndName{fn, filename})
396 2 : }
397 2 : if d.logRecycler.minRecycleLogNum <= fn.FileNum() {
398 2 : d.logRecycler.minRecycleLogNum = fn.FileNum() + 1
399 2 : }
400 2 : case fileTypeOptions:
401 2 : if previousOptionsFileNum < fn.FileNum() {
402 2 : previousOptionsFileNum = fn.FileNum()
403 2 : previousOptionsFilename = filename
404 2 : }
405 1 : case fileTypeTemp, fileTypeOldTemp:
406 1 : if !d.opts.ReadOnly {
407 1 : // Some codepaths write to a temporary file and then
408 1 : // rename it to its final location when complete. A
409 1 : // temp file is leftover if a process exits before the
410 1 : // rename. Remove it.
411 1 : err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
412 1 : if err != nil {
413 0 : return nil, err
414 0 : }
415 : }
416 : }
417 : }
418 :
419 : // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
420 : // objProvider. This avoids FileNum collisions with obsolete sstables.
421 2 : objects := d.objProvider.List()
422 2 : for _, obj := range objects {
423 2 : if d.mu.versions.nextFileNum <= uint64(obj.DiskFileNum) {
424 1 : d.mu.versions.nextFileNum = uint64(obj.DiskFileNum) + 1
425 1 : }
426 : }
427 :
428 : // Validate the most-recent OPTIONS file, if there is one.
429 2 : var strictWALTail bool
430 2 : if previousOptionsFilename != "" {
431 2 : path := opts.FS.PathJoin(dirname, previousOptionsFilename)
432 2 : strictWALTail, err = checkOptions(opts, path)
433 2 : if err != nil {
434 1 : return nil, err
435 1 : }
436 : }
437 :
438 2 : slices.SortFunc(logFiles, func(a, b fileNumAndName) int {
439 2 : return cmp.Compare(a.num, b.num)
440 2 : })
441 :
442 2 : var ve versionEdit
443 2 : var toFlush flushableList
444 2 : for i, lf := range logFiles {
445 2 : lastWAL := i == len(logFiles)-1
446 2 : flush, maxSeqNum, err := d.replayWAL(jobID, &ve, opts.FS,
447 2 : opts.FS.PathJoin(d.walDirname, lf.name), lf.num, strictWALTail && !lastWAL)
448 2 : if err != nil {
449 1 : return nil, err
450 1 : }
451 2 : toFlush = append(toFlush, flush...)
452 2 : d.mu.versions.markFileNumUsed(lf.num)
453 2 : if d.mu.versions.logSeqNum.Load() < maxSeqNum {
454 2 : d.mu.versions.logSeqNum.Store(maxSeqNum)
455 2 : }
456 : }
457 2 : d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
458 2 :
459 2 : if !d.opts.ReadOnly {
460 2 : // Create an empty .log file.
461 2 : newLogNum := d.mu.versions.getNextDiskFileNum()
462 2 :
463 2 : // This logic is slightly different than RocksDB's. Specifically, RocksDB
464 2 : // sets MinUnflushedLogNum to max-recovered-log-num + 1. We set it to the
465 2 : // newLogNum. There should be no difference in using either value.
466 2 : ve.MinUnflushedLogNum = newLogNum
467 2 :
468 2 : // Create the manifest with the updated MinUnflushedLogNum before
469 2 : // creating the new log file. If we created the log file first, a
470 2 : // crash before the manifest is synced could leave two WALs with
471 2 : // unclean tails.
472 2 : d.mu.versions.logLock()
473 2 : if err := d.mu.versions.logAndApply(jobID, &ve, newFileMetrics(ve.NewFiles), false /* forceRotation */, func() []compactionInfo {
474 2 : return nil
475 2 : }); err != nil {
476 0 : return nil, err
477 0 : }
478 :
479 2 : for _, entry := range toFlush {
480 2 : entry.readerUnrefLocked(true)
481 2 : }
482 :
483 2 : newLogName := base.MakeFilepath(opts.FS, d.walDirname, fileTypeLog, newLogNum)
484 2 : d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum, fileSize: 0})
485 2 : logFile, err := opts.FS.Create(newLogName)
486 2 : if err != nil {
487 1 : return nil, err
488 1 : }
489 2 : if err := d.walDir.Sync(); err != nil {
490 1 : return nil, err
491 1 : }
492 2 : d.opts.EventListener.WALCreated(WALCreateInfo{
493 2 : JobID: jobID,
494 2 : Path: newLogName,
495 2 : FileNum: newLogNum,
496 2 : })
497 2 : // This isn't strictly necessary as we don't use the log number for
498 2 : // memtables being flushed, only for the next unflushed memtable.
499 2 : d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
500 2 :
501 2 : logFile = vfs.NewSyncingFile(logFile, vfs.SyncingFileOptions{
502 2 : NoSyncOnClose: d.opts.NoSyncOnClose,
503 2 : BytesPerSync: d.opts.WALBytesPerSync,
504 2 : PreallocateSize: d.walPreallocateSize(),
505 2 : })
506 2 : d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
507 2 : Buckets: FsyncLatencyBuckets,
508 2 : })
509 2 :
510 2 : logWriterConfig := record.LogWriterConfig{
511 2 : WALMinSyncInterval: d.opts.WALMinSyncInterval,
512 2 : WALFsyncLatency: d.mu.log.metrics.fsyncLatency,
513 2 : QueueSemChan: d.commit.logSyncQSem,
514 2 : }
515 2 : d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum, logWriterConfig)
516 2 : d.mu.versions.metrics.WAL.Files++
517 : }
518 2 : d.updateReadStateLocked(d.opts.DebugCheck)
519 2 :
520 2 : if !d.opts.ReadOnly {
521 2 : // If the Options specify a format major version higher than the
522 2 : // loaded database's, upgrade it. If this is a new database, this
523 2 : // code path also performs an initial upgrade from the starting
524 2 : // implicit MinSupported version.
525 2 : //
526 2 : // We ratchet the version this far into Open so that migrations have a read
527 2 : // state available. Note that this also results in creating/updating the
528 2 : // format version marker file.
529 2 : if opts.FormatMajorVersion > d.FormatMajorVersion() {
530 2 : if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
531 0 : return nil, err
532 0 : }
533 2 : } else if noFormatVersionMarker {
534 2 : // We are creating a new store. Create the format version marker file.
535 2 : if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
536 1 : return nil, err
537 1 : }
538 : }
539 :
540 : // Write the current options to disk.
541 2 : d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
542 2 : tmpPath := base.MakeFilepath(opts.FS, dirname, fileTypeTemp, d.optionsFileNum)
543 2 : optionsPath := base.MakeFilepath(opts.FS, dirname, fileTypeOptions, d.optionsFileNum)
544 2 :
545 2 : // Write them to a temporary file first, in case we crash before
546 2 : // we're done. A corrupt options file prevents opening the
547 2 : // database.
548 2 : optionsFile, err := opts.FS.Create(tmpPath)
549 2 : if err != nil {
550 1 : return nil, err
551 1 : }
552 2 : serializedOpts := []byte(opts.String())
553 2 : if _, err := optionsFile.Write(serializedOpts); err != nil {
554 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
555 1 : }
556 2 : d.optionsFileSize = uint64(len(serializedOpts))
557 2 : if err := optionsFile.Sync(); err != nil {
558 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
559 1 : }
560 2 : if err := optionsFile.Close(); err != nil {
561 0 : return nil, err
562 0 : }
563 : // Atomically rename to the OPTIONS-XXXXXX path. This rename is
564 : // guaranteed to be atomic because the destination path does not
565 : // exist.
566 2 : if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
567 1 : return nil, err
568 1 : }
569 2 : if err := d.dataDir.Sync(); err != nil {
570 1 : return nil, err
571 1 : }
572 : }
573 :
574 2 : if !d.opts.ReadOnly {
575 2 : d.scanObsoleteFiles(ls)
576 2 : d.deleteObsoleteFiles(jobID)
577 2 : } else {
578 1 : // All the log files are obsolete.
579 1 : d.mu.versions.metrics.WAL.Files = int64(len(logFiles))
580 1 : }
581 2 : d.mu.tableStats.cond.L = &d.mu.Mutex
582 2 : d.mu.tableValidation.cond.L = &d.mu.Mutex
583 2 : if !d.opts.ReadOnly {
584 2 : d.maybeCollectTableStatsLocked()
585 2 : }
586 2 : d.calculateDiskAvailableBytes()
587 2 :
588 2 : d.maybeScheduleFlush()
589 2 : d.maybeScheduleCompaction()
590 2 :
591 2 : // Note: this is a no-op if invariants are disabled or race is enabled.
592 2 : //
593 2 : // Setting a finalizer on *DB causes *DB to never be reclaimed and the
594 2 : // finalizer to never be run. The problem is due to this limitation of
595 2 : // finalizers mention in the SetFinalizer docs:
596 2 : //
597 2 : // If a cyclic structure includes a block with a finalizer, that cycle is
598 2 : // not guaranteed to be garbage collected and the finalizer is not
599 2 : // guaranteed to run, because there is no ordering that respects the
600 2 : // dependencies.
601 2 : //
602 2 : // DB has cycles with several of its internal structures: readState,
603 2 : // newIters, tableCache, versions, etc. Each of these individually causes a
604 2 : // cycle and prevents the finalizer from being run. But we can work around
605 2 : // this finalizer limitation by setting a finalizer on another object that is
606 2 : // tied to the lifetime of DB: the DB.closed atomic.Value.
607 2 : dPtr := fmt.Sprintf("%p", d)
608 2 : invariants.SetFinalizer(d.closed, func(obj interface{}) {
609 0 : v := obj.(*atomic.Value)
610 0 : if err := v.Load(); err == nil {
611 0 : fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
612 0 : os.Exit(1)
613 0 : }
614 : })
615 :
616 2 : return d, nil
617 : }
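
A hedged usage sketch of the Open path above (not part of open.go): the directory name and the in-memory VFS are illustrative choices; omit FS to use the default on-disk filesystem.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// Open (or create) a store backed by an in-memory filesystem. Open clones
	// the Options, applies defaults, locks the directory, replays any WALs,
	// and (when not read-only) writes a fresh OPTIONS file.
	db, err := pebble.Open("demo", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Set([]byte("hello"), []byte("world"), pebble.Sync); err != nil {
		log.Fatal(err)
	}
}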
618 :
619 : // prepareAndOpenDirs opens the directories for the store (and creates them if
620 : // necessary).
621 : //
622 : // Returns an error if ReadOnly is set and the directories don't exist.
623 : func prepareAndOpenDirs(
624 : dirname string, opts *Options,
625 2 : ) (walDirname string, dataDir vfs.File, walDir vfs.File, err error) {
626 2 : walDirname = opts.WALDir
627 2 : if opts.WALDir == "" {
628 2 : walDirname = dirname
629 2 : }
630 :
631 : // Create directories if needed.
632 2 : if !opts.ReadOnly {
633 2 : if err := opts.FS.MkdirAll(dirname, 0755); err != nil {
634 1 : return "", nil, nil, err
635 1 : }
636 2 : if walDirname != dirname {
637 2 : if err := opts.FS.MkdirAll(walDirname, 0755); err != nil {
638 0 : return "", nil, nil, err
639 0 : }
640 : }
641 : }
642 :
643 2 : dataDir, err = opts.FS.OpenDir(dirname)
644 2 : if err != nil {
645 1 : if opts.ReadOnly && oserror.IsNotExist(err) {
646 1 : return "", nil, nil, errors.Errorf("pebble: database %q does not exist", dirname)
647 1 : }
648 1 : return "", nil, nil, err
649 : }
650 :
651 2 : if walDirname == dirname {
652 2 : walDir = dataDir
653 2 : } else {
654 2 : walDir, err = opts.FS.OpenDir(walDirname)
655 2 : if err != nil {
656 1 : dataDir.Close()
657 1 : return "", nil, nil, err
658 1 : }
659 : }
660 2 : return walDirname, dataDir, walDir, nil
661 : }
662 :
663 : // GetVersion returns the engine version string from the latest options
664 : // file present in dir. Used to check what Pebble or RocksDB version was last
665 : // used to write to the database stored in this directory. An empty string is
666 : // returned if no valid OPTIONS file with a version key was found.
667 1 : func GetVersion(dir string, fs vfs.FS) (string, error) {
668 1 : ls, err := fs.List(dir)
669 1 : if err != nil {
670 0 : return "", err
671 0 : }
672 1 : var version string
673 1 : lastOptionsSeen := FileNum(0)
674 1 : for _, filename := range ls {
675 1 : ft, fn, ok := base.ParseFilename(fs, filename)
676 1 : if !ok {
677 1 : continue
678 : }
679 1 : switch ft {
680 1 : case fileTypeOptions:
681 1 : // If this file has a higher number than the last options file
682 1 : // processed, reset version. This is because rocksdb often
683 1 : // writes multiple options files without deleting previous ones.
684 1 : // Otherwise, skip parsing this options file.
685 1 : if fn.FileNum() > lastOptionsSeen {
686 1 : version = ""
687 1 : lastOptionsSeen = fn.FileNum()
688 1 : } else {
689 1 : continue
690 : }
691 1 : f, err := fs.Open(fs.PathJoin(dir, filename))
692 1 : if err != nil {
693 0 : return "", err
694 0 : }
695 1 : data, err := io.ReadAll(f)
696 1 : f.Close()
697 1 :
698 1 : if err != nil {
699 0 : return "", err
700 0 : }
701 1 : err = parseOptions(string(data), func(section, key, value string) error {
702 1 : switch {
703 1 : case section == "Version":
704 1 : switch key {
705 1 : case "pebble_version":
706 1 : version = value
707 1 : case "rocksdb_version":
708 1 : version = fmt.Sprintf("rocksdb v%s", value)
709 : }
710 : }
711 1 : return nil
712 : })
713 1 : if err != nil {
714 0 : return "", err
715 0 : }
716 : }
717 : }
718 1 : return version, nil
719 : }
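
A small sketch of using GetVersion to inspect which engine last wrote a directory (not part of open.go): the path is illustrative and vfs.Default is assumed to be the ordinary on-disk filesystem.

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// Read the newest OPTIONS file under the given directory and report the
	// engine version recorded there; an empty string means no version key
	// was found.
	version, err := pebble.GetVersion("/data/pebble-store", vfs.Default)
	if err != nil {
		log.Fatal(err)
	}
	if version == "" {
		fmt.Println("no OPTIONS file with a version key found")
		return
	}
	fmt.Println("last written by", version)
}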
720 :
721 : // replayWAL replays the edits in the specified log file. If the DB is in read
722 : // only mode, then the WALs are replayed into memtables and not flushed. If
723 : // the DB is not in read only mode, then the contents of the WAL are
724 : // guaranteed to be flushed. Note that this flushing is very important for
725 : // guaranteeing durability: the application may have had a number of pending
726 : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
727 : // happened but the corresponding data may now be readable from the WAL (while
728 : // sitting in write-back caches in the kernel or the storage device). By
729 : // reading the WAL (including the non-fsynced data) and then flushing all
730 : // these changes (flush does fsyncs), we are able to guarantee that the
731 : // initial state of the DB is durable.
732 : //
733 : // The toFlush return value is a list of flushables associated with the WAL
734 : // being replayed which will be flushed. Once the version edit has been applied
735 : // to the manifest, it is up to the caller of replayWAL to unreference the
736 : // toFlush flushables returned by replayWAL.
737 : //
738 : // d.mu must be held when calling this, but the mutex may be dropped and
739 : // re-acquired during the course of this method.
740 : func (d *DB) replayWAL(
741 : jobID int,
742 : ve *versionEdit,
743 : fs vfs.FS,
744 : filename string,
745 : logNum base.DiskFileNum,
746 : strictWALTail bool,
747 2 : ) (toFlush flushableList, maxSeqNum uint64, err error) {
748 2 : file, err := fs.Open(filename)
749 2 : if err != nil {
750 0 : return nil, 0, err
751 0 : }
752 2 : defer file.Close()
753 2 : var (
754 2 : b Batch
755 2 : buf bytes.Buffer
756 2 : mem *memTable
757 2 : entry *flushableEntry
758 2 : rr = record.NewReader(file, logNum)
759 2 : offset int64 // byte offset in rr
760 2 : lastFlushOffset int64
761 2 : keysReplayed int64 // number of keys replayed
762 2 : batchesReplayed int64 // number of batches replayed
763 2 : )
764 2 :
765 2 : // TODO(jackson): This function is interspersed with panics, in addition to
766 2 : // corruption error propagation. Audit them to ensure we're truly only
767 2 : // panicking where the error points to Pebble bug and not user or
768 2 : // hardware-induced corruption.
769 2 :
770 2 : if d.opts.ReadOnly {
771 1 : // In read-only mode, we replay directly into the mutable memtable which will
772 1 : // never be flushed.
773 1 : mem = d.mu.mem.mutable
774 1 : if mem != nil {
775 1 : entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
776 1 : }
777 : }
778 :
779 : // Flushes the current memtable, if not nil.
780 2 : flushMem := func() {
781 2 : if mem == nil {
782 2 : return
783 2 : }
784 2 : var logSize uint64
785 2 : if offset >= lastFlushOffset {
786 2 : logSize = uint64(offset - lastFlushOffset)
787 2 : }
788 : // Else, this was the initial memtable in the read-only case which must have
789 : // been empty, but we need to flush it since we don't want to add to it later.
790 2 : lastFlushOffset = offset
791 2 : entry.logSize = logSize
792 2 : if !d.opts.ReadOnly {
793 2 : toFlush = append(toFlush, entry)
794 2 : }
795 2 : mem, entry = nil, nil
796 : }
797 : // Creates a new memtable if there is no current memtable.
798 2 : ensureMem := func(seqNum uint64) {
799 2 : if mem != nil {
800 2 : return
801 2 : }
802 2 : mem, entry = d.newMemTable(logNum, seqNum)
803 2 : if d.opts.ReadOnly {
804 1 : d.mu.mem.mutable = mem
805 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
806 1 : }
807 : }
808 :
809 : // updateVE is used to update ve with information about new files created
810 : // during the flush of any flushable not of type ingestedFlushable. For the
811 : // flushable of type ingestedFlushable we use custom handling below.
812 2 : updateVE := func() error {
813 2 : // TODO(bananabrick): See if we can use the actual base level here,
814 2 : // instead of using 1.
815 2 : c := newFlush(d.opts, d.mu.versions.currentVersion(),
816 2 : 1 /* base level */, toFlush, d.timeNow())
817 2 : newVE, _, _, err := d.runCompaction(jobID, c)
818 2 : if err != nil {
819 0 : return errors.Wrapf(err, "running compaction during WAL replay")
820 0 : }
821 2 : ve.NewFiles = append(ve.NewFiles, newVE.NewFiles...)
822 2 : return nil
823 : }
824 2 : defer func() {
825 2 : if err != nil {
826 1 : err = errors.WithDetailf(err, "replaying log %s, offset %d", logNum, offset)
827 1 : }
828 : }()
829 :
830 2 : for {
831 2 : offset = rr.Offset()
832 2 : r, err := rr.Next()
833 2 : if err == nil {
834 2 : _, err = io.Copy(&buf, r)
835 2 : }
836 2 : if err != nil {
837 2 : // It is common to encounter a zeroed or invalid chunk due to WAL
838 2 : // preallocation and WAL recycling. We need to distinguish these
839 2 : // errors from EOF in order to recognize that the record was
840 2 : // truncated and to avoid replaying subsequent WALs, but want
841 2 : // to otherwise treat them like EOF.
842 2 : if err == io.EOF {
843 2 : break
844 1 : } else if record.IsInvalidRecord(err) && !strictWALTail {
845 1 : break
846 : }
847 1 : return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
848 : }
849 :
850 2 : if buf.Len() < batchHeaderLen {
851 0 : return nil, 0, base.CorruptionErrorf("pebble: corrupt log file %q (num %s)",
852 0 : filename, errors.Safe(logNum))
853 0 : }
854 :
855 2 : if d.opts.ErrorIfNotPristine {
856 1 : return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
857 1 : }
858 :
859 : // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
860 : // which is used below.
861 2 : b = Batch{}
862 2 : b.db = d
863 2 : b.SetRepr(buf.Bytes())
864 2 : seqNum := b.SeqNum()
865 2 : maxSeqNum = seqNum + uint64(b.Count())
866 2 : keysReplayed += int64(b.Count())
867 2 : batchesReplayed++
868 2 : {
869 2 : br := b.Reader()
870 2 : if kind, encodedFileNum, _, ok, err := br.Next(); err != nil {
871 0 : return nil, 0, err
872 2 : } else if ok && kind == InternalKeyKindIngestSST {
873 2 : fileNums := make([]base.DiskFileNum, 0, b.Count())
874 2 : addFileNum := func(encodedFileNum []byte) {
875 2 : fileNum, n := binary.Uvarint(encodedFileNum)
876 2 : if n <= 0 {
877 0 : panic("pebble: ingest sstable file num is invalid.")
878 : }
879 2 : fileNums = append(fileNums, base.FileNum(fileNum).DiskFileNum())
880 : }
881 2 : addFileNum(encodedFileNum)
882 2 :
883 2 : for i := 1; i < int(b.Count()); i++ {
884 2 : kind, encodedFileNum, _, ok, err := br.Next()
885 2 : if err != nil {
886 0 : return nil, 0, err
887 0 : }
888 2 : if kind != InternalKeyKindIngestSST {
889 0 : panic("pebble: invalid batch key kind.")
890 : }
891 2 : if !ok {
892 0 : panic("pebble: invalid batch count.")
893 : }
894 2 : addFileNum(encodedFileNum)
895 : }
896 :
897 2 : if _, _, _, ok, err := br.Next(); err != nil {
898 0 : return nil, 0, err
899 2 : } else if ok {
900 0 : panic("pebble: invalid number of entries in batch.")
901 : }
902 :
903 2 : meta := make([]*fileMetadata, len(fileNums))
904 2 : for i, n := range fileNums {
905 2 : var readable objstorage.Readable
906 2 : objMeta, err := d.objProvider.Lookup(fileTypeTable, n)
907 2 : if err != nil {
908 0 : return nil, 0, errors.Wrap(err, "pebble: error when looking up ingested SSTs")
909 0 : }
910 2 : if objMeta.IsRemote() {
911 1 : readable, err = d.objProvider.OpenForReading(context.TODO(), fileTypeTable, n, objstorage.OpenOptions{MustExist: true})
912 1 : if err != nil {
913 0 : return nil, 0, errors.Wrap(err, "pebble: error when opening flushable ingest files")
914 0 : }
915 2 : } else {
916 2 : path := base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, n)
917 2 : f, err := d.opts.FS.Open(path)
918 2 : if err != nil {
919 0 : return nil, 0, err
920 0 : }
921 :
922 2 : readable, err = sstable.NewSimpleReadable(f)
923 2 : if err != nil {
924 0 : return nil, 0, err
925 0 : }
926 : }
927 : // NB: ingestLoad1 will close readable.
928 2 : meta[i], err = ingestLoad1(d.opts, d.FormatMajorVersion(), readable, d.cacheID, n)
929 2 : if err != nil {
930 0 : return nil, 0, errors.Wrap(err, "pebble: error when loading flushable ingest files")
931 0 : }
932 : }
933 :
934 2 : if uint32(len(meta)) != b.Count() {
935 0 : panic("pebble: couldn't load all files in WAL entry.")
936 : }
937 :
938 2 : entry, err = d.newIngestedFlushableEntry(
939 2 : meta, seqNum, logNum,
940 2 : )
941 2 : if err != nil {
942 0 : return nil, 0, err
943 0 : }
944 :
945 2 : if d.opts.ReadOnly {
946 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
947 1 : // We added the IngestSST flushable to the queue. But there
948 1 : // must be at least one WAL entry waiting to be replayed. We
949 1 : // have to ensure this newer WAL entry isn't replayed into
950 1 : // the current value of d.mu.mem.mutable because the current
951 1 : // mutable memtable exists before this flushable entry in
952 1 : // the memtable queue. To ensure this, we just need to unset
953 1 : // d.mu.mem.mutable. When a newer WAL is replayed, we will
954 1 : // set d.mu.mem.mutable to a newer value.
955 1 : d.mu.mem.mutable = nil
956 2 : } else {
957 2 : toFlush = append(toFlush, entry)
958 2 : // During WAL replay, the lsm only has L0, hence, the
959 2 : // baseLevel is 1. For the sake of simplicity, we place the
960 2 : // ingested files in L0 here, instead of finding their
961 2 : // target levels. This keeps the replay code simpler.
962 2 : // It is expected that WAL replay should be
963 2 : // rare, and that flushables of type ingestedFlushable
964 2 : // should also be rare. So, placing the ingested files in L0
965 2 : // is alright.
966 2 : //
967 2 : // TODO(bananabrick): Maybe refactor this function to allow
968 2 : // us to easily place ingested files in levels as low as
969 2 : // possible during WAL replay. It would require breaking up
970 2 : // the application of ve to the manifest into chunks and is
971 2 : // not pretty w/o a refactor to this function and how it's
972 2 : // used.
973 2 : c := newFlush(
974 2 : d.opts, d.mu.versions.currentVersion(),
975 2 : 1, /* base level */
976 2 : []*flushableEntry{entry},
977 2 : d.timeNow(),
978 2 : )
979 2 : for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
980 2 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file.FileMetadata})
981 2 : }
982 : }
983 2 : return toFlush, maxSeqNum, nil
984 : }
985 : }
986 :
987 2 : if b.memTableSize >= uint64(d.largeBatchThreshold) {
988 1 : flushMem()
989 1 : // Make a copy of the data slice since it is currently owned by buf and will
990 1 : // be reused in the next iteration.
991 1 : b.data = slices.Clone(b.data)
992 1 : b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
993 1 : if err != nil {
994 0 : return nil, 0, err
995 0 : }
996 1 : entry := d.newFlushableEntry(b.flushable, logNum, b.SeqNum())
997 1 : // Disable memory accounting by adding a reader ref that will never be
998 1 : // removed.
999 1 : entry.readerRefs.Add(1)
1000 1 : if d.opts.ReadOnly {
1001 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
1002 1 : // We added the flushable batch to the flushable queue.
1003 1 : // But there must be at least one WAL entry waiting to be
1004 1 : // replayed. We have to ensure this newer WAL entry isn't
1005 1 : // replayed into the current value of d.mu.mem.mutable because
1006 1 : // the current mutable memtable exists before this flushable
1007 1 : // entry in the memtable queue. To ensure this, we just need to
1008 1 : // unset d.mu.mem.mutable. When a newer WAL is replayed, we will
1009 1 : // set d.mu.mem.mutable to a newer value.
1010 1 : d.mu.mem.mutable = nil
1011 1 : } else {
1012 1 : toFlush = append(toFlush, entry)
1013 1 : }
1014 2 : } else {
1015 2 : ensureMem(seqNum)
1016 2 : if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
1017 0 : return nil, 0, err
1018 0 : }
1019 : // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
1020 : // batch may not initially fit, but will eventually fit (since it is smaller than
1021 : // largeBatchThreshold).
1022 2 : for err == arenaskl.ErrArenaFull {
1023 2 : flushMem()
1024 2 : ensureMem(seqNum)
1025 2 : err = mem.prepare(&b)
1026 2 : if err != nil && err != arenaskl.ErrArenaFull {
1027 0 : return nil, 0, err
1028 0 : }
1029 : }
1030 2 : if err = mem.apply(&b, seqNum); err != nil {
1031 0 : return nil, 0, err
1032 0 : }
1033 2 : mem.writerUnref()
1034 : }
1035 2 : buf.Reset()
1036 : }
1037 :
1038 2 : d.opts.Logger.Infof("[JOB %d] WAL file %s with log number %s stopped reading at offset: %d; replayed %d keys in %d batches", jobID, filename, logNum.String(), offset, keysReplayed, batchesReplayed)
1039 2 : flushMem()
1040 2 :
1041 2 : // mem is nil here.
1042 2 : if !d.opts.ReadOnly && batchesReplayed > 0 {
1043 2 : err = updateVE()
1044 2 : if err != nil {
1045 0 : return nil, 0, err
1046 0 : }
1047 : }
1048 2 : return toFlush, maxSeqNum, err
1049 : }
1050 :
1051 2 : func checkOptions(opts *Options, path string) (strictWALTail bool, err error) {
1052 2 : f, err := opts.FS.Open(path)
1053 2 : if err != nil {
1054 0 : return false, err
1055 0 : }
1056 2 : defer f.Close()
1057 2 :
1058 2 : data, err := io.ReadAll(f)
1059 2 : if err != nil {
1060 0 : return false, err
1061 0 : }
1062 2 : return opts.checkOptions(string(data))
1063 : }
1064 :
1065 : // DBDesc briefly describes high-level state about a database.
1066 : type DBDesc struct {
1067 : // Exists is true if an existing database was found.
1068 : Exists bool
1069 : // FormatMajorVersion indicates the database's current format
1070 : // version.
1071 : FormatMajorVersion FormatMajorVersion
1072 : // ManifestFilename is the filename of the current active manifest,
1073 : // if the database exists.
1074 : ManifestFilename string
1075 : }
1076 :
1077 : // Peek looks for an existing database in dirname on the provided FS. It
1078 : // returns a brief description of the database. Peek is read-only and
1079 : // does not open the database.
1080 1 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
1081 1 : vers, versMarker, err := lookupFormatMajorVersion(fs, dirname)
1082 1 : if err != nil {
1083 1 : return nil, err
1084 1 : }
1085 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1086 : // PeekMarker variant that avoids opening the directory.
1087 1 : if err := versMarker.Close(); err != nil {
1088 0 : return nil, err
1089 0 : }
1090 :
1091 : // Find the currently active manifest, if there is one.
1092 1 : manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname)
1093 1 : if err != nil {
1094 0 : return nil, err
1095 0 : }
1096 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1097 : // PeekMarker variant that avoids opening the directory.
1098 1 : if err := manifestMarker.Close(); err != nil {
1099 0 : return nil, err
1100 0 : }
1101 :
1102 1 : desc := &DBDesc{
1103 1 : Exists: exists,
1104 1 : FormatMajorVersion: vers,
1105 1 : }
1106 1 : if exists {
1107 1 : desc.ManifestFilename = base.MakeFilepath(fs, dirname, fileTypeManifest, manifestFileNum)
1108 1 : }
1109 1 : return desc, nil
1110 : }
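
A sketch of Peek usage (not part of open.go), checking for an existing store without opening it; the directory name is illustrative.

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// Describe whatever lives in the directory without acquiring the
	// directory lock or replaying WALs.
	desc, err := pebble.Peek("/data/pebble-store", vfs.Default)
	if err != nil {
		log.Fatal(err)
	}
	if !desc.Exists {
		fmt.Println("no database found")
		return
	}
	fmt.Printf("format major version %d, manifest %q\n",
		desc.FormatMajorVersion, desc.ManifestFilename)
}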
1111 :
1112 : // LockDirectory acquires the database directory lock in the named directory,
1113 : // preventing another process from opening the database. LockDirectory returns a
1114 : // handle to the held lock that may be passed to Open through Options.Lock to
1115 : // subsequently open the database, skipping lock acquisition during Open.
1116 : //
1117 : // LockDirectory may be used to expand the critical section protected by the
1118 : // database lock to include setup before the call to Open.
1119 2 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
1120 2 : fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, fileTypeLock, base.FileNum(0).DiskFileNum()))
1121 2 : if err != nil {
1122 1 : return nil, err
1123 1 : }
1124 2 : l := &Lock{dirname: dirname, fileLock: fileLock}
1125 2 : l.refs.Store(1)
1126 2 : invariants.SetFinalizer(l, func(obj interface{}) {
1127 2 : if refs := obj.(*Lock).refs.Load(); refs > 0 {
1128 0 : panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
1129 : }
1130 : })
1131 2 : return l, nil
1132 : }
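
A sketch of expanding the critical section with LockDirectory before Open, as the comment above describes (not part of open.go; the path is illustrative).

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	const dirname = "/data/pebble-store"

	// Acquire the directory lock first so that any setup performed before
	// Open is also protected from other processes.
	lock, err := pebble.LockDirectory(dirname, vfs.Default)
	if err != nil {
		log.Fatal(err)
	}
	// Deferred last-in-first-out: the DB is closed before the lock is released.
	defer lock.Close()

	// ... perform any pre-open setup under the lock ...

	db, err := pebble.Open(dirname, &pebble.Options{Lock: lock})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}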
1133 :
1134 : // Lock represents a file lock on a directory. It may be passed to Open through
1135 : // Options.Lock to elide lock acquisition during Open.
1136 : type Lock struct {
1137 : dirname string
1138 : fileLock io.Closer
1139 : // refs is a count of the number of handles on the lock. refs must be 0, 1
1140 : // or 2.
1141 : //
1142 : // When acquired by the client and passed to Open, refs = 1 and the Open
1143 : // call increments it to 2. When the database is closed, it's decremented to
1144 : // 1. Finally when the original caller, calls Close on the Lock, it's
1145 : // drecemented to zero and the underlying file lock is released.
1146 : //
1147 : // When Open acquires the file lock, refs remains at 1 until the database is
1148 : // closed.
1149 : refs atomic.Int32
1150 : }
1151 :
1152 1 : func (l *Lock) refForOpen() error {
1153 1 : // During Open, when a user passed in a lock, the reference count must be
1154 1 : // exactly 1. If it's zero, the lock is no longer held and is invalid. If
1155 1 : // it's 2, the lock is already in use by another database within the
1156 1 : // process.
1157 1 : if !l.refs.CompareAndSwap(1, 2) {
1158 1 : return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
1159 1 : }
1160 1 : return nil
1161 : }
1162 :
1163 : // Close releases the lock, permitting another process to lock and open the
1164 : // database. Close must not be called until after a database using the Lock has
1165 : // been closed.
1166 2 : func (l *Lock) Close() error {
1167 2 : if l.refs.Add(-1) > 0 {
1168 1 : return nil
1169 1 : }
1170 2 : defer func() { l.fileLock = nil }()
1171 2 : return l.fileLock.Close()
1172 : }
1173 :
1174 : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
1175 : // does not exist.
1176 : //
1177 : // Note that errors can be wrapped with more details; use errors.Is().
1178 : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
1179 :
1180 : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
1181 : // already exists.
1182 : //
1183 : // Note that errors can be wrapped with more details; use errors.Is().
1184 : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
1185 :
1186 : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
1187 : // already exists and is not pristine.
1188 : //
1189 : // Note that errors can be wrapped with more details; use errors.Is().
1190 : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
1191 :
1192 : // IsCorruptionError returns true if the given error indicates database
1193 : // corruption.
1194 1 : func IsCorruptionError(err error) bool {
1195 1 : return errors.Is(err, base.ErrCorruption)
1196 1 : }
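
A sketch of distinguishing corruption from other Open failures (not part of open.go; the directory name is illustrative).

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	db, err := pebble.Open("/data/pebble-store", &pebble.Options{})
	if err != nil {
		if pebble.IsCorruptionError(err) {
			// The on-disk state is damaged; surface it loudly rather than
			// retrying.
			log.Fatalf("store is corrupt: %v", err)
		}
		log.Fatal(err)
	}
	defer db.Close()
}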
1197 :
1198 2 : func checkConsistency(v *manifest.Version, dirname string, objProvider objstorage.Provider) error {
1199 2 : var errs []error
1200 2 : dedup := make(map[base.DiskFileNum]struct{})
1201 2 : for level, files := range v.Levels {
1202 2 : iter := files.Iter()
1203 2 : for f := iter.First(); f != nil; f = iter.Next() {
1204 2 : backingState := f.FileBacking
1205 2 : if _, ok := dedup[backingState.DiskFileNum]; ok {
1206 2 : continue
1207 : }
1208 2 : dedup[backingState.DiskFileNum] = struct{}{}
1209 2 : fileNum := backingState.DiskFileNum
1210 2 : fileSize := backingState.Size
1211 2 : // We skip over remote objects; those are instead checked asynchronously
1212 2 : // by the table stats loading job.
1213 2 : meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
1214 2 : var size int64
1215 2 : if err == nil {
1216 2 : if meta.IsRemote() {
1217 2 : continue
1218 : }
1219 2 : size, err = objProvider.Size(meta)
1220 : }
1221 2 : if err != nil {
1222 1 : errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
1223 1 : continue
1224 : }
1225 :
1226 2 : if size != int64(fileSize) {
1227 0 : errs = append(errs, errors.Errorf(
1228 0 : "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
1229 0 : errors.Safe(level), fileNum, objProvider.Path(meta),
1230 0 : errors.Safe(size), errors.Safe(fileSize)))
1231 0 : continue
1232 : }
1233 : }
1234 : }
1235 2 : return errors.Join(errs...)
1236 : }
|